airbyte.cloud.connections

Cloud Connections: manage Airbyte Cloud connections — run syncs, inspect jobs, and edit state, catalog, scheduling, and status.

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""Cloud Connections."""
  3
  4from __future__ import annotations
  5
  6import logging
  7from typing import TYPE_CHECKING, Any
  8
  9from typing_extensions import deprecated
 10
 11from airbyte._util import api_util
 12from airbyte.cloud._connection_state import (
 13    ConnectionStateResponse,
 14    _get_stream_list,
 15    _match_stream,
 16)
 17from airbyte.cloud.connectors import CloudDestination, CloudSource
 18from airbyte.cloud.sync_results import SyncResult
 19from airbyte.exceptions import AirbyteWorkspaceMismatchError, PyAirbyteInputError
 20
 21
 22logger = logging.getLogger(__name__)
 23
 24
 25if TYPE_CHECKING:
 26    from airbyte_api.models import ConnectionResponse, JobResponse, JobTypeEnum
 27
 28    from airbyte.cloud.workspaces import CloudWorkspace
 29
 30
 31class CloudConnection:  # noqa: PLR0904  # Too many public methods
 32    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
 33
 34    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
 35    """
 36
 37    def __init__(
 38        self,
 39        workspace: CloudWorkspace,
 40        connection_id: str,
 41        source: str | None = None,
 42        destination: str | None = None,
 43    ) -> None:
 44        """It is not recommended to create a `CloudConnection` object directly.
 45
 46        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
 47        """
 48        self.connection_id = connection_id
 49        """The ID of the connection."""
 50
 51        self.workspace = workspace
 52        """The workspace that the connection belongs to."""
 53
 54        self._source_id = source
 55        """The ID of the source."""
 56
 57        self._destination_id = destination
 58        """The ID of the destination."""
 59
 60        self._connection_info: ConnectionResponse | None = None
 61        """The connection info object. (Cached.)"""
 62
 63        self._cloud_source_object: CloudSource | None = None
 64        """The source object. (Cached.)"""
 65
 66        self._cloud_destination_object: CloudDestination | None = None
 67        """The destination object. (Cached.)"""
 68
 69    def _fetch_connection_info(
 70        self,
 71        *,
 72        force_refresh: bool = False,
 73        verify: bool = True,
 74    ) -> ConnectionResponse:
 75        """Fetch and cache connection info from the API.
 76
 77        By default, this method will only fetch from the API if connection info is not
 78        already cached. It also verifies that the connection belongs to the expected
 79        workspace unless verification is explicitly disabled.
 80
 81        Args:
 82            force_refresh: If True, always fetch from the API even if cached.
 83                If False (default), only fetch if not already cached.
 84            verify: If True (default), verify that the connection is valid (e.g., that
 85                the workspace_id matches this object's workspace). Raises an error if
 86                validation fails.
 87
 88        Returns:
 89            The ConnectionResponse from the API.
 90
 91        Raises:
 92            AirbyteWorkspaceMismatchError: If verify is True and the connection's
 93                workspace_id doesn't match the expected workspace.
 94            AirbyteMissingResourceError: If the connection doesn't exist.
 95        """
 96        if not force_refresh and self._connection_info is not None:
 97            # Use cached info, but still verify if requested
 98            if verify:
 99                self._verify_workspace_match(self._connection_info)
100            return self._connection_info
101
102        # Fetch from API
103        connection_info = api_util.get_connection(
104            workspace_id=self.workspace.workspace_id,
105            connection_id=self.connection_id,
106            api_root=self.workspace.api_root,
107            client_id=self.workspace.client_id,
108            client_secret=self.workspace.client_secret,
109            bearer_token=self.workspace.bearer_token,
110        )
111
112        # Cache the result first (before verification may raise)
113        self._connection_info = connection_info
114
115        # Verify if requested
116        if verify:
117            self._verify_workspace_match(connection_info)
118
119        return connection_info
120
121    def _verify_workspace_match(self, connection_info: ConnectionResponse) -> None:
122        """Verify that the connection belongs to the expected workspace.
123
124        Raises:
125            AirbyteWorkspaceMismatchError: If the workspace IDs don't match.
126        """
127        if connection_info.workspace_id != self.workspace.workspace_id:
128            raise AirbyteWorkspaceMismatchError(
129                resource_type="connection",
130                resource_id=self.connection_id,
131                workspace=self.workspace,
132                expected_workspace_id=self.workspace.workspace_id,
133                actual_workspace_id=connection_info.workspace_id,
134                message=(
135                    f"Connection '{self.connection_id}' belongs to workspace "
136                    f"'{connection_info.workspace_id}', not '{self.workspace.workspace_id}'."
137                ),
138            )
139
140    def check_is_valid(self) -> bool:
141        """Check if this connection exists and belongs to the expected workspace.
142
143        This method fetches connection info from the API (if not already cached) and
144        verifies that the connection's workspace_id matches the workspace associated
145        with this CloudConnection object.
146
147        Returns:
148            True if the connection exists and belongs to the expected workspace.
149
150        Raises:
151            AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
152            AirbyteMissingResourceError: If the connection doesn't exist.
153        """
154        self._fetch_connection_info(force_refresh=False, verify=True)
155        return True
156
157    @classmethod
158    def _from_connection_response(
159        cls,
160        workspace: CloudWorkspace,
161        connection_response: ConnectionResponse,
162    ) -> CloudConnection:
163        """Create a CloudConnection from a ConnectionResponse."""
164        result = cls(
165            workspace=workspace,
166            connection_id=connection_response.connection_id,
167            source=connection_response.source_id,
168            destination=connection_response.destination_id,
169        )
170        result._connection_info = connection_response  # noqa: SLF001 # Accessing Non-Public API
171        return result
172
173    # Properties
174
175    @property
176    def name(self) -> str | None:
177        """Get the display name of the connection, if available.
178
179        E.g. "My Postgres to Snowflake", not the connection ID.
180        """
181        if not self._connection_info:
182            self._connection_info = self._fetch_connection_info()
183
184        return self._connection_info.name
185
186    @property
187    def source_id(self) -> str:
188        """The ID of the source."""
189        if not self._source_id:
190            if not self._connection_info:
191                self._connection_info = self._fetch_connection_info()
192
193            self._source_id = self._connection_info.source_id
194
195        return self._source_id
196
197    @property
198    def source(self) -> CloudSource:
199        """Get the source object."""
200        if self._cloud_source_object:
201            return self._cloud_source_object
202
203        self._cloud_source_object = CloudSource(
204            workspace=self.workspace,
205            connector_id=self.source_id,
206        )
207        return self._cloud_source_object
208
209    @property
210    def destination_id(self) -> str:
211        """The ID of the destination."""
212        if not self._destination_id:
213            if not self._connection_info:
214                self._connection_info = self._fetch_connection_info()
215
216            self._destination_id = self._connection_info.destination_id
217
218        return self._destination_id
219
220    @property
221    def destination(self) -> CloudDestination:
222        """Get the destination object."""
223        if self._cloud_destination_object:
224            return self._cloud_destination_object
225
226        self._cloud_destination_object = CloudDestination(
227            workspace=self.workspace,
228            connector_id=self.destination_id,
229        )
230        return self._cloud_destination_object
231
232    @property
233    def stream_names(self) -> list[str]:
234        """The stream names."""
235        if not self._connection_info:
236            self._connection_info = self._fetch_connection_info()
237
238        return [stream.name for stream in self._connection_info.configurations.streams or []]
239
240    @property
241    def table_prefix(self) -> str:
242        """The table prefix."""
243        if not self._connection_info:
244            self._connection_info = self._fetch_connection_info()
245
246        return self._connection_info.prefix or ""
247
248    @property
249    def connection_url(self) -> str | None:
250        """The web URL to the connection."""
251        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
252
253    @property
254    def job_history_url(self) -> str | None:
255        """The URL to the job history for the connection."""
256        return f"{self.connection_url}/timeline"
257
258    # Run Sync
259
260    def run_sync(
261        self,
262        *,
263        wait: bool = True,
264        wait_timeout: int = 300,
265    ) -> SyncResult:
266        """Run a sync."""
267        connection_response = api_util.run_connection(
268            connection_id=self.connection_id,
269            api_root=self.workspace.api_root,
270            workspace_id=self.workspace.workspace_id,
271            client_id=self.workspace.client_id,
272            client_secret=self.workspace.client_secret,
273            bearer_token=self.workspace.bearer_token,
274        )
275        sync_result = SyncResult(
276            workspace=self.workspace,
277            connection=self,
278            job_id=connection_response.job_id,
279        )
280
281        if wait:
282            sync_result.wait_for_completion(
283                wait_timeout=wait_timeout,
284                raise_failure=True,
285                raise_timeout=True,
286            )
287
288        return sync_result
289
290    def __repr__(self) -> str:
291        """String representation of the connection."""
292        return (
293            f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, "
294            f"destination_id={self.destination_id}, connection_url={self.connection_url})"
295        )
296
297    # Logs
298
299    def get_previous_sync_logs(
300        self,
301        *,
302        limit: int = 20,
303        offset: int | None = None,
304        from_tail: bool = True,
305        job_type: JobTypeEnum | None = None,
306    ) -> list[SyncResult]:
307        """Get previous sync jobs for a connection with pagination support.
308
309        Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
310        rows_synced, start_time). Full log text can be fetched lazily via
311        `SyncResult.get_full_log_text()`.
312
313        Args:
314            limit: Maximum number of jobs to return. Defaults to 20.
315            offset: Number of jobs to skip from the beginning. Defaults to None (0).
316            from_tail: If True, returns jobs ordered newest-first (createdAt DESC).
317                If False, returns jobs ordered oldest-first (createdAt ASC).
318                Defaults to True.
319            job_type: Filter by job type (e.g., JobTypeEnum.SYNC, JobTypeEnum.REFRESH).
320                If not specified, defaults to sync and reset jobs only (API default behavior).
321
322        Returns:
323            A list of SyncResult objects representing the sync jobs.
324        """
325        order_by = (
326            api_util.JOB_ORDER_BY_CREATED_AT_DESC
327            if from_tail
328            else api_util.JOB_ORDER_BY_CREATED_AT_ASC
329        )
330        sync_logs: list[JobResponse] = api_util.get_job_logs(
331            connection_id=self.connection_id,
332            api_root=self.workspace.api_root,
333            workspace_id=self.workspace.workspace_id,
334            limit=limit,
335            offset=offset,
336            order_by=order_by,
337            job_type=job_type,
338            client_id=self.workspace.client_id,
339            client_secret=self.workspace.client_secret,
340            bearer_token=self.workspace.bearer_token,
341        )
342        return [
343            SyncResult(
344                workspace=self.workspace,
345                connection=self,
346                job_id=sync_log.job_id,
347                _latest_job_info=sync_log,
348            )
349            for sync_log in sync_logs
350        ]
351
352    def get_sync_result(
353        self,
354        job_id: int | None = None,
355    ) -> SyncResult | None:
356        """Get the sync result for the connection.
357
358        If `job_id` is not provided, the most recent sync job will be used.
359
360        Returns `None` if job_id is omitted and no previous jobs are found.
361        """
362        if job_id is None:
363            # Get the most recent sync job
364            results = self.get_previous_sync_logs(
365                limit=1,
366            )
367            if results:
368                return results[0]
369
370            return None
371
372        # Get the sync job by ID (lazy loaded)
373        return SyncResult(
374            workspace=self.workspace,
375            connection=self,
376            job_id=job_id,
377        )
378
379    # Artifacts
380
381    @deprecated("Use 'dump_raw_state()' instead.")
382    def get_state_artifacts(self) -> list[dict[str, Any]] | None:
383        """Deprecated. Use `dump_raw_state()` instead."""
384        state_response = api_util.get_connection_state(
385            connection_id=self.connection_id,
386            api_root=self.workspace.api_root,
387            client_id=self.workspace.client_id,
388            client_secret=self.workspace.client_secret,
389            bearer_token=self.workspace.bearer_token,
390        )
391        if state_response.get("stateType") == "not_set":
392            return None
393        return state_response.get("streamState", [])
394
395    def dump_raw_state(self) -> dict[str, Any]:
396        """Dump the full raw state for this connection.
397
398        Returns the connection's sync state as a raw dictionary from the API.
399        The result includes stateType, connectionId, and all state data.
400
401        The output of this method can be passed directly to `import_raw_state()`
402        on the same or a different connection (connectionId is overridden on import).
403
404        Returns:
405            The full connection state as a dictionary.
406        """
407        return api_util.get_connection_state(
408            connection_id=self.connection_id,
409            api_root=self.workspace.api_root,
410            client_id=self.workspace.client_id,
411            client_secret=self.workspace.client_secret,
412            bearer_token=self.workspace.bearer_token,
413        )
414
415    def import_raw_state(
416        self,
417        connection_state_dict: dict[str, Any],
418    ) -> dict[str, Any]:
419        """Import (restore) the full raw state for this connection.
420
421        > ⚠️ **WARNING:** Modifying the state directly is not recommended and
422        > could result in broken connections, and/or incorrect sync behavior.
423
424        Replaces the entire connection state with the provided state blob.
425        Uses the safe variant that prevents updates while a sync is running (HTTP 423).
426
427        This is the counterpart to `dump_raw_state()` for backup/restore workflows.
428        The `connectionId` in the blob is always overridden with this connection's
429        ID, making state blobs portable across connections.
430
431        Args:
432            connection_state_dict: The full connection state dict to import. Must include:
433                - stateType: "global", "stream", or "legacy"
434                - One of: state (legacy), streamState (stream), globalState (global)
435
436        Returns:
437            The updated connection state as a dictionary.
438
439        Raises:
440            AirbyteConnectionSyncActiveError: If a sync is currently running on this
441                connection (HTTP 423). Wait for the sync to complete before retrying.
442        """
443        return api_util.replace_connection_state(
444            connection_id=self.connection_id,
445            connection_state_dict=connection_state_dict,
446            api_root=self.workspace.api_root,
447            client_id=self.workspace.client_id,
448            client_secret=self.workspace.client_secret,
449            bearer_token=self.workspace.bearer_token,
450        )
451
452    def get_stream_state(
453        self,
454        stream_name: str,
455        stream_namespace: str | None = None,
456    ) -> dict[str, Any] | None:
457        """Get the state blob for a single stream within this connection.
458
459        Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}),
460        not the full connection state envelope.
461
462        This is compatible with `stream`-type state and stream-level entries
463        within a `global`-type state. It is not compatible with `legacy` state.
464        To get or set the entire connection-level state artifact, use
465        `dump_raw_state` and `import_raw_state` instead.
466
467        Args:
468            stream_name: The name of the stream to get state for.
469            stream_namespace: The source-side stream namespace. This refers to the
470                namespace from the source (e.g., database schema), not any destination
471                namespace override set in connection advanced settings.
472
473        Returns:
474            The stream's state blob as a dictionary, or None if the stream is not found.
475        """
476        state_data = self.dump_raw_state()
477        result = ConnectionStateResponse(**state_data)
478
479        streams = _get_stream_list(result)
480        matching = [s for s in streams if _match_stream(s, stream_name, stream_namespace)]
481
482        if not matching:
483            available = [s.stream_descriptor.name for s in streams]
484            logger.warning(
485                "Stream '%s' not found in connection state for connection '%s'. "
486                "Available streams: %s",
487                stream_name,
488                self.connection_id,
489                available,
490            )
491            return None
492
493        return matching[0].stream_state
494
495    def set_stream_state(
496        self,
497        stream_name: str,
498        state_blob_dict: dict[str, Any],
499        stream_namespace: str | None = None,
500    ) -> None:
501        """Set the state for a single stream within this connection.
502
503        Fetches the current full state, replaces only the specified stream's state,
504        then sends the full updated state back to the API. If the stream does not
505        exist in the current state, it is appended.
506
507        This is compatible with `stream`-type state and stream-level entries
508        within a `global`-type state. It is not compatible with `legacy` state.
509        To get or set the entire connection-level state artifact, use
510        `dump_raw_state` and `import_raw_state` instead.
511
512        Uses the safe variant that prevents updates while a sync is running (HTTP 423).
513
514        Args:
515            stream_name: The name of the stream to update state for.
516            state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}).
517            stream_namespace: The source-side stream namespace. This refers to the
518                namespace from the source (e.g., database schema), not any destination
519                namespace override set in connection advanced settings.
520
521        Raises:
522            PyAirbyteInputError: If the connection state type is not supported for
523                stream-level operations (not_set, legacy).
524            AirbyteConnectionSyncActiveError: If a sync is currently running on this
525                connection (HTTP 423). Wait for the sync to complete before retrying.
526        """
527        state_data = self.dump_raw_state()
528        current = ConnectionStateResponse(**state_data)
529
530        if current.state_type == "not_set":
531            raise PyAirbyteInputError(
532                message="Cannot set stream state: connection has no existing state.",
533                context={"connection_id": self.connection_id},
534            )
535
536        if current.state_type == "legacy":
537            raise PyAirbyteInputError(
538                message="Cannot set stream state on a legacy-type connection state.",
539                context={"connection_id": self.connection_id},
540            )
541
542        new_stream_entry = {
543            "streamDescriptor": {
544                "name": stream_name,
545                **(
546                    {
547                        "namespace": stream_namespace,
548                    }
549                    if stream_namespace
550                    else {}
551                ),
552            },
553            "streamState": state_blob_dict,
554        }
555
556        raw_streams: list[dict[str, Any]]
557        if current.state_type == "stream":
558            raw_streams = state_data.get("streamState", [])
559        elif current.state_type == "global":
560            raw_streams = state_data.get("globalState", {}).get("streamStates", [])
561        else:
562            raw_streams = []
563
564        streams = _get_stream_list(current)
565        found = False
566        updated_streams_raw: list[dict[str, Any]] = []
567        for raw_s, parsed_s in zip(raw_streams, streams, strict=False):
568            if _match_stream(parsed_s, stream_name, stream_namespace):
569                updated_streams_raw.append(new_stream_entry)
570                found = True
571            else:
572                updated_streams_raw.append(raw_s)
573
574        if not found:
575            updated_streams_raw.append(new_stream_entry)
576
577        full_state: dict[str, Any] = {
578            **state_data,
579        }
580
581        if current.state_type == "stream":
582            full_state["streamState"] = updated_streams_raw
583        elif current.state_type == "global":
584            original_global = state_data.get("globalState", {})
585            full_state["globalState"] = {
586                **original_global,
587                "streamStates": updated_streams_raw,
588            }
589
590        self.import_raw_state(full_state)
591
592    @deprecated("Use 'dump_raw_catalog()' instead.")
593    def get_catalog_artifact(self) -> dict[str, Any] | None:
594        """Get the configured catalog for this connection.
595
596        Returns the full configured catalog (syncCatalog) for this connection,
597        including stream schemas, sync modes, cursor fields, and primary keys.
598
599        Uses the Config API endpoint: POST /v1/web_backend/connections/get
600
601        Returns:
602            Dictionary containing the configured catalog, or `None` if not found.
603        """
604        return self.dump_raw_catalog()
605
606    def dump_raw_catalog(self) -> dict[str, Any] | None:
607        """Dump the full configured catalog (syncCatalog) for this connection.
608
609        Returns the raw catalog dict as returned by the API, including stream
610        schemas, sync modes, cursor fields, and primary keys.
611
612        The returned dict can be passed to `import_raw_catalog()` on this or
613        another connection to restore or clone the catalog configuration.
614
615        Returns:
616            Dictionary containing the configured catalog, or `None` if not found.
617        """
618        connection_response = api_util.get_connection_catalog(
619            connection_id=self.connection_id,
620            api_root=self.workspace.api_root,
621            client_id=self.workspace.client_id,
622            client_secret=self.workspace.client_secret,
623            bearer_token=self.workspace.bearer_token,
624        )
625        return connection_response.get("syncCatalog")
626
627    def import_raw_catalog(self, catalog: dict[str, Any]) -> None:
628        """Replace the configured catalog for this connection.
629
630        > ⚠️ **WARNING:** Modifying the catalog directly is not recommended and
631        > could result in broken connections, and/or incorrect sync behavior.
632
633        Accepts a configured catalog dict and replaces the connection's entire
634        catalog with it. All other connection settings remain unchanged.
635
636        The catalog shape should match the output of `dump_raw_catalog()`:
637        `{"streams": [{"stream": {...}, "config": {...}}, ...]}`.
638
639        Args:
640            catalog: The configured catalog dict to set.
641        """
642        api_util.replace_connection_catalog(
643            connection_id=self.connection_id,
644            configured_catalog_dict=catalog,
645            api_root=self.workspace.api_root,
646            client_id=self.workspace.client_id,
647            client_secret=self.workspace.client_secret,
648            bearer_token=self.workspace.bearer_token,
649        )
650
651    def rename(self, name: str) -> CloudConnection:
652        """Rename the connection.
653
654        Args:
655            name: New name for the connection
656
657        Returns:
658            Updated CloudConnection object with refreshed info
659        """
660        updated_response = api_util.patch_connection(
661            connection_id=self.connection_id,
662            api_root=self.workspace.api_root,
663            client_id=self.workspace.client_id,
664            client_secret=self.workspace.client_secret,
665            bearer_token=self.workspace.bearer_token,
666            name=name,
667        )
668        self._connection_info = updated_response
669        return self
670
671    def set_table_prefix(self, prefix: str) -> CloudConnection:
672        """Set the table prefix for the connection.
673
674        Args:
675            prefix: New table prefix to use when syncing to the destination
676
677        Returns:
678            Updated CloudConnection object with refreshed info
679        """
680        updated_response = api_util.patch_connection(
681            connection_id=self.connection_id,
682            api_root=self.workspace.api_root,
683            client_id=self.workspace.client_id,
684            client_secret=self.workspace.client_secret,
685            bearer_token=self.workspace.bearer_token,
686            prefix=prefix,
687        )
688        self._connection_info = updated_response
689        return self
690
691    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
692        """Set the selected streams for the connection.
693
694        This is a destructive operation that can break existing connections if the
695        stream selection is changed incorrectly. Use with caution.
696
697        Args:
698            stream_names: List of stream names to sync
699
700        Returns:
701            Updated CloudConnection object with refreshed info
702        """
703        configurations = api_util.build_stream_configurations(stream_names)
704
705        updated_response = api_util.patch_connection(
706            connection_id=self.connection_id,
707            api_root=self.workspace.api_root,
708            client_id=self.workspace.client_id,
709            client_secret=self.workspace.client_secret,
710            bearer_token=self.workspace.bearer_token,
711            configurations=configurations,
712        )
713        self._connection_info = updated_response
714        return self
715
716    # Enable/Disable
717
718    @property
719    def enabled(self) -> bool:
720        """Get the current enabled status of the connection.
721
722        This property always fetches fresh data from the API to ensure accuracy,
723        as another process or user may have toggled the setting.
724
725        Returns:
726            True if the connection status is 'active', False otherwise.
727        """
728        connection_info = self._fetch_connection_info(force_refresh=True)
729        return connection_info.status == api_util.models.ConnectionStatusEnum.ACTIVE
730
731    @enabled.setter
732    def enabled(self, value: bool) -> None:
733        """Set the enabled status of the connection.
734
735        Args:
736            value: True to enable (set status to 'active'), False to disable
737                (set status to 'inactive').
738        """
739        self.set_enabled(enabled=value)
740
741    def set_enabled(
742        self,
743        *,
744        enabled: bool,
745        ignore_noop: bool = True,
746    ) -> None:
747        """Set the enabled status of the connection.
748
749        Args:
750            enabled: True to enable (set status to 'active'), False to disable
751                (set status to 'inactive').
752            ignore_noop: If True (default), silently return if the connection is already
753                in the requested state. If False, raise ValueError when the requested
754                state matches the current state.
755
756        Raises:
757            ValueError: If ignore_noop is False and the connection is already in the
758                requested state.
759        """
760        # Always fetch fresh data to check current status
761        connection_info = self._fetch_connection_info(force_refresh=True)
762        current_status = connection_info.status
763        desired_status = (
764            api_util.models.ConnectionStatusEnum.ACTIVE
765            if enabled
766            else api_util.models.ConnectionStatusEnum.INACTIVE
767        )
768
769        if current_status == desired_status:
770            if ignore_noop:
771                return
772            raise ValueError(
773                f"Connection is already {'enabled' if enabled else 'disabled'}. "
774                f"Current status: {current_status}"
775            )
776
777        updated_response = api_util.patch_connection(
778            connection_id=self.connection_id,
779            api_root=self.workspace.api_root,
780            client_id=self.workspace.client_id,
781            client_secret=self.workspace.client_secret,
782            bearer_token=self.workspace.bearer_token,
783            status=desired_status,
784        )
785        self._connection_info = updated_response
786
787    # Scheduling
788
789    def set_schedule(
790        self,
791        cron_expression: str,
792    ) -> None:
793        """Set a cron schedule for the connection.
794
795        Args:
796            cron_expression: A cron expression defining when syncs should run.
797
798        Examples:
799                - "0 0 * * *" - Daily at midnight UTC
800                - "0 */6 * * *" - Every 6 hours
801                - "0 0 * * 0" - Weekly on Sunday at midnight UTC
802        """
803        schedule = api_util.models.AirbyteAPIConnectionSchedule(
804            schedule_type=api_util.models.ScheduleTypeEnum.CRON,
805            cron_expression=cron_expression,
806        )
807        updated_response = api_util.patch_connection(
808            connection_id=self.connection_id,
809            api_root=self.workspace.api_root,
810            client_id=self.workspace.client_id,
811            client_secret=self.workspace.client_secret,
812            bearer_token=self.workspace.bearer_token,
813            schedule=schedule,
814        )
815        self._connection_info = updated_response
816
817    def set_manual_schedule(self) -> None:
818        """Set the connection to manual scheduling.
819
820        Disables automatic syncs. Syncs will only run when manually triggered.
821        """
822        schedule = api_util.models.AirbyteAPIConnectionSchedule(
823            schedule_type=api_util.models.ScheduleTypeEnum.MANUAL,
824        )
825        updated_response = api_util.patch_connection(
826            connection_id=self.connection_id,
827            api_root=self.workspace.api_root,
828            client_id=self.workspace.client_id,
829            client_secret=self.workspace.client_secret,
830            bearer_token=self.workspace.bearer_token,
831            schedule=schedule,
832        )
833        self._connection_info = updated_response
834
835    # Deletions
836
837    def permanently_delete(
838        self,
839        *,
840        cascade_delete_source: bool = False,
841        cascade_delete_destination: bool = False,
842    ) -> None:
843        """Delete the connection.
844
845        Args:
846            cascade_delete_source: Whether to also delete the source.
847            cascade_delete_destination: Whether to also delete the destination.
848        """
849        self.workspace.permanently_delete_connection(self)
850
851        if cascade_delete_source:
852            self.workspace.permanently_delete_source(self.source_id)
853
854        if cascade_delete_destination:
855            self.workspace.permanently_delete_destination(self.destination_id)
logger = <Logger airbyte.cloud.connections (INFO)>
class CloudConnection:
 32class CloudConnection:  # noqa: PLR0904  # Too many public methods
 33    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
 34
 35    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
 36    """
 37
 38    def __init__(
 39        self,
 40        workspace: CloudWorkspace,
 41        connection_id: str,
 42        source: str | None = None,
 43        destination: str | None = None,
 44    ) -> None:
 45        """It is not recommended to create a `CloudConnection` object directly.
 46
 47        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
 48        """
 49        self.connection_id = connection_id
 50        """The ID of the connection."""
 51
 52        self.workspace = workspace
 53        """The workspace that the connection belongs to."""
 54
 55        self._source_id = source
 56        """The ID of the source."""
 57
 58        self._destination_id = destination
 59        """The ID of the destination."""
 60
 61        self._connection_info: ConnectionResponse | None = None
 62        """The connection info object. (Cached.)"""
 63
 64        self._cloud_source_object: CloudSource | None = None
 65        """The source object. (Cached.)"""
 66
 67        self._cloud_destination_object: CloudDestination | None = None
 68        """The destination object. (Cached.)"""
 69
 70    def _fetch_connection_info(
 71        self,
 72        *,
 73        force_refresh: bool = False,
 74        verify: bool = True,
 75    ) -> ConnectionResponse:
 76        """Fetch and cache connection info from the API.
 77
 78        By default, this method will only fetch from the API if connection info is not
 79        already cached. It also verifies that the connection belongs to the expected
 80        workspace unless verification is explicitly disabled.
 81
 82        Args:
 83            force_refresh: If True, always fetch from the API even if cached.
 84                If False (default), only fetch if not already cached.
 85            verify: If True (default), verify that the connection is valid (e.g., that
 86                the workspace_id matches this object's workspace). Raises an error if
 87                validation fails.
 88
 89        Returns:
 90            The ConnectionResponse from the API.
 91
 92        Raises:
 93            AirbyteWorkspaceMismatchError: If verify is True and the connection's
 94                workspace_id doesn't match the expected workspace.
 95            AirbyteMissingResourceError: If the connection doesn't exist.
 96        """
 97        if not force_refresh and self._connection_info is not None:
 98            # Use cached info, but still verify if requested
 99            if verify:
100                self._verify_workspace_match(self._connection_info)
101            return self._connection_info
102
103        # Fetch from API
104        connection_info = api_util.get_connection(
105            workspace_id=self.workspace.workspace_id,
106            connection_id=self.connection_id,
107            api_root=self.workspace.api_root,
108            client_id=self.workspace.client_id,
109            client_secret=self.workspace.client_secret,
110            bearer_token=self.workspace.bearer_token,
111        )
112
113        # Cache the result first (before verification may raise)
114        self._connection_info = connection_info
115
116        # Verify if requested
117        if verify:
118            self._verify_workspace_match(connection_info)
119
120        return connection_info
121
122    def _verify_workspace_match(self, connection_info: ConnectionResponse) -> None:
123        """Verify that the connection belongs to the expected workspace.
124
125        Raises:
126            AirbyteWorkspaceMismatchError: If the workspace IDs don't match.
127        """
128        if connection_info.workspace_id != self.workspace.workspace_id:
129            raise AirbyteWorkspaceMismatchError(
130                resource_type="connection",
131                resource_id=self.connection_id,
132                workspace=self.workspace,
133                expected_workspace_id=self.workspace.workspace_id,
134                actual_workspace_id=connection_info.workspace_id,
135                message=(
136                    f"Connection '{self.connection_id}' belongs to workspace "
137                    f"'{connection_info.workspace_id}', not '{self.workspace.workspace_id}'."
138                ),
139            )
140
141    def check_is_valid(self) -> bool:
142        """Check if this connection exists and belongs to the expected workspace.
143
144        This method fetches connection info from the API (if not already cached) and
145        verifies that the connection's workspace_id matches the workspace associated
146        with this CloudConnection object.
147
148        Returns:
149            True if the connection exists and belongs to the expected workspace.
150
151        Raises:
152            AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
153            AirbyteMissingResourceError: If the connection doesn't exist.
154        """
155        self._fetch_connection_info(force_refresh=False, verify=True)
156        return True
157
158    @classmethod
159    def _from_connection_response(
160        cls,
161        workspace: CloudWorkspace,
162        connection_response: ConnectionResponse,
163    ) -> CloudConnection:
164        """Create a CloudConnection from a ConnectionResponse."""
165        result = cls(
166            workspace=workspace,
167            connection_id=connection_response.connection_id,
168            source=connection_response.source_id,
169            destination=connection_response.destination_id,
170        )
171        result._connection_info = connection_response  # noqa: SLF001 # Accessing Non-Public API
172        return result
173
174    # Properties
175
176    @property
177    def name(self) -> str | None:
178        """Get the display name of the connection, if available.
179
180        E.g. "My Postgres to Snowflake", not the connection ID.
181        """
182        if not self._connection_info:
183            self._connection_info = self._fetch_connection_info()
184
185        return self._connection_info.name
186
187    @property
188    def source_id(self) -> str:
189        """The ID of the source."""
190        if not self._source_id:
191            if not self._connection_info:
192                self._connection_info = self._fetch_connection_info()
193
194            self._source_id = self._connection_info.source_id
195
196        return self._source_id
197
198    @property
199    def source(self) -> CloudSource:
200        """Get the source object."""
201        if self._cloud_source_object:
202            return self._cloud_source_object
203
204        self._cloud_source_object = CloudSource(
205            workspace=self.workspace,
206            connector_id=self.source_id,
207        )
208        return self._cloud_source_object
209
210    @property
211    def destination_id(self) -> str:
212        """The ID of the destination."""
213        if not self._destination_id:
214            if not self._connection_info:
215                self._connection_info = self._fetch_connection_info()
216
217            self._destination_id = self._connection_info.destination_id
218
219        return self._destination_id
220
221    @property
222    def destination(self) -> CloudDestination:
223        """Get the destination object."""
224        if self._cloud_destination_object:
225            return self._cloud_destination_object
226
227        self._cloud_destination_object = CloudDestination(
228            workspace=self.workspace,
229            connector_id=self.destination_id,
230        )
231        return self._cloud_destination_object
232
233    @property
234    def stream_names(self) -> list[str]:
235        """The stream names."""
236        if not self._connection_info:
237            self._connection_info = self._fetch_connection_info()
238
239        return [stream.name for stream in self._connection_info.configurations.streams or []]
240
241    @property
242    def table_prefix(self) -> str:
243        """The table prefix."""
244        if not self._connection_info:
245            self._connection_info = self._fetch_connection_info()
246
247        return self._connection_info.prefix or ""
248
249    @property
250    def connection_url(self) -> str | None:
251        """The web URL to the connection."""
252        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
253
254    @property
255    def job_history_url(self) -> str | None:
256        """The URL to the job history for the connection."""
257        return f"{self.connection_url}/timeline"
258
259    # Run Sync
260
261    def run_sync(
262        self,
263        *,
264        wait: bool = True,
265        wait_timeout: int = 300,
266    ) -> SyncResult:
267        """Run a sync."""
268        connection_response = api_util.run_connection(
269            connection_id=self.connection_id,
270            api_root=self.workspace.api_root,
271            workspace_id=self.workspace.workspace_id,
272            client_id=self.workspace.client_id,
273            client_secret=self.workspace.client_secret,
274            bearer_token=self.workspace.bearer_token,
275        )
276        sync_result = SyncResult(
277            workspace=self.workspace,
278            connection=self,
279            job_id=connection_response.job_id,
280        )
281
282        if wait:
283            sync_result.wait_for_completion(
284                wait_timeout=wait_timeout,
285                raise_failure=True,
286                raise_timeout=True,
287            )
288
289        return sync_result
290
291    def __repr__(self) -> str:
292        """String representation of the connection."""
293        return (
294            f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, "
295            f"destination_id={self.destination_id}, connection_url={self.connection_url})"
296        )
297
298    # Logs
299
300    def get_previous_sync_logs(
301        self,
302        *,
303        limit: int = 20,
304        offset: int | None = None,
305        from_tail: bool = True,
306        job_type: JobTypeEnum | None = None,
307    ) -> list[SyncResult]:
308        """Get previous sync jobs for a connection with pagination support.
309
310        Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
311        rows_synced, start_time). Full log text can be fetched lazily via
312        `SyncResult.get_full_log_text()`.
313
314        Args:
315            limit: Maximum number of jobs to return. Defaults to 20.
316            offset: Number of jobs to skip from the beginning. Defaults to None (0).
317            from_tail: If True, returns jobs ordered newest-first (createdAt DESC).
318                If False, returns jobs ordered oldest-first (createdAt ASC).
319                Defaults to True.
320            job_type: Filter by job type (e.g., JobTypeEnum.SYNC, JobTypeEnum.REFRESH).
321                If not specified, defaults to sync and reset jobs only (API default behavior).
322
323        Returns:
324            A list of SyncResult objects representing the sync jobs.
325        """
326        order_by = (
327            api_util.JOB_ORDER_BY_CREATED_AT_DESC
328            if from_tail
329            else api_util.JOB_ORDER_BY_CREATED_AT_ASC
330        )
331        sync_logs: list[JobResponse] = api_util.get_job_logs(
332            connection_id=self.connection_id,
333            api_root=self.workspace.api_root,
334            workspace_id=self.workspace.workspace_id,
335            limit=limit,
336            offset=offset,
337            order_by=order_by,
338            job_type=job_type,
339            client_id=self.workspace.client_id,
340            client_secret=self.workspace.client_secret,
341            bearer_token=self.workspace.bearer_token,
342        )
343        return [
344            SyncResult(
345                workspace=self.workspace,
346                connection=self,
347                job_id=sync_log.job_id,
348                _latest_job_info=sync_log,
349            )
350            for sync_log in sync_logs
351        ]
352
353    def get_sync_result(
354        self,
355        job_id: int | None = None,
356    ) -> SyncResult | None:
357        """Get the sync result for the connection.
358
359        If `job_id` is not provided, the most recent sync job will be used.
360
361        Returns `None` if job_id is omitted and no previous jobs are found.
362        """
363        if job_id is None:
364            # Get the most recent sync job
365            results = self.get_previous_sync_logs(
366                limit=1,
367            )
368            if results:
369                return results[0]
370
371            return None
372
373        # Get the sync job by ID (lazy loaded)
374        return SyncResult(
375            workspace=self.workspace,
376            connection=self,
377            job_id=job_id,
378        )
379
380    # Artifacts
381
382    @deprecated("Use 'dump_raw_state()' instead.")
383    def get_state_artifacts(self) -> list[dict[str, Any]] | None:
384        """Deprecated. Use `dump_raw_state()` instead."""
385        state_response = api_util.get_connection_state(
386            connection_id=self.connection_id,
387            api_root=self.workspace.api_root,
388            client_id=self.workspace.client_id,
389            client_secret=self.workspace.client_secret,
390            bearer_token=self.workspace.bearer_token,
391        )
392        if state_response.get("stateType") == "not_set":
393            return None
394        return state_response.get("streamState", [])
395
396    def dump_raw_state(self) -> dict[str, Any]:
397        """Dump the full raw state for this connection.
398
399        Returns the connection's sync state as a raw dictionary from the API.
400        The result includes stateType, connectionId, and all state data.
401
402        The output of this method can be passed directly to `import_raw_state()`
403        on the same or a different connection (connectionId is overridden on import).
404
405        Returns:
406            The full connection state as a dictionary.
407        """
408        return api_util.get_connection_state(
409            connection_id=self.connection_id,
410            api_root=self.workspace.api_root,
411            client_id=self.workspace.client_id,
412            client_secret=self.workspace.client_secret,
413            bearer_token=self.workspace.bearer_token,
414        )
415
416    def import_raw_state(
417        self,
418        connection_state_dict: dict[str, Any],
419    ) -> dict[str, Any]:
420        """Import (restore) the full raw state for this connection.
421
422        > ⚠️ **WARNING:** Modifying the state directly is not recommended and
423        > could result in broken connections, and/or incorrect sync behavior.
424
425        Replaces the entire connection state with the provided state blob.
426        Uses the safe variant that prevents updates while a sync is running (HTTP 423).
427
428        This is the counterpart to `dump_raw_state()` for backup/restore workflows.
429        The `connectionId` in the blob is always overridden with this connection's
430        ID, making state blobs portable across connections.
431
432        Args:
433            connection_state_dict: The full connection state dict to import. Must include:
434                - stateType: "global", "stream", or "legacy"
435                - One of: state (legacy), streamState (stream), globalState (global)
436
437        Returns:
438            The updated connection state as a dictionary.
439
440        Raises:
441            AirbyteConnectionSyncActiveError: If a sync is currently running on this
442                connection (HTTP 423). Wait for the sync to complete before retrying.
443        """
444        return api_util.replace_connection_state(
445            connection_id=self.connection_id,
446            connection_state_dict=connection_state_dict,
447            api_root=self.workspace.api_root,
448            client_id=self.workspace.client_id,
449            client_secret=self.workspace.client_secret,
450            bearer_token=self.workspace.bearer_token,
451        )
452
453    def get_stream_state(
454        self,
455        stream_name: str,
456        stream_namespace: str | None = None,
457    ) -> dict[str, Any] | None:
458        """Get the state blob for a single stream within this connection.
459
460        Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}),
461        not the full connection state envelope.
462
463        This is compatible with `stream`-type state and stream-level entries
464        within a `global`-type state. It is not compatible with `legacy` state.
465        To get or set the entire connection-level state artifact, use
466        `dump_raw_state` and `import_raw_state` instead.
467
468        Args:
469            stream_name: The name of the stream to get state for.
470            stream_namespace: The source-side stream namespace. This refers to the
471                namespace from the source (e.g., database schema), not any destination
472                namespace override set in connection advanced settings.
473
474        Returns:
475            The stream's state blob as a dictionary, or None if the stream is not found.
476        """
477        state_data = self.dump_raw_state()
478        result = ConnectionStateResponse(**state_data)
479
480        streams = _get_stream_list(result)
481        matching = [s for s in streams if _match_stream(s, stream_name, stream_namespace)]
482
483        if not matching:
484            available = [s.stream_descriptor.name for s in streams]
485            logger.warning(
486                "Stream '%s' not found in connection state for connection '%s'. "
487                "Available streams: %s",
488                stream_name,
489                self.connection_id,
490                available,
491            )
492            return None
493
494        return matching[0].stream_state
495
496    def set_stream_state(
497        self,
498        stream_name: str,
499        state_blob_dict: dict[str, Any],
500        stream_namespace: str | None = None,
501    ) -> None:
502        """Set the state for a single stream within this connection.
503
504        Fetches the current full state, replaces only the specified stream's state,
505        then sends the full updated state back to the API. If the stream does not
506        exist in the current state, it is appended.
507
508        This is compatible with `stream`-type state and stream-level entries
509        within a `global`-type state. It is not compatible with `legacy` state.
510        To get or set the entire connection-level state artifact, use
511        `dump_raw_state` and `import_raw_state` instead.
512
513        Uses the safe variant that prevents updates while a sync is running (HTTP 423).
514
515        Args:
516            stream_name: The name of the stream to update state for.
517            state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}).
518            stream_namespace: The source-side stream namespace. This refers to the
519                namespace from the source (e.g., database schema), not any destination
520                namespace override set in connection advanced settings.
521
522        Raises:
523            PyAirbyteInputError: If the connection state type is not supported for
524                stream-level operations (not_set, legacy).
525            AirbyteConnectionSyncActiveError: If a sync is currently running on this
526                connection (HTTP 423). Wait for the sync to complete before retrying.
527        """
528        state_data = self.dump_raw_state()
529        current = ConnectionStateResponse(**state_data)
530
531        if current.state_type == "not_set":
532            raise PyAirbyteInputError(
533                message="Cannot set stream state: connection has no existing state.",
534                context={"connection_id": self.connection_id},
535            )
536
537        if current.state_type == "legacy":
538            raise PyAirbyteInputError(
539                message="Cannot set stream state on a legacy-type connection state.",
540                context={"connection_id": self.connection_id},
541            )
542
543        new_stream_entry = {
544            "streamDescriptor": {
545                "name": stream_name,
546                **(
547                    {
548                        "namespace": stream_namespace,
549                    }
550                    if stream_namespace
551                    else {}
552                ),
553            },
554            "streamState": state_blob_dict,
555        }
556
557        raw_streams: list[dict[str, Any]]
558        if current.state_type == "stream":
559            raw_streams = state_data.get("streamState", [])
560        elif current.state_type == "global":
561            raw_streams = state_data.get("globalState", {}).get("streamStates", [])
562        else:
563            raw_streams = []
564
565        streams = _get_stream_list(current)
566        found = False
567        updated_streams_raw: list[dict[str, Any]] = []
568        for raw_s, parsed_s in zip(raw_streams, streams, strict=False):
569            if _match_stream(parsed_s, stream_name, stream_namespace):
570                updated_streams_raw.append(new_stream_entry)
571                found = True
572            else:
573                updated_streams_raw.append(raw_s)
574
575        if not found:
576            updated_streams_raw.append(new_stream_entry)
577
578        full_state: dict[str, Any] = {
579            **state_data,
580        }
581
582        if current.state_type == "stream":
583            full_state["streamState"] = updated_streams_raw
584        elif current.state_type == "global":
585            original_global = state_data.get("globalState", {})
586            full_state["globalState"] = {
587                **original_global,
588                "streamStates": updated_streams_raw,
589            }
590
591        self.import_raw_state(full_state)
592
593    @deprecated("Use 'dump_raw_catalog()' instead.")
594    def get_catalog_artifact(self) -> dict[str, Any] | None:
595        """Get the configured catalog for this connection.
596
597        Returns the full configured catalog (syncCatalog) for this connection,
598        including stream schemas, sync modes, cursor fields, and primary keys.
599
600        Uses the Config API endpoint: POST /v1/web_backend/connections/get
601
602        Returns:
603            Dictionary containing the configured catalog, or `None` if not found.
604        """
605        return self.dump_raw_catalog()
606
607    def dump_raw_catalog(self) -> dict[str, Any] | None:
608        """Dump the full configured catalog (syncCatalog) for this connection.
609
610        Returns the raw catalog dict as returned by the API, including stream
611        schemas, sync modes, cursor fields, and primary keys.
612
613        The returned dict can be passed to `import_raw_catalog()` on this or
614        another connection to restore or clone the catalog configuration.
615
616        Returns:
617            Dictionary containing the configured catalog, or `None` if not found.
618        """
619        connection_response = api_util.get_connection_catalog(
620            connection_id=self.connection_id,
621            api_root=self.workspace.api_root,
622            client_id=self.workspace.client_id,
623            client_secret=self.workspace.client_secret,
624            bearer_token=self.workspace.bearer_token,
625        )
626        return connection_response.get("syncCatalog")
627
628    def import_raw_catalog(self, catalog: dict[str, Any]) -> None:
629        """Replace the configured catalog for this connection.
630
631        > ⚠️ **WARNING:** Modifying the catalog directly is not recommended and
632        > could result in broken connections, and/or incorrect sync behavior.
633
634        Accepts a configured catalog dict and replaces the connection's entire
635        catalog with it. All other connection settings remain unchanged.
636
637        The catalog shape should match the output of `dump_raw_catalog()`:
638        `{"streams": [{"stream": {...}, "config": {...}}, ...]}`.
639
640        Args:
641            catalog: The configured catalog dict to set.
642        """
643        api_util.replace_connection_catalog(
644            connection_id=self.connection_id,
645            configured_catalog_dict=catalog,
646            api_root=self.workspace.api_root,
647            client_id=self.workspace.client_id,
648            client_secret=self.workspace.client_secret,
649            bearer_token=self.workspace.bearer_token,
650        )
651
652    def rename(self, name: str) -> CloudConnection:
653        """Rename the connection.
654
655        Args:
656            name: New name for the connection
657
658        Returns:
659            Updated CloudConnection object with refreshed info
660        """
661        updated_response = api_util.patch_connection(
662            connection_id=self.connection_id,
663            api_root=self.workspace.api_root,
664            client_id=self.workspace.client_id,
665            client_secret=self.workspace.client_secret,
666            bearer_token=self.workspace.bearer_token,
667            name=name,
668        )
669        self._connection_info = updated_response
670        return self
671
672    def set_table_prefix(self, prefix: str) -> CloudConnection:
673        """Set the table prefix for the connection.
674
675        Args:
676            prefix: New table prefix to use when syncing to the destination
677
678        Returns:
679            Updated CloudConnection object with refreshed info
680        """
681        updated_response = api_util.patch_connection(
682            connection_id=self.connection_id,
683            api_root=self.workspace.api_root,
684            client_id=self.workspace.client_id,
685            client_secret=self.workspace.client_secret,
686            bearer_token=self.workspace.bearer_token,
687            prefix=prefix,
688        )
689        self._connection_info = updated_response
690        return self
691
692    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
693        """Set the selected streams for the connection.
694
695        This is a destructive operation that can break existing connections if the
696        stream selection is changed incorrectly. Use with caution.
697
698        Args:
699            stream_names: List of stream names to sync
700
701        Returns:
702            Updated CloudConnection object with refreshed info
703        """
704        configurations = api_util.build_stream_configurations(stream_names)
705
706        updated_response = api_util.patch_connection(
707            connection_id=self.connection_id,
708            api_root=self.workspace.api_root,
709            client_id=self.workspace.client_id,
710            client_secret=self.workspace.client_secret,
711            bearer_token=self.workspace.bearer_token,
712            configurations=configurations,
713        )
714        self._connection_info = updated_response
715        return self
716
717    # Enable/Disable
718
719    @property
720    def enabled(self) -> bool:
721        """Get the current enabled status of the connection.
722
723        This property always fetches fresh data from the API to ensure accuracy,
724        as another process or user may have toggled the setting.
725
726        Returns:
727            True if the connection status is 'active', False otherwise.
728        """
729        connection_info = self._fetch_connection_info(force_refresh=True)
730        return connection_info.status == api_util.models.ConnectionStatusEnum.ACTIVE
731
732    @enabled.setter
733    def enabled(self, value: bool) -> None:
734        """Set the enabled status of the connection.
735
736        Args:
737            value: True to enable (set status to 'active'), False to disable
738                (set status to 'inactive').
739        """
740        self.set_enabled(enabled=value)
741
742    def set_enabled(
743        self,
744        *,
745        enabled: bool,
746        ignore_noop: bool = True,
747    ) -> None:
748        """Set the enabled status of the connection.
749
750        Args:
751            enabled: True to enable (set status to 'active'), False to disable
752                (set status to 'inactive').
753            ignore_noop: If True (default), silently return if the connection is already
754                in the requested state. If False, raise ValueError when the requested
755                state matches the current state.
756
757        Raises:
758            ValueError: If ignore_noop is False and the connection is already in the
759                requested state.
760        """
761        # Always fetch fresh data to check current status
762        connection_info = self._fetch_connection_info(force_refresh=True)
763        current_status = connection_info.status
764        desired_status = (
765            api_util.models.ConnectionStatusEnum.ACTIVE
766            if enabled
767            else api_util.models.ConnectionStatusEnum.INACTIVE
768        )
769
770        if current_status == desired_status:
771            if ignore_noop:
772                return
773            raise ValueError(
774                f"Connection is already {'enabled' if enabled else 'disabled'}. "
775                f"Current status: {current_status}"
776            )
777
778        updated_response = api_util.patch_connection(
779            connection_id=self.connection_id,
780            api_root=self.workspace.api_root,
781            client_id=self.workspace.client_id,
782            client_secret=self.workspace.client_secret,
783            bearer_token=self.workspace.bearer_token,
784            status=desired_status,
785        )
786        self._connection_info = updated_response
787
788    # Scheduling
789
790    def set_schedule(
791        self,
792        cron_expression: str,
793    ) -> None:
794        """Set a cron schedule for the connection.
795
796        Args:
797            cron_expression: A cron expression defining when syncs should run.
798
799        Examples:
800                - "0 0 * * *" - Daily at midnight UTC
801                - "0 */6 * * *" - Every 6 hours
802                - "0 0 * * 0" - Weekly on Sunday at midnight UTC
803        """
804        schedule = api_util.models.AirbyteAPIConnectionSchedule(
805            schedule_type=api_util.models.ScheduleTypeEnum.CRON,
806            cron_expression=cron_expression,
807        )
808        updated_response = api_util.patch_connection(
809            connection_id=self.connection_id,
810            api_root=self.workspace.api_root,
811            client_id=self.workspace.client_id,
812            client_secret=self.workspace.client_secret,
813            bearer_token=self.workspace.bearer_token,
814            schedule=schedule,
815        )
816        self._connection_info = updated_response
817
818    def set_manual_schedule(self) -> None:
819        """Set the connection to manual scheduling.
820
821        Disables automatic syncs. Syncs will only run when manually triggered.
822        """
823        schedule = api_util.models.AirbyteAPIConnectionSchedule(
824            schedule_type=api_util.models.ScheduleTypeEnum.MANUAL,
825        )
826        updated_response = api_util.patch_connection(
827            connection_id=self.connection_id,
828            api_root=self.workspace.api_root,
829            client_id=self.workspace.client_id,
830            client_secret=self.workspace.client_secret,
831            bearer_token=self.workspace.bearer_token,
832            schedule=schedule,
833        )
834        self._connection_info = updated_response
835
836    # Deletions
837
838    def permanently_delete(
839        self,
840        *,
841        cascade_delete_source: bool = False,
842        cascade_delete_destination: bool = False,
843    ) -> None:
844        """Delete the connection.
845
846        Args:
847            cascade_delete_source: Whether to also delete the source.
848            cascade_delete_destination: Whether to also delete the destination.
849        """
850        self.workspace.permanently_delete_connection(self)
851
852        if cascade_delete_source:
853            self.workspace.permanently_delete_source(self.source_id)
854
855        if cascade_delete_destination:
856            self.workspace.permanently_delete_destination(self.destination_id)

A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

You can use a connection object to run sync jobs, retrieve logs, and manage the connection.

CloudConnection(workspace: airbyte.cloud.CloudWorkspace, connection_id: str, source: str | None = None, destination: str | None = None)
38    def __init__(
39        self,
40        workspace: CloudWorkspace,
41        connection_id: str,
42        source: str | None = None,
43        destination: str | None = None,
44    ) -> None:
45        """It is not recommended to create a `CloudConnection` object directly.
46
47        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
48        """
49        self.connection_id = connection_id
50        """The ID of the connection."""
51
52        self.workspace = workspace
53        """The workspace that the connection belongs to."""
54
55        self._source_id = source
56        """The ID of the source."""
57
58        self._destination_id = destination
59        """The ID of the destination."""
60
61        self._connection_info: ConnectionResponse | None = None
62        """The connection info object. (Cached.)"""
63
64        self._cloud_source_object: CloudSource | None = None
65        """The source object. (Cached.)"""
66
67        self._cloud_destination_object: CloudDestination | None = None
68        """The destination object. (Cached.)"""

It is not recommended to create a CloudConnection object directly.

Instead, use CloudWorkspace.get_connection() to create a connection object.

connection_id

The ID of the connection.

workspace

The workspace that the connection belongs to.

def check_is_valid(self) -> bool:
141    def check_is_valid(self) -> bool:
142        """Check if this connection exists and belongs to the expected workspace.
143
144        This method fetches connection info from the API (if not already cached) and
145        verifies that the connection's workspace_id matches the workspace associated
146        with this CloudConnection object.
147
148        Returns:
149            True if the connection exists and belongs to the expected workspace.
150
151        Raises:
152            AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
153            AirbyteMissingResourceError: If the connection doesn't exist.
154        """
155        self._fetch_connection_info(force_refresh=False, verify=True)
156        return True

Check if this connection exists and belongs to the expected workspace.

This method fetches connection info from the API (if not already cached) and verifies that the connection's workspace_id matches the workspace associated with this CloudConnection object.

Returns:

True if the connection exists and belongs to the expected workspace.

Raises:
  • AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
  • AirbyteMissingResourceError: If the connection doesn't exist.
name: str | None
176    @property
177    def name(self) -> str | None:
178        """Get the display name of the connection, if available.
179
180        E.g. "My Postgres to Snowflake", not the connection ID.
181        """
182        if not self._connection_info:
183            self._connection_info = self._fetch_connection_info()
184
185        return self._connection_info.name

Get the display name of the connection, if available.

E.g. "My Postgres to Snowflake", not the connection ID.

source_id: str
187    @property
188    def source_id(self) -> str:
189        """The ID of the source."""
190        if not self._source_id:
191            if not self._connection_info:
192                self._connection_info = self._fetch_connection_info()
193
194            self._source_id = self._connection_info.source_id
195
196        return self._source_id

The ID of the source.

source: airbyte.cloud.connectors.CloudSource
198    @property
199    def source(self) -> CloudSource:
200        """Get the source object."""
201        if self._cloud_source_object:
202            return self._cloud_source_object
203
204        self._cloud_source_object = CloudSource(
205            workspace=self.workspace,
206            connector_id=self.source_id,
207        )
208        return self._cloud_source_object

Get the source object.

destination_id: str
210    @property
211    def destination_id(self) -> str:
212        """The ID of the destination."""
213        if not self._destination_id:
214            if not self._connection_info:
215                self._connection_info = self._fetch_connection_info()
216
217            self._destination_id = self._connection_info.destination_id
218
219        return self._destination_id

The ID of the destination.

destination: airbyte.cloud.connectors.CloudDestination
221    @property
222    def destination(self) -> CloudDestination:
223        """Get the destination object."""
224        if self._cloud_destination_object:
225            return self._cloud_destination_object
226
227        self._cloud_destination_object = CloudDestination(
228            workspace=self.workspace,
229            connector_id=self.destination_id,
230        )
231        return self._cloud_destination_object

Get the destination object.

stream_names: list[str]
233    @property
234    def stream_names(self) -> list[str]:
235        """The stream names."""
236        if not self._connection_info:
237            self._connection_info = self._fetch_connection_info()
238
239        return [stream.name for stream in self._connection_info.configurations.streams or []]

The stream names.

table_prefix: str
241    @property
242    def table_prefix(self) -> str:
243        """The table prefix."""
244        if not self._connection_info:
245            self._connection_info = self._fetch_connection_info()
246
247        return self._connection_info.prefix or ""

The table prefix.

connection_url: str | None
249    @property
250    def connection_url(self) -> str | None:
251        """The web URL to the connection."""
252        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

The web URL to the connection.

job_history_url: str | None
254    @property
255    def job_history_url(self) -> str | None:
256        """The URL to the job history for the connection."""
257        return f"{self.connection_url}/timeline"

The URL to the job history for the connection.

def run_sync( self, *, wait: bool = True, wait_timeout: int = 300) -> airbyte.cloud.SyncResult:
261    def run_sync(
262        self,
263        *,
264        wait: bool = True,
265        wait_timeout: int = 300,
266    ) -> SyncResult:
267        """Run a sync."""
268        connection_response = api_util.run_connection(
269            connection_id=self.connection_id,
270            api_root=self.workspace.api_root,
271            workspace_id=self.workspace.workspace_id,
272            client_id=self.workspace.client_id,
273            client_secret=self.workspace.client_secret,
274            bearer_token=self.workspace.bearer_token,
275        )
276        sync_result = SyncResult(
277            workspace=self.workspace,
278            connection=self,
279            job_id=connection_response.job_id,
280        )
281
282        if wait:
283            sync_result.wait_for_completion(
284                wait_timeout=wait_timeout,
285                raise_failure=True,
286                raise_timeout=True,
287            )
288
289        return sync_result

Run a sync.

def get_previous_sync_logs( self, *, limit: int = 20, offset: int | None = None, from_tail: bool = True, job_type: airbyte_api.models.jobtypeenum.JobTypeEnum | None = None) -> list[airbyte.cloud.SyncResult]:
300    def get_previous_sync_logs(
301        self,
302        *,
303        limit: int = 20,
304        offset: int | None = None,
305        from_tail: bool = True,
306        job_type: JobTypeEnum | None = None,
307    ) -> list[SyncResult]:
308        """Get previous sync jobs for a connection with pagination support.
309
310        Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
311        rows_synced, start_time). Full log text can be fetched lazily via
312        `SyncResult.get_full_log_text()`.
313
314        Args:
315            limit: Maximum number of jobs to return. Defaults to 20.
316            offset: Number of jobs to skip from the beginning. Defaults to None (0).
317            from_tail: If True, returns jobs ordered newest-first (createdAt DESC).
318                If False, returns jobs ordered oldest-first (createdAt ASC).
319                Defaults to True.
320            job_type: Filter by job type (e.g., JobTypeEnum.SYNC, JobTypeEnum.REFRESH).
321                If not specified, defaults to sync and reset jobs only (API default behavior).
322
323        Returns:
324            A list of SyncResult objects representing the sync jobs.
325        """
326        order_by = (
327            api_util.JOB_ORDER_BY_CREATED_AT_DESC
328            if from_tail
329            else api_util.JOB_ORDER_BY_CREATED_AT_ASC
330        )
331        sync_logs: list[JobResponse] = api_util.get_job_logs(
332            connection_id=self.connection_id,
333            api_root=self.workspace.api_root,
334            workspace_id=self.workspace.workspace_id,
335            limit=limit,
336            offset=offset,
337            order_by=order_by,
338            job_type=job_type,
339            client_id=self.workspace.client_id,
340            client_secret=self.workspace.client_secret,
341            bearer_token=self.workspace.bearer_token,
342        )
343        return [
344            SyncResult(
345                workspace=self.workspace,
346                connection=self,
347                job_id=sync_log.job_id,
348                _latest_job_info=sync_log,
349            )
350            for sync_log in sync_logs
351        ]

Get previous sync jobs for a connection with pagination support.

Returns SyncResult objects containing job metadata (job_id, status, bytes_synced, rows_synced, start_time). Full log text can be fetched lazily via SyncResult.get_full_log_text().

Arguments:
  • limit: Maximum number of jobs to return. Defaults to 20.
  • offset: Number of jobs to skip from the beginning. Defaults to None (0).
  • from_tail: If True, returns jobs ordered newest-first (createdAt DESC). If False, returns jobs ordered oldest-first (createdAt ASC). Defaults to True.
  • job_type: Filter by job type (e.g., JobTypeEnum.SYNC, JobTypeEnum.REFRESH). If not specified, defaults to sync and reset jobs only (API default behavior).
Returns:

A list of SyncResult objects representing the sync jobs.

def get_sync_result( self, job_id: int | None = None) -> airbyte.cloud.SyncResult | None:
353    def get_sync_result(
354        self,
355        job_id: int | None = None,
356    ) -> SyncResult | None:
357        """Get the sync result for the connection.
358
359        If `job_id` is not provided, the most recent sync job will be used.
360
361        Returns `None` if job_id is omitted and no previous jobs are found.
362        """
363        if job_id is None:
364            # Get the most recent sync job
365            results = self.get_previous_sync_logs(
366                limit=1,
367            )
368            if results:
369                return results[0]
370
371            return None
372
373        # Get the sync job by ID (lazy loaded)
374        return SyncResult(
375            workspace=self.workspace,
376            connection=self,
377            job_id=job_id,
378        )

Get the sync result for the connection.

If job_id is not provided, the most recent sync job will be used.

Returns None if job_id is omitted and no previous jobs are found.

@deprecated("Use 'dump_raw_state()' instead.")
def get_state_artifacts(self) -> list[dict[str, typing.Any]] | None:
382    @deprecated("Use 'dump_raw_state()' instead.")
383    def get_state_artifacts(self) -> list[dict[str, Any]] | None:
384        """Deprecated. Use `dump_raw_state()` instead."""
385        state_response = api_util.get_connection_state(
386            connection_id=self.connection_id,
387            api_root=self.workspace.api_root,
388            client_id=self.workspace.client_id,
389            client_secret=self.workspace.client_secret,
390            bearer_token=self.workspace.bearer_token,
391        )
392        if state_response.get("stateType") == "not_set":
393            return None
394        return state_response.get("streamState", [])

Deprecated. Use dump_raw_state() instead.

def dump_raw_state(self) -> dict[str, typing.Any]:
396    def dump_raw_state(self) -> dict[str, Any]:
397        """Dump the full raw state for this connection.
398
399        Returns the connection's sync state as a raw dictionary from the API.
400        The result includes stateType, connectionId, and all state data.
401
402        The output of this method can be passed directly to `import_raw_state()`
403        on the same or a different connection (connectionId is overridden on import).
404
405        Returns:
406            The full connection state as a dictionary.
407        """
408        return api_util.get_connection_state(
409            connection_id=self.connection_id,
410            api_root=self.workspace.api_root,
411            client_id=self.workspace.client_id,
412            client_secret=self.workspace.client_secret,
413            bearer_token=self.workspace.bearer_token,
414        )

Dump the full raw state for this connection.

Returns the connection's sync state as a raw dictionary from the API. The result includes stateType, connectionId, and all state data.

The output of this method can be passed directly to import_raw_state() on the same or a different connection (connectionId is overridden on import).

Returns:

The full connection state as a dictionary.

def import_raw_state( self, connection_state_dict: dict[str, typing.Any]) -> dict[str, typing.Any]:
416    def import_raw_state(
417        self,
418        connection_state_dict: dict[str, Any],
419    ) -> dict[str, Any]:
420        """Import (restore) the full raw state for this connection.
421
422        > ⚠️ **WARNING:** Modifying the state directly is not recommended and
423        > could result in broken connections, and/or incorrect sync behavior.
424
425        Replaces the entire connection state with the provided state blob.
426        Uses the safe variant that prevents updates while a sync is running (HTTP 423).
427
428        This is the counterpart to `dump_raw_state()` for backup/restore workflows.
429        The `connectionId` in the blob is always overridden with this connection's
430        ID, making state blobs portable across connections.
431
432        Args:
433            connection_state_dict: The full connection state dict to import. Must include:
434                - stateType: "global", "stream", or "legacy"
435                - One of: state (legacy), streamState (stream), globalState (global)
436
437        Returns:
438            The updated connection state as a dictionary.
439
440        Raises:
441            AirbyteConnectionSyncActiveError: If a sync is currently running on this
442                connection (HTTP 423). Wait for the sync to complete before retrying.
443        """
444        return api_util.replace_connection_state(
445            connection_id=self.connection_id,
446            connection_state_dict=connection_state_dict,
447            api_root=self.workspace.api_root,
448            client_id=self.workspace.client_id,
449            client_secret=self.workspace.client_secret,
450            bearer_token=self.workspace.bearer_token,
451        )

Import (restore) the full raw state for this connection.

⚠️ WARNING: Modifying the state directly is not recommended and could result in broken connections, and/or incorrect sync behavior.

Replaces the entire connection state with the provided state blob. Uses the safe variant that prevents updates while a sync is running (HTTP 423).

This is the counterpart to dump_raw_state() for backup/restore workflows. The connectionId in the blob is always overridden with this connection's ID, making state blobs portable across connections.

Arguments:
  • connection_state_dict: The full connection state dict to import. Must include:
    • stateType: "global", "stream", or "legacy"
    • One of: state (legacy), streamState (stream), globalState (global)
Returns:

The updated connection state as a dictionary.

Raises:
  • AirbyteConnectionSyncActiveError: If a sync is currently running on this connection (HTTP 423). Wait for the sync to complete before retrying.
def get_stream_state( self, stream_name: str, stream_namespace: str | None = None) -> dict[str, typing.Any] | None:
453    def get_stream_state(
454        self,
455        stream_name: str,
456        stream_namespace: str | None = None,
457    ) -> dict[str, Any] | None:
458        """Get the state blob for a single stream within this connection.
459
460        Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}),
461        not the full connection state envelope.
462
463        This is compatible with `stream`-type state and stream-level entries
464        within a `global`-type state. It is not compatible with `legacy` state.
465        To get or set the entire connection-level state artifact, use
466        `dump_raw_state` and `import_raw_state` instead.
467
468        Args:
469            stream_name: The name of the stream to get state for.
470            stream_namespace: The source-side stream namespace. This refers to the
471                namespace from the source (e.g., database schema), not any destination
472                namespace override set in connection advanced settings.
473
474        Returns:
475            The stream's state blob as a dictionary, or None if the stream is not found.
476        """
477        state_data = self.dump_raw_state()
478        result = ConnectionStateResponse(**state_data)
479
480        streams = _get_stream_list(result)
481        matching = [s for s in streams if _match_stream(s, stream_name, stream_namespace)]
482
483        if not matching:
484            available = [s.stream_descriptor.name for s in streams]
485            logger.warning(
486                "Stream '%s' not found in connection state for connection '%s'. "
487                "Available streams: %s",
488                stream_name,
489                self.connection_id,
490                available,
491            )
492            return None
493
494        return matching[0].stream_state

Get the state blob for a single stream within this connection.

Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}), not the full connection state envelope.

This is compatible with stream-type state and stream-level entries within a global-type state. It is not compatible with legacy state. To get or set the entire connection-level state artifact, use dump_raw_state and import_raw_state instead.

Arguments:
  • stream_name: The name of the stream to get state for.
  • stream_namespace: The source-side stream namespace. This refers to the namespace from the source (e.g., database schema), not any destination namespace override set in connection advanced settings.
Returns:

The stream's state blob as a dictionary, or None if the stream is not found.

def set_stream_state( self, stream_name: str, state_blob_dict: dict[str, typing.Any], stream_namespace: str | None = None) -> None:
496    def set_stream_state(
497        self,
498        stream_name: str,
499        state_blob_dict: dict[str, Any],
500        stream_namespace: str | None = None,
501    ) -> None:
502        """Set the state for a single stream within this connection.
503
504        Fetches the current full state, replaces only the specified stream's state,
505        then sends the full updated state back to the API. If the stream does not
506        exist in the current state, it is appended.
507
508        This is compatible with `stream`-type state and stream-level entries
509        within a `global`-type state. It is not compatible with `legacy` state.
510        To get or set the entire connection-level state artifact, use
511        `dump_raw_state` and `import_raw_state` instead.
512
513        Uses the safe variant that prevents updates while a sync is running (HTTP 423).
514
515        Args:
516            stream_name: The name of the stream to update state for.
517            state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}).
518            stream_namespace: The source-side stream namespace. This refers to the
519                namespace from the source (e.g., database schema), not any destination
520                namespace override set in connection advanced settings.
521
522        Raises:
523            PyAirbyteInputError: If the connection state type is not supported for
524                stream-level operations (not_set, legacy).
525            AirbyteConnectionSyncActiveError: If a sync is currently running on this
526                connection (HTTP 423). Wait for the sync to complete before retrying.
527        """
528        state_data = self.dump_raw_state()
529        current = ConnectionStateResponse(**state_data)
530
531        if current.state_type == "not_set":
532            raise PyAirbyteInputError(
533                message="Cannot set stream state: connection has no existing state.",
534                context={"connection_id": self.connection_id},
535            )
536
537        if current.state_type == "legacy":
538            raise PyAirbyteInputError(
539                message="Cannot set stream state on a legacy-type connection state.",
540                context={"connection_id": self.connection_id},
541            )
542
543        new_stream_entry = {
544            "streamDescriptor": {
545                "name": stream_name,
546                **(
547                    {
548                        "namespace": stream_namespace,
549                    }
550                    if stream_namespace
551                    else {}
552                ),
553            },
554            "streamState": state_blob_dict,
555        }
556
557        raw_streams: list[dict[str, Any]]
558        if current.state_type == "stream":
559            raw_streams = state_data.get("streamState", [])
560        elif current.state_type == "global":
561            raw_streams = state_data.get("globalState", {}).get("streamStates", [])
562        else:
563            raw_streams = []
564
565        streams = _get_stream_list(current)
566        found = False
567        updated_streams_raw: list[dict[str, Any]] = []
568        for raw_s, parsed_s in zip(raw_streams, streams, strict=False):
569            if _match_stream(parsed_s, stream_name, stream_namespace):
570                updated_streams_raw.append(new_stream_entry)
571                found = True
572            else:
573                updated_streams_raw.append(raw_s)
574
575        if not found:
576            updated_streams_raw.append(new_stream_entry)
577
578        full_state: dict[str, Any] = {
579            **state_data,
580        }
581
582        if current.state_type == "stream":
583            full_state["streamState"] = updated_streams_raw
584        elif current.state_type == "global":
585            original_global = state_data.get("globalState", {})
586            full_state["globalState"] = {
587                **original_global,
588                "streamStates": updated_streams_raw,
589            }
590
591        self.import_raw_state(full_state)

Set the state for a single stream within this connection.

Fetches the current full state, replaces only the specified stream's state, then sends the full updated state back to the API. If the stream does not exist in the current state, it is appended.

This is compatible with stream-type state and stream-level entries within a global-type state. It is not compatible with legacy state. To get or set the entire connection-level state artifact, use dump_raw_state and import_raw_state instead.

Uses the safe variant that prevents updates while a sync is running (HTTP 423).

Arguments:
  • stream_name: The name of the stream to update state for.
  • state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}).
  • stream_namespace: The source-side stream namespace. This refers to the namespace from the source (e.g., database schema), not any destination namespace override set in connection advanced settings.
Raises:
  • PyAirbyteInputError: If the connection state type is not supported for stream-level operations (not_set, legacy).
  • AirbyteConnectionSyncActiveError: If a sync is currently running on this connection (HTTP 423). Wait for the sync to complete before retrying.
@deprecated("Use 'dump_raw_catalog()' instead.")
def get_catalog_artifact(self) -> dict[str, typing.Any] | None:
593    @deprecated("Use 'dump_raw_catalog()' instead.")
594    def get_catalog_artifact(self) -> dict[str, Any] | None:
595        """Get the configured catalog for this connection.
596
597        Returns the full configured catalog (syncCatalog) for this connection,
598        including stream schemas, sync modes, cursor fields, and primary keys.
599
600        Uses the Config API endpoint: POST /v1/web_backend/connections/get
601
602        Returns:
603            Dictionary containing the configured catalog, or `None` if not found.
604        """
605        return self.dump_raw_catalog()

Get the configured catalog for this connection.

Returns the full configured catalog (syncCatalog) for this connection, including stream schemas, sync modes, cursor fields, and primary keys.

Uses the Config API endpoint: POST /v1/web_backend/connections/get

Returns:

Dictionary containing the configured catalog, or None if not found.

def dump_raw_catalog(self) -> dict[str, typing.Any] | None:
607    def dump_raw_catalog(self) -> dict[str, Any] | None:
608        """Dump the full configured catalog (syncCatalog) for this connection.
609
610        Returns the raw catalog dict as returned by the API, including stream
611        schemas, sync modes, cursor fields, and primary keys.
612
613        The returned dict can be passed to `import_raw_catalog()` on this or
614        another connection to restore or clone the catalog configuration.
615
616        Returns:
617            Dictionary containing the configured catalog, or `None` if not found.
618        """
619        connection_response = api_util.get_connection_catalog(
620            connection_id=self.connection_id,
621            api_root=self.workspace.api_root,
622            client_id=self.workspace.client_id,
623            client_secret=self.workspace.client_secret,
624            bearer_token=self.workspace.bearer_token,
625        )
626        return connection_response.get("syncCatalog")

Dump the full configured catalog (syncCatalog) for this connection.

Returns the raw catalog dict as returned by the API, including stream schemas, sync modes, cursor fields, and primary keys.

The returned dict can be passed to import_raw_catalog() on this or another connection to restore or clone the catalog configuration.

Returns:

Dictionary containing the configured catalog, or None if not found.

def import_raw_catalog(self, catalog: dict[str, typing.Any]) -> None:
628    def import_raw_catalog(self, catalog: dict[str, Any]) -> None:
629        """Replace the configured catalog for this connection.
630
631        > ⚠️ **WARNING:** Modifying the catalog directly is not recommended and
632        > could result in broken connections, and/or incorrect sync behavior.
633
634        Accepts a configured catalog dict and replaces the connection's entire
635        catalog with it. All other connection settings remain unchanged.
636
637        The catalog shape should match the output of `dump_raw_catalog()`:
638        `{"streams": [{"stream": {...}, "config": {...}}, ...]}`.
639
640        Args:
641            catalog: The configured catalog dict to set.
642        """
643        api_util.replace_connection_catalog(
644            connection_id=self.connection_id,
645            configured_catalog_dict=catalog,
646            api_root=self.workspace.api_root,
647            client_id=self.workspace.client_id,
648            client_secret=self.workspace.client_secret,
649            bearer_token=self.workspace.bearer_token,
650        )

Replace the configured catalog for this connection.

⚠️ WARNING: Modifying the catalog directly is not recommended and could result in broken connections, and/or incorrect sync behavior.

Accepts a configured catalog dict and replaces the connection's entire catalog with it. All other connection settings remain unchanged.

The catalog shape should match the output of dump_raw_catalog(): {"streams": [{"stream": {...}, "config": {...}}, ...]}.

Arguments:
  • catalog: The configured catalog dict to set.
def rename(self, name: str) -> CloudConnection:
652    def rename(self, name: str) -> CloudConnection:
653        """Rename the connection.
654
655        Args:
656            name: New name for the connection
657
658        Returns:
659            Updated CloudConnection object with refreshed info
660        """
661        updated_response = api_util.patch_connection(
662            connection_id=self.connection_id,
663            api_root=self.workspace.api_root,
664            client_id=self.workspace.client_id,
665            client_secret=self.workspace.client_secret,
666            bearer_token=self.workspace.bearer_token,
667            name=name,
668        )
669        self._connection_info = updated_response
670        return self

Rename the connection.

Arguments:
  • name: New name for the connection
Returns:

Updated CloudConnection object with refreshed info

def set_table_prefix(self, prefix: str) -> CloudConnection:
672    def set_table_prefix(self, prefix: str) -> CloudConnection:
673        """Set the table prefix for the connection.
674
675        Args:
676            prefix: New table prefix to use when syncing to the destination
677
678        Returns:
679            Updated CloudConnection object with refreshed info
680        """
681        updated_response = api_util.patch_connection(
682            connection_id=self.connection_id,
683            api_root=self.workspace.api_root,
684            client_id=self.workspace.client_id,
685            client_secret=self.workspace.client_secret,
686            bearer_token=self.workspace.bearer_token,
687            prefix=prefix,
688        )
689        self._connection_info = updated_response
690        return self

Set the table prefix for the connection.

Arguments:
  • prefix: New table prefix to use when syncing to the destination
Returns:

Updated CloudConnection object with refreshed info

def set_selected_streams( self, stream_names: list[str]) -> CloudConnection:
692    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
693        """Set the selected streams for the connection.
694
695        This is a destructive operation that can break existing connections if the
696        stream selection is changed incorrectly. Use with caution.
697
698        Args:
699            stream_names: List of stream names to sync
700
701        Returns:
702            Updated CloudConnection object with refreshed info
703        """
704        configurations = api_util.build_stream_configurations(stream_names)
705
706        updated_response = api_util.patch_connection(
707            connection_id=self.connection_id,
708            api_root=self.workspace.api_root,
709            client_id=self.workspace.client_id,
710            client_secret=self.workspace.client_secret,
711            bearer_token=self.workspace.bearer_token,
712            configurations=configurations,
713        )
714        self._connection_info = updated_response
715        return self

Set the selected streams for the connection.

This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.

Arguments:
  • stream_names: List of stream names to sync
Returns:

Updated CloudConnection object with refreshed info

enabled: bool
719    @property
720    def enabled(self) -> bool:
721        """Get the current enabled status of the connection.
722
723        This property always fetches fresh data from the API to ensure accuracy,
724        as another process or user may have toggled the setting.
725
726        Returns:
727            True if the connection status is 'active', False otherwise.
728        """
729        connection_info = self._fetch_connection_info(force_refresh=True)
730        return connection_info.status == api_util.models.ConnectionStatusEnum.ACTIVE

Get the current enabled status of the connection.

This property always fetches fresh data from the API to ensure accuracy, as another process or user may have toggled the setting.

Returns:

True if the connection status is 'active', False otherwise.

def set_enabled(self, *, enabled: bool, ignore_noop: bool = True) -> None:
742    def set_enabled(
743        self,
744        *,
745        enabled: bool,
746        ignore_noop: bool = True,
747    ) -> None:
748        """Set the enabled status of the connection.
749
750        Args:
751            enabled: True to enable (set status to 'active'), False to disable
752                (set status to 'inactive').
753            ignore_noop: If True (default), silently return if the connection is already
754                in the requested state. If False, raise ValueError when the requested
755                state matches the current state.
756
757        Raises:
758            ValueError: If ignore_noop is False and the connection is already in the
759                requested state.
760        """
761        # Always fetch fresh data to check current status
762        connection_info = self._fetch_connection_info(force_refresh=True)
763        current_status = connection_info.status
764        desired_status = (
765            api_util.models.ConnectionStatusEnum.ACTIVE
766            if enabled
767            else api_util.models.ConnectionStatusEnum.INACTIVE
768        )
769
770        if current_status == desired_status:
771            if ignore_noop:
772                return
773            raise ValueError(
774                f"Connection is already {'enabled' if enabled else 'disabled'}. "
775                f"Current status: {current_status}"
776            )
777
778        updated_response = api_util.patch_connection(
779            connection_id=self.connection_id,
780            api_root=self.workspace.api_root,
781            client_id=self.workspace.client_id,
782            client_secret=self.workspace.client_secret,
783            bearer_token=self.workspace.bearer_token,
784            status=desired_status,
785        )
786        self._connection_info = updated_response

Set the enabled status of the connection.

Arguments:
  • enabled: True to enable (set status to 'active'), False to disable (set status to 'inactive').
  • ignore_noop: If True (default), silently return if the connection is already in the requested state. If False, raise ValueError when the requested state matches the current state.
Raises:
  • ValueError: If ignore_noop is False and the connection is already in the requested state.
def set_schedule(self, cron_expression: str) -> None:
790    def set_schedule(
791        self,
792        cron_expression: str,
793    ) -> None:
794        """Set a cron schedule for the connection.
795
796        Args:
797            cron_expression: A cron expression defining when syncs should run.
798
799        Examples:
800                - "0 0 * * *" - Daily at midnight UTC
801                - "0 */6 * * *" - Every 6 hours
802                - "0 0 * * 0" - Weekly on Sunday at midnight UTC
803        """
804        schedule = api_util.models.AirbyteAPIConnectionSchedule(
805            schedule_type=api_util.models.ScheduleTypeEnum.CRON,
806            cron_expression=cron_expression,
807        )
808        updated_response = api_util.patch_connection(
809            connection_id=self.connection_id,
810            api_root=self.workspace.api_root,
811            client_id=self.workspace.client_id,
812            client_secret=self.workspace.client_secret,
813            bearer_token=self.workspace.bearer_token,
814            schedule=schedule,
815        )
816        self._connection_info = updated_response

Set a cron schedule for the connection.

Arguments:
  • cron_expression: A cron expression defining when syncs should run.
Examples:
  • "0 0 * * *" - Daily at midnight UTC
  • "0 */6 * * *" - Every 6 hours
  • "0 0 * * 0" - Weekly on Sunday at midnight UTC
def set_manual_schedule(self) -> None:
818    def set_manual_schedule(self) -> None:
819        """Set the connection to manual scheduling.
820
821        Disables automatic syncs. Syncs will only run when manually triggered.
822        """
823        schedule = api_util.models.AirbyteAPIConnectionSchedule(
824            schedule_type=api_util.models.ScheduleTypeEnum.MANUAL,
825        )
826        updated_response = api_util.patch_connection(
827            connection_id=self.connection_id,
828            api_root=self.workspace.api_root,
829            client_id=self.workspace.client_id,
830            client_secret=self.workspace.client_secret,
831            bearer_token=self.workspace.bearer_token,
832            schedule=schedule,
833        )
834        self._connection_info = updated_response

Set the connection to manual scheduling.

Disables automatic syncs. Syncs will only run when manually triggered.

def permanently_delete(self, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False) -> None:
838    def permanently_delete(
839        self,
840        *,
841        cascade_delete_source: bool = False,
842        cascade_delete_destination: bool = False,
843    ) -> None:
844        """Delete the connection.
845
846        Args:
847            cascade_delete_source: Whether to also delete the source.
848            cascade_delete_destination: Whether to also delete the destination.
849        """
850        self.workspace.permanently_delete_connection(self)
851
852        if cascade_delete_source:
853            self.workspace.permanently_delete_source(self.source_id)
854
855        if cascade_delete_destination:
856            self.workspace.permanently_delete_destination(self.destination_id)

Delete the connection.

Arguments:
  • cascade_delete_source: Whether to also delete the source.
  • cascade_delete_destination: Whether to also delete the destination.