airbyte.cloud.connections

Cloud Connections.

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""Cloud Connections."""
  3
  4from __future__ import annotations
  5
  6from typing import TYPE_CHECKING, Any
  7
  8from airbyte._util import api_util
  9from airbyte.cloud.connectors import CloudDestination, CloudSource
 10from airbyte.cloud.sync_results import SyncResult
 11from airbyte.exceptions import AirbyteWorkspaceMismatchError
 12
 13
 14if TYPE_CHECKING:
 15    from airbyte_api.models import ConnectionResponse, JobResponse
 16
 17    from airbyte.cloud.workspaces import CloudWorkspace
 18
 19
 20class CloudConnection:  # noqa: PLR0904  # Too many public methods
 21    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
 22
 23    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
 24    """
 25
 26    def __init__(
 27        self,
 28        workspace: CloudWorkspace,
 29        connection_id: str,
 30        source: str | None = None,
 31        destination: str | None = None,
 32    ) -> None:
 33        """It is not recommended to create a `CloudConnection` object directly.
 34
 35        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
 36        """
 37        self.connection_id = connection_id
 38        """The ID of the connection."""
 39
 40        self.workspace = workspace
 41        """The workspace that the connection belongs to."""
 42
 43        self._source_id = source
 44        """The ID of the source."""
 45
 46        self._destination_id = destination
 47        """The ID of the destination."""
 48
 49        self._connection_info: ConnectionResponse | None = None
 50        """The connection info object. (Cached.)"""
 51
 52        self._cloud_source_object: CloudSource | None = None
 53        """The source object. (Cached.)"""
 54
 55        self._cloud_destination_object: CloudDestination | None = None
 56        """The destination object. (Cached.)"""
 57
 58    def _fetch_connection_info(
 59        self,
 60        *,
 61        force_refresh: bool = False,
 62        verify: bool = True,
 63    ) -> ConnectionResponse:
 64        """Fetch and cache connection info from the API.
 65
 66        By default, this method will only fetch from the API if connection info is not
 67        already cached. It also verifies that the connection belongs to the expected
 68        workspace unless verification is explicitly disabled.
 69
 70        Args:
 71            force_refresh: If True, always fetch from the API even if cached.
 72                If False (default), only fetch if not already cached.
 73            verify: If True (default), verify that the connection is valid (e.g., that
 74                the workspace_id matches this object's workspace). Raises an error if
 75                validation fails.
 76
 77        Returns:
 78            The ConnectionResponse from the API.
 79
 80        Raises:
 81            AirbyteWorkspaceMismatchError: If verify is True and the connection's
 82                workspace_id doesn't match the expected workspace.
 83            AirbyteMissingResourceError: If the connection doesn't exist.
 84        """
 85        if not force_refresh and self._connection_info is not None:
 86            # Use cached info, but still verify if requested
 87            if verify:
 88                self._verify_workspace_match(self._connection_info)
 89            return self._connection_info
 90
 91        # Fetch from API
 92        connection_info = api_util.get_connection(
 93            workspace_id=self.workspace.workspace_id,
 94            connection_id=self.connection_id,
 95            api_root=self.workspace.api_root,
 96            client_id=self.workspace.client_id,
 97            client_secret=self.workspace.client_secret,
 98            bearer_token=self.workspace.bearer_token,
 99        )
100
101        # Cache the result first (before verification may raise)
102        self._connection_info = connection_info
103
104        # Verify if requested
105        if verify:
106            self._verify_workspace_match(connection_info)
107
108        return connection_info
109
110    def _verify_workspace_match(self, connection_info: ConnectionResponse) -> None:
111        """Verify that the connection belongs to the expected workspace.
112
113        Raises:
114            AirbyteWorkspaceMismatchError: If the workspace IDs don't match.
115        """
116        if connection_info.workspace_id != self.workspace.workspace_id:
117            raise AirbyteWorkspaceMismatchError(
118                resource_type="connection",
119                resource_id=self.connection_id,
120                workspace=self.workspace,
121                expected_workspace_id=self.workspace.workspace_id,
122                actual_workspace_id=connection_info.workspace_id,
123                message=(
124                    f"Connection '{self.connection_id}' belongs to workspace "
125                    f"'{connection_info.workspace_id}', not '{self.workspace.workspace_id}'."
126                ),
127            )
128
129    def check_is_valid(self) -> bool:
130        """Check if this connection exists and belongs to the expected workspace.
131
132        This method fetches connection info from the API (if not already cached) and
133        verifies that the connection's workspace_id matches the workspace associated
134        with this CloudConnection object.
135
136        Returns:
137            True if the connection exists and belongs to the expected workspace.
138
139        Raises:
140            AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
141            AirbyteMissingResourceError: If the connection doesn't exist.
142        """
143        self._fetch_connection_info(force_refresh=False, verify=True)
144        return True
145
146    @classmethod
147    def _from_connection_response(
148        cls,
149        workspace: CloudWorkspace,
150        connection_response: ConnectionResponse,
151    ) -> CloudConnection:
152        """Create a CloudConnection from a ConnectionResponse."""
153        result = cls(
154            workspace=workspace,
155            connection_id=connection_response.connection_id,
156            source=connection_response.source_id,
157            destination=connection_response.destination_id,
158        )
159        result._connection_info = connection_response  # noqa: SLF001 # Accessing Non-Public API
160        return result
161
162    # Properties
163
164    @property
165    def name(self) -> str | None:
166        """Get the display name of the connection, if available.
167
168        E.g. "My Postgres to Snowflake", not the connection ID.
169        """
170        if not self._connection_info:
171            self._connection_info = self._fetch_connection_info()
172
173        return self._connection_info.name
174
175    @property
176    def source_id(self) -> str:
177        """The ID of the source."""
178        if not self._source_id:
179            if not self._connection_info:
180                self._connection_info = self._fetch_connection_info()
181
182            self._source_id = self._connection_info.source_id
183
184        return self._source_id
185
186    @property
187    def source(self) -> CloudSource:
188        """Get the source object."""
189        if self._cloud_source_object:
190            return self._cloud_source_object
191
192        self._cloud_source_object = CloudSource(
193            workspace=self.workspace,
194            connector_id=self.source_id,
195        )
196        return self._cloud_source_object
197
198    @property
199    def destination_id(self) -> str:
200        """The ID of the destination."""
201        if not self._destination_id:
202            if not self._connection_info:
203                self._connection_info = self._fetch_connection_info()
204
205            self._destination_id = self._connection_info.destination_id
206
207        return self._destination_id
208
209    @property
210    def destination(self) -> CloudDestination:
211        """Get the destination object."""
212        if self._cloud_destination_object:
213            return self._cloud_destination_object
214
215        self._cloud_destination_object = CloudDestination(
216            workspace=self.workspace,
217            connector_id=self.destination_id,
218        )
219        return self._cloud_destination_object
220
221    @property
222    def stream_names(self) -> list[str]:
223        """The stream names."""
224        if not self._connection_info:
225            self._connection_info = self._fetch_connection_info()
226
227        return [stream.name for stream in self._connection_info.configurations.streams or []]
228
229    @property
230    def table_prefix(self) -> str:
231        """The table prefix."""
232        if not self._connection_info:
233            self._connection_info = self._fetch_connection_info()
234
235        return self._connection_info.prefix or ""
236
237    @property
238    def connection_url(self) -> str | None:
239        """The web URL to the connection."""
240        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
241
242    @property
243    def job_history_url(self) -> str | None:
244        """The URL to the job history for the connection."""
245        return f"{self.connection_url}/timeline"
246
247    # Run Sync
248
249    def run_sync(
250        self,
251        *,
252        wait: bool = True,
253        wait_timeout: int = 300,
254    ) -> SyncResult:
255        """Run a sync."""
256        connection_response = api_util.run_connection(
257            connection_id=self.connection_id,
258            api_root=self.workspace.api_root,
259            workspace_id=self.workspace.workspace_id,
260            client_id=self.workspace.client_id,
261            client_secret=self.workspace.client_secret,
262            bearer_token=self.workspace.bearer_token,
263        )
264        sync_result = SyncResult(
265            workspace=self.workspace,
266            connection=self,
267            job_id=connection_response.job_id,
268        )
269
270        if wait:
271            sync_result.wait_for_completion(
272                wait_timeout=wait_timeout,
273                raise_failure=True,
274                raise_timeout=True,
275            )
276
277        return sync_result
278
279    def __repr__(self) -> str:
280        """String representation of the connection."""
281        return (
282            f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, "
283            f"destination_id={self.destination_id}, connection_url={self.connection_url})"
284        )
285
286    # Logs
287
288    def get_previous_sync_logs(
289        self,
290        *,
291        limit: int = 20,
292        offset: int | None = None,
293        from_tail: bool = True,
294    ) -> list[SyncResult]:
295        """Get previous sync jobs for a connection with pagination support.
296
297        Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
298        rows_synced, start_time). Full log text can be fetched lazily via
299        `SyncResult.get_full_log_text()`.
300
301        Args:
302            limit: Maximum number of jobs to return. Defaults to 20.
303            offset: Number of jobs to skip from the beginning. Defaults to None (0).
304            from_tail: If True, returns jobs ordered newest-first (createdAt DESC).
305                If False, returns jobs ordered oldest-first (createdAt ASC).
306                Defaults to True.
307
308        Returns:
309            A list of SyncResult objects representing the sync jobs.
310        """
311        order_by = (
312            api_util.JOB_ORDER_BY_CREATED_AT_DESC
313            if from_tail
314            else api_util.JOB_ORDER_BY_CREATED_AT_ASC
315        )
316        sync_logs: list[JobResponse] = api_util.get_job_logs(
317            connection_id=self.connection_id,
318            api_root=self.workspace.api_root,
319            workspace_id=self.workspace.workspace_id,
320            limit=limit,
321            offset=offset,
322            order_by=order_by,
323            client_id=self.workspace.client_id,
324            client_secret=self.workspace.client_secret,
325            bearer_token=self.workspace.bearer_token,
326        )
327        return [
328            SyncResult(
329                workspace=self.workspace,
330                connection=self,
331                job_id=sync_log.job_id,
332                _latest_job_info=sync_log,
333            )
334            for sync_log in sync_logs
335        ]
336
337    def get_sync_result(
338        self,
339        job_id: int | None = None,
340    ) -> SyncResult | None:
341        """Get the sync result for the connection.
342
343        If `job_id` is not provided, the most recent sync job will be used.
344
345        Returns `None` if job_id is omitted and no previous jobs are found.
346        """
347        if job_id is None:
348            # Get the most recent sync job
349            results = self.get_previous_sync_logs(
350                limit=1,
351            )
352            if results:
353                return results[0]
354
355            return None
356
357        # Get the sync job by ID (lazy loaded)
358        return SyncResult(
359            workspace=self.workspace,
360            connection=self,
361            job_id=job_id,
362        )
363
364    # Artifacts
365
366    def get_state_artifacts(self) -> list[dict[str, Any]] | None:
367        """Get the connection state artifacts.
368
369        Returns the persisted state for this connection, which can be used
370        when debugging incremental syncs.
371
372        Uses the Config API endpoint: POST /v1/state/get
373
374        Returns:
375            List of state objects for each stream, or None if no state is set.
376        """
377        state_response = api_util.get_connection_state(
378            connection_id=self.connection_id,
379            api_root=self.workspace.api_root,
380            client_id=self.workspace.client_id,
381            client_secret=self.workspace.client_secret,
382            bearer_token=self.workspace.bearer_token,
383        )
384        if state_response.get("stateType") == "not_set":
385            return None
386        return state_response.get("streamState", [])
387
388    def get_catalog_artifact(self) -> dict[str, Any] | None:
389        """Get the configured catalog for this connection.
390
391        Returns the full configured catalog (syncCatalog) for this connection,
392        including stream schemas, sync modes, cursor fields, and primary keys.
393
394        Uses the Config API endpoint: POST /v1/web_backend/connections/get
395
396        Returns:
397            Dictionary containing the configured catalog, or `None` if not found.
398        """
399        connection_response = api_util.get_connection_catalog(
400            connection_id=self.connection_id,
401            api_root=self.workspace.api_root,
402            client_id=self.workspace.client_id,
403            client_secret=self.workspace.client_secret,
404            bearer_token=self.workspace.bearer_token,
405        )
406        return connection_response.get("syncCatalog")
407
408    def rename(self, name: str) -> CloudConnection:
409        """Rename the connection.
410
411        Args:
412            name: New name for the connection
413
414        Returns:
415            Updated CloudConnection object with refreshed info
416        """
417        updated_response = api_util.patch_connection(
418            connection_id=self.connection_id,
419            api_root=self.workspace.api_root,
420            client_id=self.workspace.client_id,
421            client_secret=self.workspace.client_secret,
422            bearer_token=self.workspace.bearer_token,
423            name=name,
424        )
425        self._connection_info = updated_response
426        return self
427
428    def set_table_prefix(self, prefix: str) -> CloudConnection:
429        """Set the table prefix for the connection.
430
431        Args:
432            prefix: New table prefix to use when syncing to the destination
433
434        Returns:
435            Updated CloudConnection object with refreshed info
436        """
437        updated_response = api_util.patch_connection(
438            connection_id=self.connection_id,
439            api_root=self.workspace.api_root,
440            client_id=self.workspace.client_id,
441            client_secret=self.workspace.client_secret,
442            bearer_token=self.workspace.bearer_token,
443            prefix=prefix,
444        )
445        self._connection_info = updated_response
446        return self
447
448    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
449        """Set the selected streams for the connection.
450
451        This is a destructive operation that can break existing connections if the
452        stream selection is changed incorrectly. Use with caution.
453
454        Args:
455            stream_names: List of stream names to sync
456
457        Returns:
458            Updated CloudConnection object with refreshed info
459        """
460        configurations = api_util.build_stream_configurations(stream_names)
461
462        updated_response = api_util.patch_connection(
463            connection_id=self.connection_id,
464            api_root=self.workspace.api_root,
465            client_id=self.workspace.client_id,
466            client_secret=self.workspace.client_secret,
467            bearer_token=self.workspace.bearer_token,
468            configurations=configurations,
469        )
470        self._connection_info = updated_response
471        return self
472
473    # Enable/Disable
474
475    @property
476    def enabled(self) -> bool:
477        """Get the current enabled status of the connection.
478
479        This property always fetches fresh data from the API to ensure accuracy,
480        as another process or user may have toggled the setting.
481
482        Returns:
483            True if the connection status is 'active', False otherwise.
484        """
485        connection_info = self._fetch_connection_info(force_refresh=True)
486        return connection_info.status == api_util.models.ConnectionStatusEnum.ACTIVE
487
488    @enabled.setter
489    def enabled(self, value: bool) -> None:
490        """Set the enabled status of the connection.
491
492        Args:
493            value: True to enable (set status to 'active'), False to disable
494                (set status to 'inactive').
495        """
496        self.set_enabled(enabled=value)
497
498    def set_enabled(
499        self,
500        *,
501        enabled: bool,
502        ignore_noop: bool = True,
503    ) -> None:
504        """Set the enabled status of the connection.
505
506        Args:
507            enabled: True to enable (set status to 'active'), False to disable
508                (set status to 'inactive').
509            ignore_noop: If True (default), silently return if the connection is already
510                in the requested state. If False, raise ValueError when the requested
511                state matches the current state.
512
513        Raises:
514            ValueError: If ignore_noop is False and the connection is already in the
515                requested state.
516        """
517        # Always fetch fresh data to check current status
518        connection_info = self._fetch_connection_info(force_refresh=True)
519        current_status = connection_info.status
520        desired_status = (
521            api_util.models.ConnectionStatusEnum.ACTIVE
522            if enabled
523            else api_util.models.ConnectionStatusEnum.INACTIVE
524        )
525
526        if current_status == desired_status:
527            if ignore_noop:
528                return
529            raise ValueError(
530                f"Connection is already {'enabled' if enabled else 'disabled'}. "
531                f"Current status: {current_status}"
532            )
533
534        updated_response = api_util.patch_connection(
535            connection_id=self.connection_id,
536            api_root=self.workspace.api_root,
537            client_id=self.workspace.client_id,
538            client_secret=self.workspace.client_secret,
539            bearer_token=self.workspace.bearer_token,
540            status=desired_status,
541        )
542        self._connection_info = updated_response
543
544    # Scheduling
545
546    def set_schedule(
547        self,
548        cron_expression: str,
549    ) -> None:
550        """Set a cron schedule for the connection.
551
552        Args:
553            cron_expression: A cron expression defining when syncs should run.
554
555        Examples:
556                - "0 0 * * *" - Daily at midnight UTC
557                - "0 */6 * * *" - Every 6 hours
558                - "0 0 * * 0" - Weekly on Sunday at midnight UTC
559        """
560        schedule = api_util.models.AirbyteAPIConnectionSchedule(
561            schedule_type=api_util.models.ScheduleTypeEnum.CRON,
562            cron_expression=cron_expression,
563        )
564        updated_response = api_util.patch_connection(
565            connection_id=self.connection_id,
566            api_root=self.workspace.api_root,
567            client_id=self.workspace.client_id,
568            client_secret=self.workspace.client_secret,
569            bearer_token=self.workspace.bearer_token,
570            schedule=schedule,
571        )
572        self._connection_info = updated_response
573
574    def set_manual_schedule(self) -> None:
575        """Set the connection to manual scheduling.
576
577        Disables automatic syncs. Syncs will only run when manually triggered.
578        """
579        schedule = api_util.models.AirbyteAPIConnectionSchedule(
580            schedule_type=api_util.models.ScheduleTypeEnum.MANUAL,
581        )
582        updated_response = api_util.patch_connection(
583            connection_id=self.connection_id,
584            api_root=self.workspace.api_root,
585            client_id=self.workspace.client_id,
586            client_secret=self.workspace.client_secret,
587            bearer_token=self.workspace.bearer_token,
588            schedule=schedule,
589        )
590        self._connection_info = updated_response
591
592    # Deletions
593
594    def permanently_delete(
595        self,
596        *,
597        cascade_delete_source: bool = False,
598        cascade_delete_destination: bool = False,
599    ) -> None:
600        """Delete the connection.
601
602        Args:
603            cascade_delete_source: Whether to also delete the source.
604            cascade_delete_destination: Whether to also delete the destination.
605        """
606        self.workspace.permanently_delete_connection(self)
607
608        if cascade_delete_source:
609            self.workspace.permanently_delete_source(self.source_id)
610
611        if cascade_delete_destination:
612            self.workspace.permanently_delete_destination(self.destination_id)
class CloudConnection:
 21class CloudConnection:  # noqa: PLR0904  # Too many public methods
 22    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
 23
 24    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
 25    """
 26
 27    def __init__(
 28        self,
 29        workspace: CloudWorkspace,
 30        connection_id: str,
 31        source: str | None = None,
 32        destination: str | None = None,
 33    ) -> None:
 34        """It is not recommended to create a `CloudConnection` object directly.
 35
 36        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
 37        """
 38        self.connection_id = connection_id
 39        """The ID of the connection."""
 40
 41        self.workspace = workspace
 42        """The workspace that the connection belongs to."""
 43
 44        self._source_id = source
 45        """The ID of the source."""
 46
 47        self._destination_id = destination
 48        """The ID of the destination."""
 49
 50        self._connection_info: ConnectionResponse | None = None
 51        """The connection info object. (Cached.)"""
 52
 53        self._cloud_source_object: CloudSource | None = None
 54        """The source object. (Cached.)"""
 55
 56        self._cloud_destination_object: CloudDestination | None = None
 57        """The destination object. (Cached.)"""
 58
 59    def _fetch_connection_info(
 60        self,
 61        *,
 62        force_refresh: bool = False,
 63        verify: bool = True,
 64    ) -> ConnectionResponse:
 65        """Fetch and cache connection info from the API.
 66
 67        By default, this method will only fetch from the API if connection info is not
 68        already cached. It also verifies that the connection belongs to the expected
 69        workspace unless verification is explicitly disabled.
 70
 71        Args:
 72            force_refresh: If True, always fetch from the API even if cached.
 73                If False (default), only fetch if not already cached.
 74            verify: If True (default), verify that the connection is valid (e.g., that
 75                the workspace_id matches this object's workspace). Raises an error if
 76                validation fails.
 77
 78        Returns:
 79            The ConnectionResponse from the API.
 80
 81        Raises:
 82            AirbyteWorkspaceMismatchError: If verify is True and the connection's
 83                workspace_id doesn't match the expected workspace.
 84            AirbyteMissingResourceError: If the connection doesn't exist.
 85        """
 86        if not force_refresh and self._connection_info is not None:
 87            # Use cached info, but still verify if requested
 88            if verify:
 89                self._verify_workspace_match(self._connection_info)
 90            return self._connection_info
 91
 92        # Fetch from API
 93        connection_info = api_util.get_connection(
 94            workspace_id=self.workspace.workspace_id,
 95            connection_id=self.connection_id,
 96            api_root=self.workspace.api_root,
 97            client_id=self.workspace.client_id,
 98            client_secret=self.workspace.client_secret,
 99            bearer_token=self.workspace.bearer_token,
100        )
101
102        # Cache the result first (before verification may raise)
103        self._connection_info = connection_info
104
105        # Verify if requested
106        if verify:
107            self._verify_workspace_match(connection_info)
108
109        return connection_info
110
111    def _verify_workspace_match(self, connection_info: ConnectionResponse) -> None:
112        """Verify that the connection belongs to the expected workspace.
113
114        Raises:
115            AirbyteWorkspaceMismatchError: If the workspace IDs don't match.
116        """
117        if connection_info.workspace_id != self.workspace.workspace_id:
118            raise AirbyteWorkspaceMismatchError(
119                resource_type="connection",
120                resource_id=self.connection_id,
121                workspace=self.workspace,
122                expected_workspace_id=self.workspace.workspace_id,
123                actual_workspace_id=connection_info.workspace_id,
124                message=(
125                    f"Connection '{self.connection_id}' belongs to workspace "
126                    f"'{connection_info.workspace_id}', not '{self.workspace.workspace_id}'."
127                ),
128            )
129
130    def check_is_valid(self) -> bool:
131        """Check if this connection exists and belongs to the expected workspace.
132
133        This method fetches connection info from the API (if not already cached) and
134        verifies that the connection's workspace_id matches the workspace associated
135        with this CloudConnection object.
136
137        Returns:
138            True if the connection exists and belongs to the expected workspace.
139
140        Raises:
141            AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
142            AirbyteMissingResourceError: If the connection doesn't exist.
143        """
144        self._fetch_connection_info(force_refresh=False, verify=True)
145        return True
146
147    @classmethod
148    def _from_connection_response(
149        cls,
150        workspace: CloudWorkspace,
151        connection_response: ConnectionResponse,
152    ) -> CloudConnection:
153        """Create a CloudConnection from a ConnectionResponse."""
154        result = cls(
155            workspace=workspace,
156            connection_id=connection_response.connection_id,
157            source=connection_response.source_id,
158            destination=connection_response.destination_id,
159        )
160        result._connection_info = connection_response  # noqa: SLF001 # Accessing Non-Public API
161        return result
162
163    # Properties
164
165    @property
166    def name(self) -> str | None:
167        """Get the display name of the connection, if available.
168
169        E.g. "My Postgres to Snowflake", not the connection ID.
170        """
171        if not self._connection_info:
172            self._connection_info = self._fetch_connection_info()
173
174        return self._connection_info.name
175
176    @property
177    def source_id(self) -> str:
178        """The ID of the source."""
179        if not self._source_id:
180            if not self._connection_info:
181                self._connection_info = self._fetch_connection_info()
182
183            self._source_id = self._connection_info.source_id
184
185        return self._source_id
186
187    @property
188    def source(self) -> CloudSource:
189        """Get the source object."""
190        if self._cloud_source_object:
191            return self._cloud_source_object
192
193        self._cloud_source_object = CloudSource(
194            workspace=self.workspace,
195            connector_id=self.source_id,
196        )
197        return self._cloud_source_object
198
199    @property
200    def destination_id(self) -> str:
201        """The ID of the destination."""
202        if not self._destination_id:
203            if not self._connection_info:
204                self._connection_info = self._fetch_connection_info()
205
206            self._destination_id = self._connection_info.destination_id
207
208        return self._destination_id
209
210    @property
211    def destination(self) -> CloudDestination:
212        """Get the destination object."""
213        if self._cloud_destination_object:
214            return self._cloud_destination_object
215
216        self._cloud_destination_object = CloudDestination(
217            workspace=self.workspace,
218            connector_id=self.destination_id,
219        )
220        return self._cloud_destination_object
221
222    @property
223    def stream_names(self) -> list[str]:
224        """The stream names."""
225        if not self._connection_info:
226            self._connection_info = self._fetch_connection_info()
227
228        return [stream.name for stream in self._connection_info.configurations.streams or []]
229
230    @property
231    def table_prefix(self) -> str:
232        """The table prefix."""
233        if not self._connection_info:
234            self._connection_info = self._fetch_connection_info()
235
236        return self._connection_info.prefix or ""
237
238    @property
239    def connection_url(self) -> str | None:
240        """The web URL to the connection."""
241        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
242
243    @property
244    def job_history_url(self) -> str | None:
245        """The URL to the job history for the connection."""
246        return f"{self.connection_url}/timeline"
247
248    # Run Sync
249
250    def run_sync(
251        self,
252        *,
253        wait: bool = True,
254        wait_timeout: int = 300,
255    ) -> SyncResult:
256        """Run a sync."""
257        connection_response = api_util.run_connection(
258            connection_id=self.connection_id,
259            api_root=self.workspace.api_root,
260            workspace_id=self.workspace.workspace_id,
261            client_id=self.workspace.client_id,
262            client_secret=self.workspace.client_secret,
263            bearer_token=self.workspace.bearer_token,
264        )
265        sync_result = SyncResult(
266            workspace=self.workspace,
267            connection=self,
268            job_id=connection_response.job_id,
269        )
270
271        if wait:
272            sync_result.wait_for_completion(
273                wait_timeout=wait_timeout,
274                raise_failure=True,
275                raise_timeout=True,
276            )
277
278        return sync_result
279
280    def __repr__(self) -> str:
281        """String representation of the connection."""
282        return (
283            f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, "
284            f"destination_id={self.destination_id}, connection_url={self.connection_url})"
285        )
286
287    # Logs
288
289    def get_previous_sync_logs(
290        self,
291        *,
292        limit: int = 20,
293        offset: int | None = None,
294        from_tail: bool = True,
295    ) -> list[SyncResult]:
296        """Get previous sync jobs for a connection with pagination support.
297
298        Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
299        rows_synced, start_time). Full log text can be fetched lazily via
300        `SyncResult.get_full_log_text()`.
301
302        Args:
303            limit: Maximum number of jobs to return. Defaults to 20.
304            offset: Number of jobs to skip from the beginning. Defaults to None (0).
305            from_tail: If True, returns jobs ordered newest-first (createdAt DESC).
306                If False, returns jobs ordered oldest-first (createdAt ASC).
307                Defaults to True.
308
309        Returns:
310            A list of SyncResult objects representing the sync jobs.
311        """
312        order_by = (
313            api_util.JOB_ORDER_BY_CREATED_AT_DESC
314            if from_tail
315            else api_util.JOB_ORDER_BY_CREATED_AT_ASC
316        )
317        sync_logs: list[JobResponse] = api_util.get_job_logs(
318            connection_id=self.connection_id,
319            api_root=self.workspace.api_root,
320            workspace_id=self.workspace.workspace_id,
321            limit=limit,
322            offset=offset,
323            order_by=order_by,
324            client_id=self.workspace.client_id,
325            client_secret=self.workspace.client_secret,
326            bearer_token=self.workspace.bearer_token,
327        )
328        return [
329            SyncResult(
330                workspace=self.workspace,
331                connection=self,
332                job_id=sync_log.job_id,
333                _latest_job_info=sync_log,
334            )
335            for sync_log in sync_logs
336        ]
337
338    def get_sync_result(
339        self,
340        job_id: int | None = None,
341    ) -> SyncResult | None:
342        """Get the sync result for the connection.
343
344        If `job_id` is not provided, the most recent sync job will be used.
345
346        Returns `None` if job_id is omitted and no previous jobs are found.
347        """
348        if job_id is None:
349            # Get the most recent sync job
350            results = self.get_previous_sync_logs(
351                limit=1,
352            )
353            if results:
354                return results[0]
355
356            return None
357
358        # Get the sync job by ID (lazy loaded)
359        return SyncResult(
360            workspace=self.workspace,
361            connection=self,
362            job_id=job_id,
363        )
364
365    # Artifacts
366
367    def get_state_artifacts(self) -> list[dict[str, Any]] | None:
368        """Get the connection state artifacts.
369
370        Returns the persisted state for this connection, which can be used
371        when debugging incremental syncs.
372
373        Uses the Config API endpoint: POST /v1/state/get
374
375        Returns:
376            List of state objects for each stream, or None if no state is set.
377        """
378        state_response = api_util.get_connection_state(
379            connection_id=self.connection_id,
380            api_root=self.workspace.api_root,
381            client_id=self.workspace.client_id,
382            client_secret=self.workspace.client_secret,
383            bearer_token=self.workspace.bearer_token,
384        )
385        if state_response.get("stateType") == "not_set":
386            return None
387        return state_response.get("streamState", [])
388
389    def get_catalog_artifact(self) -> dict[str, Any] | None:
390        """Get the configured catalog for this connection.
391
392        Returns the full configured catalog (syncCatalog) for this connection,
393        including stream schemas, sync modes, cursor fields, and primary keys.
394
395        Uses the Config API endpoint: POST /v1/web_backend/connections/get
396
397        Returns:
398            Dictionary containing the configured catalog, or `None` if not found.
399        """
400        connection_response = api_util.get_connection_catalog(
401            connection_id=self.connection_id,
402            api_root=self.workspace.api_root,
403            client_id=self.workspace.client_id,
404            client_secret=self.workspace.client_secret,
405            bearer_token=self.workspace.bearer_token,
406        )
407        return connection_response.get("syncCatalog")
408
409    def rename(self, name: str) -> CloudConnection:
410        """Rename the connection.
411
412        Args:
413            name: New name for the connection
414
415        Returns:
416            Updated CloudConnection object with refreshed info
417        """
418        updated_response = api_util.patch_connection(
419            connection_id=self.connection_id,
420            api_root=self.workspace.api_root,
421            client_id=self.workspace.client_id,
422            client_secret=self.workspace.client_secret,
423            bearer_token=self.workspace.bearer_token,
424            name=name,
425        )
426        self._connection_info = updated_response
427        return self
428
429    def set_table_prefix(self, prefix: str) -> CloudConnection:
430        """Set the table prefix for the connection.
431
432        Args:
433            prefix: New table prefix to use when syncing to the destination
434
435        Returns:
436            Updated CloudConnection object with refreshed info
437        """
438        updated_response = api_util.patch_connection(
439            connection_id=self.connection_id,
440            api_root=self.workspace.api_root,
441            client_id=self.workspace.client_id,
442            client_secret=self.workspace.client_secret,
443            bearer_token=self.workspace.bearer_token,
444            prefix=prefix,
445        )
446        self._connection_info = updated_response
447        return self
448
449    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
450        """Set the selected streams for the connection.
451
452        This is a destructive operation that can break existing connections if the
453        stream selection is changed incorrectly. Use with caution.
454
455        Args:
456            stream_names: List of stream names to sync
457
458        Returns:
459            Updated CloudConnection object with refreshed info
460        """
461        configurations = api_util.build_stream_configurations(stream_names)
462
463        updated_response = api_util.patch_connection(
464            connection_id=self.connection_id,
465            api_root=self.workspace.api_root,
466            client_id=self.workspace.client_id,
467            client_secret=self.workspace.client_secret,
468            bearer_token=self.workspace.bearer_token,
469            configurations=configurations,
470        )
471        self._connection_info = updated_response
472        return self
473
474    # Enable/Disable
475
476    @property
477    def enabled(self) -> bool:
478        """Get the current enabled status of the connection.
479
480        This property always fetches fresh data from the API to ensure accuracy,
481        as another process or user may have toggled the setting.
482
483        Returns:
484            True if the connection status is 'active', False otherwise.
485        """
486        connection_info = self._fetch_connection_info(force_refresh=True)
487        return connection_info.status == api_util.models.ConnectionStatusEnum.ACTIVE
488
489    @enabled.setter
490    def enabled(self, value: bool) -> None:
491        """Set the enabled status of the connection.
492
493        Args:
494            value: True to enable (set status to 'active'), False to disable
495                (set status to 'inactive').
496        """
497        self.set_enabled(enabled=value)
498
499    def set_enabled(
500        self,
501        *,
502        enabled: bool,
503        ignore_noop: bool = True,
504    ) -> None:
505        """Set the enabled status of the connection.
506
507        Args:
508            enabled: True to enable (set status to 'active'), False to disable
509                (set status to 'inactive').
510            ignore_noop: If True (default), silently return if the connection is already
511                in the requested state. If False, raise ValueError when the requested
512                state matches the current state.
513
514        Raises:
515            ValueError: If ignore_noop is False and the connection is already in the
516                requested state.
517        """
518        # Always fetch fresh data to check current status
519        connection_info = self._fetch_connection_info(force_refresh=True)
520        current_status = connection_info.status
521        desired_status = (
522            api_util.models.ConnectionStatusEnum.ACTIVE
523            if enabled
524            else api_util.models.ConnectionStatusEnum.INACTIVE
525        )
526
527        if current_status == desired_status:
528            if ignore_noop:
529                return
530            raise ValueError(
531                f"Connection is already {'enabled' if enabled else 'disabled'}. "
532                f"Current status: {current_status}"
533            )
534
535        updated_response = api_util.patch_connection(
536            connection_id=self.connection_id,
537            api_root=self.workspace.api_root,
538            client_id=self.workspace.client_id,
539            client_secret=self.workspace.client_secret,
540            bearer_token=self.workspace.bearer_token,
541            status=desired_status,
542        )
543        self._connection_info = updated_response
544
545    # Scheduling
546
547    def set_schedule(
548        self,
549        cron_expression: str,
550    ) -> None:
551        """Set a cron schedule for the connection.
552
553        Args:
554            cron_expression: A cron expression defining when syncs should run.
555
556        Examples:
557                - "0 0 * * *" - Daily at midnight UTC
558                - "0 */6 * * *" - Every 6 hours
559                - "0 0 * * 0" - Weekly on Sunday at midnight UTC
560        """
561        schedule = api_util.models.AirbyteAPIConnectionSchedule(
562            schedule_type=api_util.models.ScheduleTypeEnum.CRON,
563            cron_expression=cron_expression,
564        )
565        updated_response = api_util.patch_connection(
566            connection_id=self.connection_id,
567            api_root=self.workspace.api_root,
568            client_id=self.workspace.client_id,
569            client_secret=self.workspace.client_secret,
570            bearer_token=self.workspace.bearer_token,
571            schedule=schedule,
572        )
573        self._connection_info = updated_response
574
575    def set_manual_schedule(self) -> None:
576        """Set the connection to manual scheduling.
577
578        Disables automatic syncs. Syncs will only run when manually triggered.
579        """
580        schedule = api_util.models.AirbyteAPIConnectionSchedule(
581            schedule_type=api_util.models.ScheduleTypeEnum.MANUAL,
582        )
583        updated_response = api_util.patch_connection(
584            connection_id=self.connection_id,
585            api_root=self.workspace.api_root,
586            client_id=self.workspace.client_id,
587            client_secret=self.workspace.client_secret,
588            bearer_token=self.workspace.bearer_token,
589            schedule=schedule,
590        )
591        self._connection_info = updated_response
592
593    # Deletions
594
595    def permanently_delete(
596        self,
597        *,
598        cascade_delete_source: bool = False,
599        cascade_delete_destination: bool = False,
600    ) -> None:
601        """Delete the connection.
602
603        Args:
604            cascade_delete_source: Whether to also delete the source.
605            cascade_delete_destination: Whether to also delete the destination.
606        """
607        self.workspace.permanently_delete_connection(self)
608
609        if cascade_delete_source:
610            self.workspace.permanently_delete_source(self.source_id)
611
612        if cascade_delete_destination:
613            self.workspace.permanently_delete_destination(self.destination_id)

A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

You can use a connection object to run sync jobs, retrieve logs, and manage the connection.

CloudConnection( workspace: airbyte.cloud.CloudWorkspace, connection_id: str, source: str | None = None, destination: str | None = None)
27    def __init__(
28        self,
29        workspace: CloudWorkspace,
30        connection_id: str,
31        source: str | None = None,
32        destination: str | None = None,
33    ) -> None:
34        """It is not recommended to create a `CloudConnection` object directly.
35
36        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
37        """
38        self.connection_id = connection_id
39        """The ID of the connection."""
40
41        self.workspace = workspace
42        """The workspace that the connection belongs to."""
43
44        self._source_id = source
45        """The ID of the source."""
46
47        self._destination_id = destination
48        """The ID of the destination."""
49
50        self._connection_info: ConnectionResponse | None = None
51        """The connection info object. (Cached.)"""
52
53        self._cloud_source_object: CloudSource | None = None
54        """The source object. (Cached.)"""
55
56        self._cloud_destination_object: CloudDestination | None = None
57        """The destination object. (Cached.)"""

It is not recommended to create a CloudConnection object directly.

Instead, use CloudWorkspace.get_connection() to create a connection object.

connection_id

The ID of the connection.

workspace

The workspace that the connection belongs to.

def check_is_valid(self) -> bool:
130    def check_is_valid(self) -> bool:
131        """Check if this connection exists and belongs to the expected workspace.
132
133        This method fetches connection info from the API (if not already cached) and
134        verifies that the connection's workspace_id matches the workspace associated
135        with this CloudConnection object.
136
137        Returns:
138            True if the connection exists and belongs to the expected workspace.
139
140        Raises:
141            AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
142            AirbyteMissingResourceError: If the connection doesn't exist.
143        """
144        self._fetch_connection_info(force_refresh=False, verify=True)
145        return True

Check if this connection exists and belongs to the expected workspace.

This method fetches connection info from the API (if not already cached) and verifies that the connection's workspace_id matches the workspace associated with this CloudConnection object.

Returns:

True if the connection exists and belongs to the expected workspace.

Raises:
  • AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
  • AirbyteMissingResourceError: If the connection doesn't exist.
name: str | None
165    @property
166    def name(self) -> str | None:
167        """Get the display name of the connection, if available.
168
169        E.g. "My Postgres to Snowflake", not the connection ID.
170        """
171        if not self._connection_info:
172            self._connection_info = self._fetch_connection_info()
173
174        return self._connection_info.name

Get the display name of the connection, if available.

E.g. "My Postgres to Snowflake", not the connection ID.

source_id: str
176    @property
177    def source_id(self) -> str:
178        """The ID of the source."""
179        if not self._source_id:
180            if not self._connection_info:
181                self._connection_info = self._fetch_connection_info()
182
183            self._source_id = self._connection_info.source_id
184
185        return self._source_id

The ID of the source.

source: airbyte.cloud.connectors.CloudSource
187    @property
188    def source(self) -> CloudSource:
189        """Get the source object."""
190        if self._cloud_source_object:
191            return self._cloud_source_object
192
193        self._cloud_source_object = CloudSource(
194            workspace=self.workspace,
195            connector_id=self.source_id,
196        )
197        return self._cloud_source_object

Get the source object.

destination_id: str
199    @property
200    def destination_id(self) -> str:
201        """The ID of the destination."""
202        if not self._destination_id:
203            if not self._connection_info:
204                self._connection_info = self._fetch_connection_info()
205
206            self._destination_id = self._connection_info.destination_id
207
208        return self._destination_id

The ID of the destination.

destination: airbyte.cloud.connectors.CloudDestination
210    @property
211    def destination(self) -> CloudDestination:
212        """Get the destination object."""
213        if self._cloud_destination_object:
214            return self._cloud_destination_object
215
216        self._cloud_destination_object = CloudDestination(
217            workspace=self.workspace,
218            connector_id=self.destination_id,
219        )
220        return self._cloud_destination_object

Get the destination object.

stream_names: list[str]
222    @property
223    def stream_names(self) -> list[str]:
224        """The stream names."""
225        if not self._connection_info:
226            self._connection_info = self._fetch_connection_info()
227
228        return [stream.name for stream in self._connection_info.configurations.streams or []]

The stream names.

table_prefix: str
230    @property
231    def table_prefix(self) -> str:
232        """The table prefix."""
233        if not self._connection_info:
234            self._connection_info = self._fetch_connection_info()
235
236        return self._connection_info.prefix or ""

The table prefix.

connection_url: str | None
238    @property
239    def connection_url(self) -> str | None:
240        """The web URL to the connection."""
241        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

The web URL to the connection.

job_history_url: str | None
243    @property
244    def job_history_url(self) -> str | None:
245        """The URL to the job history for the connection."""
246        return f"{self.connection_url}/timeline"

The URL to the job history for the connection.

def run_sync( self, *, wait: bool = True, wait_timeout: int = 300) -> airbyte.cloud.SyncResult:
250    def run_sync(
251        self,
252        *,
253        wait: bool = True,
254        wait_timeout: int = 300,
255    ) -> SyncResult:
256        """Run a sync."""
257        connection_response = api_util.run_connection(
258            connection_id=self.connection_id,
259            api_root=self.workspace.api_root,
260            workspace_id=self.workspace.workspace_id,
261            client_id=self.workspace.client_id,
262            client_secret=self.workspace.client_secret,
263            bearer_token=self.workspace.bearer_token,
264        )
265        sync_result = SyncResult(
266            workspace=self.workspace,
267            connection=self,
268            job_id=connection_response.job_id,
269        )
270
271        if wait:
272            sync_result.wait_for_completion(
273                wait_timeout=wait_timeout,
274                raise_failure=True,
275                raise_timeout=True,
276            )
277
278        return sync_result

Run a sync.

def get_previous_sync_logs( self, *, limit: int = 20, offset: int | None = None, from_tail: bool = True) -> list[airbyte.cloud.SyncResult]:
289    def get_previous_sync_logs(
290        self,
291        *,
292        limit: int = 20,
293        offset: int | None = None,
294        from_tail: bool = True,
295    ) -> list[SyncResult]:
296        """Get previous sync jobs for a connection with pagination support.
297
298        Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
299        rows_synced, start_time). Full log text can be fetched lazily via
300        `SyncResult.get_full_log_text()`.
301
302        Args:
303            limit: Maximum number of jobs to return. Defaults to 20.
304            offset: Number of jobs to skip from the beginning. Defaults to None (0).
305            from_tail: If True, returns jobs ordered newest-first (createdAt DESC).
306                If False, returns jobs ordered oldest-first (createdAt ASC).
307                Defaults to True.
308
309        Returns:
310            A list of SyncResult objects representing the sync jobs.
311        """
312        order_by = (
313            api_util.JOB_ORDER_BY_CREATED_AT_DESC
314            if from_tail
315            else api_util.JOB_ORDER_BY_CREATED_AT_ASC
316        )
317        sync_logs: list[JobResponse] = api_util.get_job_logs(
318            connection_id=self.connection_id,
319            api_root=self.workspace.api_root,
320            workspace_id=self.workspace.workspace_id,
321            limit=limit,
322            offset=offset,
323            order_by=order_by,
324            client_id=self.workspace.client_id,
325            client_secret=self.workspace.client_secret,
326            bearer_token=self.workspace.bearer_token,
327        )
328        return [
329            SyncResult(
330                workspace=self.workspace,
331                connection=self,
332                job_id=sync_log.job_id,
333                _latest_job_info=sync_log,
334            )
335            for sync_log in sync_logs
336        ]

Get previous sync jobs for a connection with pagination support.

Returns SyncResult objects containing job metadata (job_id, status, bytes_synced, rows_synced, start_time). Full log text can be fetched lazily via SyncResult.get_full_log_text().

Arguments:
  • limit: Maximum number of jobs to return. Defaults to 20.
  • offset: Number of jobs to skip from the beginning. Defaults to None (0).
  • from_tail: If True, returns jobs ordered newest-first (createdAt DESC). If False, returns jobs ordered oldest-first (createdAt ASC). Defaults to True.
Returns:

A list of SyncResult objects representing the sync jobs.

def get_sync_result( self, job_id: int | None = None) -> airbyte.cloud.SyncResult | None:
338    def get_sync_result(
339        self,
340        job_id: int | None = None,
341    ) -> SyncResult | None:
342        """Get the sync result for the connection.
343
344        If `job_id` is not provided, the most recent sync job will be used.
345
346        Returns `None` if job_id is omitted and no previous jobs are found.
347        """
348        if job_id is None:
349            # Get the most recent sync job
350            results = self.get_previous_sync_logs(
351                limit=1,
352            )
353            if results:
354                return results[0]
355
356            return None
357
358        # Get the sync job by ID (lazy loaded)
359        return SyncResult(
360            workspace=self.workspace,
361            connection=self,
362            job_id=job_id,
363        )

Get the sync result for the connection.

If job_id is not provided, the most recent sync job will be used.

Returns None if job_id is omitted and no previous jobs are found.

def get_state_artifacts(self) -> list[dict[str, typing.Any]] | None:
367    def get_state_artifacts(self) -> list[dict[str, Any]] | None:
368        """Get the connection state artifacts.
369
370        Returns the persisted state for this connection, which can be used
371        when debugging incremental syncs.
372
373        Uses the Config API endpoint: POST /v1/state/get
374
375        Returns:
376            List of state objects for each stream, or None if no state is set.
377        """
378        state_response = api_util.get_connection_state(
379            connection_id=self.connection_id,
380            api_root=self.workspace.api_root,
381            client_id=self.workspace.client_id,
382            client_secret=self.workspace.client_secret,
383            bearer_token=self.workspace.bearer_token,
384        )
385        if state_response.get("stateType") == "not_set":
386            return None
387        return state_response.get("streamState", [])

Get the connection state artifacts.

Returns the persisted state for this connection, which can be used when debugging incremental syncs.

Uses the Config API endpoint: POST /v1/state/get

Returns:

List of state objects for each stream, or None if no state is set.

def get_catalog_artifact(self) -> dict[str, typing.Any] | None:
389    def get_catalog_artifact(self) -> dict[str, Any] | None:
390        """Get the configured catalog for this connection.
391
392        Returns the full configured catalog (syncCatalog) for this connection,
393        including stream schemas, sync modes, cursor fields, and primary keys.
394
395        Uses the Config API endpoint: POST /v1/web_backend/connections/get
396
397        Returns:
398            Dictionary containing the configured catalog, or `None` if not found.
399        """
400        connection_response = api_util.get_connection_catalog(
401            connection_id=self.connection_id,
402            api_root=self.workspace.api_root,
403            client_id=self.workspace.client_id,
404            client_secret=self.workspace.client_secret,
405            bearer_token=self.workspace.bearer_token,
406        )
407        return connection_response.get("syncCatalog")

Get the configured catalog for this connection.

Returns the full configured catalog (syncCatalog) for this connection, including stream schemas, sync modes, cursor fields, and primary keys.

Uses the Config API endpoint: POST /v1/web_backend/connections/get

Returns:

Dictionary containing the configured catalog, or None if not found.

def rename(self, name: str) -> CloudConnection:
409    def rename(self, name: str) -> CloudConnection:
410        """Rename the connection.
411
412        Args:
413            name: New name for the connection
414
415        Returns:
416            Updated CloudConnection object with refreshed info
417        """
418        updated_response = api_util.patch_connection(
419            connection_id=self.connection_id,
420            api_root=self.workspace.api_root,
421            client_id=self.workspace.client_id,
422            client_secret=self.workspace.client_secret,
423            bearer_token=self.workspace.bearer_token,
424            name=name,
425        )
426        self._connection_info = updated_response
427        return self

Rename the connection.

Arguments:
  • name: New name for the connection
Returns:

Updated CloudConnection object with refreshed info

def set_table_prefix(self, prefix: str) -> CloudConnection:
429    def set_table_prefix(self, prefix: str) -> CloudConnection:
430        """Set the table prefix for the connection.
431
432        Args:
433            prefix: New table prefix to use when syncing to the destination
434
435        Returns:
436            Updated CloudConnection object with refreshed info
437        """
438        updated_response = api_util.patch_connection(
439            connection_id=self.connection_id,
440            api_root=self.workspace.api_root,
441            client_id=self.workspace.client_id,
442            client_secret=self.workspace.client_secret,
443            bearer_token=self.workspace.bearer_token,
444            prefix=prefix,
445        )
446        self._connection_info = updated_response
447        return self

Set the table prefix for the connection.

Arguments:
  • prefix: New table prefix to use when syncing to the destination
Returns:

Updated CloudConnection object with refreshed info

def set_selected_streams( self, stream_names: list[str]) -> CloudConnection:
449    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
450        """Set the selected streams for the connection.
451
452        This is a destructive operation that can break existing connections if the
453        stream selection is changed incorrectly. Use with caution.
454
455        Args:
456            stream_names: List of stream names to sync
457
458        Returns:
459            Updated CloudConnection object with refreshed info
460        """
461        configurations = api_util.build_stream_configurations(stream_names)
462
463        updated_response = api_util.patch_connection(
464            connection_id=self.connection_id,
465            api_root=self.workspace.api_root,
466            client_id=self.workspace.client_id,
467            client_secret=self.workspace.client_secret,
468            bearer_token=self.workspace.bearer_token,
469            configurations=configurations,
470        )
471        self._connection_info = updated_response
472        return self

Set the selected streams for the connection.

This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.

Arguments:
  • stream_names: List of stream names to sync
Returns:

Updated CloudConnection object with refreshed info

enabled: bool
476    @property
477    def enabled(self) -> bool:
478        """Get the current enabled status of the connection.
479
480        This property always fetches fresh data from the API to ensure accuracy,
481        as another process or user may have toggled the setting.
482
483        Returns:
484            True if the connection status is 'active', False otherwise.
485        """
486        connection_info = self._fetch_connection_info(force_refresh=True)
487        return connection_info.status == api_util.models.ConnectionStatusEnum.ACTIVE

Get the current enabled status of the connection.

This property always fetches fresh data from the API to ensure accuracy, as another process or user may have toggled the setting.

Returns:

True if the connection status is 'active', False otherwise.

def set_enabled(self, *, enabled: bool, ignore_noop: bool = True) -> None:
499    def set_enabled(
500        self,
501        *,
502        enabled: bool,
503        ignore_noop: bool = True,
504    ) -> None:
505        """Set the enabled status of the connection.
506
507        Args:
508            enabled: True to enable (set status to 'active'), False to disable
509                (set status to 'inactive').
510            ignore_noop: If True (default), silently return if the connection is already
511                in the requested state. If False, raise ValueError when the requested
512                state matches the current state.
513
514        Raises:
515            ValueError: If ignore_noop is False and the connection is already in the
516                requested state.
517        """
518        # Always fetch fresh data to check current status
519        connection_info = self._fetch_connection_info(force_refresh=True)
520        current_status = connection_info.status
521        desired_status = (
522            api_util.models.ConnectionStatusEnum.ACTIVE
523            if enabled
524            else api_util.models.ConnectionStatusEnum.INACTIVE
525        )
526
527        if current_status == desired_status:
528            if ignore_noop:
529                return
530            raise ValueError(
531                f"Connection is already {'enabled' if enabled else 'disabled'}. "
532                f"Current status: {current_status}"
533            )
534
535        updated_response = api_util.patch_connection(
536            connection_id=self.connection_id,
537            api_root=self.workspace.api_root,
538            client_id=self.workspace.client_id,
539            client_secret=self.workspace.client_secret,
540            bearer_token=self.workspace.bearer_token,
541            status=desired_status,
542        )
543        self._connection_info = updated_response

Set the enabled status of the connection.

Arguments:
  • enabled: True to enable (set status to 'active'), False to disable (set status to 'inactive').
  • ignore_noop: If True (default), silently return if the connection is already in the requested state. If False, raise ValueError when the requested state matches the current state.
Raises:
  • ValueError: If ignore_noop is False and the connection is already in the requested state.
def set_schedule(self, cron_expression: str) -> None:
547    def set_schedule(
548        self,
549        cron_expression: str,
550    ) -> None:
551        """Set a cron schedule for the connection.
552
553        Args:
554            cron_expression: A cron expression defining when syncs should run.
555
556        Examples:
557                - "0 0 * * *" - Daily at midnight UTC
558                - "0 */6 * * *" - Every 6 hours
559                - "0 0 * * 0" - Weekly on Sunday at midnight UTC
560        """
561        schedule = api_util.models.AirbyteAPIConnectionSchedule(
562            schedule_type=api_util.models.ScheduleTypeEnum.CRON,
563            cron_expression=cron_expression,
564        )
565        updated_response = api_util.patch_connection(
566            connection_id=self.connection_id,
567            api_root=self.workspace.api_root,
568            client_id=self.workspace.client_id,
569            client_secret=self.workspace.client_secret,
570            bearer_token=self.workspace.bearer_token,
571            schedule=schedule,
572        )
573        self._connection_info = updated_response

Set a cron schedule for the connection.

Arguments:
  • cron_expression: A cron expression defining when syncs should run.
Examples:
  • "0 0 * * *" - Daily at midnight UTC
  • "0 */6 * * *" - Every 6 hours
  • "0 0 * * 0" - Weekly on Sunday at midnight UTC
def set_manual_schedule(self) -> None:
575    def set_manual_schedule(self) -> None:
576        """Set the connection to manual scheduling.
577
578        Disables automatic syncs. Syncs will only run when manually triggered.
579        """
580        schedule = api_util.models.AirbyteAPIConnectionSchedule(
581            schedule_type=api_util.models.ScheduleTypeEnum.MANUAL,
582        )
583        updated_response = api_util.patch_connection(
584            connection_id=self.connection_id,
585            api_root=self.workspace.api_root,
586            client_id=self.workspace.client_id,
587            client_secret=self.workspace.client_secret,
588            bearer_token=self.workspace.bearer_token,
589            schedule=schedule,
590        )
591        self._connection_info = updated_response

Set the connection to manual scheduling.

Disables automatic syncs. Syncs will only run when manually triggered.

def permanently_delete(self, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False) -> None:
595    def permanently_delete(
596        self,
597        *,
598        cascade_delete_source: bool = False,
599        cascade_delete_destination: bool = False,
600    ) -> None:
601        """Delete the connection.
602
603        Args:
604            cascade_delete_source: Whether to also delete the source.
605            cascade_delete_destination: Whether to also delete the destination.
606        """
607        self.workspace.permanently_delete_connection(self)
608
609        if cascade_delete_source:
610            self.workspace.permanently_delete_source(self.source_id)
611
612        if cascade_delete_destination:
613            self.workspace.permanently_delete_destination(self.destination_id)

Delete the connection.

Arguments:
  • cascade_delete_source: Whether to also delete the source.
  • cascade_delete_destination: Whether to also delete the destination.