airbyte.cloud.connections

Cloud Connections.

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""Cloud Connections."""
  3
  4from __future__ import annotations
  5
  6from typing import TYPE_CHECKING
  7
  8from airbyte._util import api_util
  9from airbyte.cloud.connectors import CloudDestination, CloudSource
 10from airbyte.cloud.sync_results import SyncResult
 11
 12
 13if TYPE_CHECKING:
 14    from airbyte_api.models import ConnectionResponse, JobResponse
 15
 16    from airbyte.cloud.workspaces import CloudWorkspace
 17
 18
 19class CloudConnection:
 20    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
 21
 22    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
 23    """
 24
 25    def __init__(
 26        self,
 27        workspace: CloudWorkspace,
 28        connection_id: str,
 29        source: str | None = None,
 30        destination: str | None = None,
 31    ) -> None:
 32        """It is not recommended to create a `CloudConnection` object directly.
 33
 34        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
 35        """
 36        self.connection_id = connection_id
 37        """The ID of the connection."""
 38
 39        self.workspace = workspace
 40        """The workspace that the connection belongs to."""
 41
 42        self._source_id = source
 43        """The ID of the source."""
 44
 45        self._destination_id = destination
 46        """The ID of the destination."""
 47
 48        self._connection_info: ConnectionResponse | None = None
 49        """The connection info object. (Cached.)"""
 50
 51        self._cloud_source_object: CloudSource | None = None
 52        """The source object. (Cached.)"""
 53
 54        self._cloud_destination_object: CloudDestination | None = None
 55        """The destination object. (Cached.)"""
 56
 57    def _fetch_connection_info(self) -> ConnectionResponse:
 58        """Populate the connection with data from the API."""
 59        return api_util.get_connection(
 60            workspace_id=self.workspace.workspace_id,
 61            connection_id=self.connection_id,
 62            api_root=self.workspace.api_root,
 63            client_id=self.workspace.client_id,
 64            client_secret=self.workspace.client_secret,
 65        )
 66
 67    @classmethod
 68    def _from_connection_response(
 69        cls,
 70        workspace: CloudWorkspace,
 71        connection_response: ConnectionResponse,
 72    ) -> CloudConnection:
 73        """Create a CloudConnection from a ConnectionResponse."""
 74        result = cls(
 75            workspace=workspace,
 76            connection_id=connection_response.connection_id,
 77            source=connection_response.source_id,
 78            destination=connection_response.destination_id,
 79        )
 80        result._connection_info = connection_response  # noqa: SLF001 # Accessing Non-Public API
 81        return result
 82
 83    # Properties
 84
 85    @property
 86    def name(self) -> str | None:
 87        """Get the display name of the connection, if available.
 88
 89        E.g. "My Postgres to Snowflake", not the connection ID.
 90        """
 91        if not self._connection_info:
 92            self._connection_info = self._fetch_connection_info()
 93
 94        return self._connection_info.name
 95
 96    @property
 97    def source_id(self) -> str:
 98        """The ID of the source."""
 99        if not self._source_id:
100            if not self._connection_info:
101                self._connection_info = self._fetch_connection_info()
102
103            self._source_id = self._connection_info.source_id
104
105        return self._source_id
106
107    @property
108    def source(self) -> CloudSource:
109        """Get the source object."""
110        if self._cloud_source_object:
111            return self._cloud_source_object
112
113        self._cloud_source_object = CloudSource(
114            workspace=self.workspace,
115            connector_id=self.source_id,
116        )
117        return self._cloud_source_object
118
119    @property
120    def destination_id(self) -> str:
121        """The ID of the destination."""
122        if not self._destination_id:
123            if not self._connection_info:
124                self._connection_info = self._fetch_connection_info()
125
126            self._destination_id = self._connection_info.source_id
127
128        return self._destination_id
129
130    @property
131    def destination(self) -> CloudDestination:
132        """Get the destination object."""
133        if self._cloud_destination_object:
134            return self._cloud_destination_object
135
136        self._cloud_destination_object = CloudDestination(
137            workspace=self.workspace,
138            connector_id=self.destination_id,
139        )
140        return self._cloud_destination_object
141
142    @property
143    def stream_names(self) -> list[str]:
144        """The stream names."""
145        if not self._connection_info:
146            self._connection_info = self._fetch_connection_info()
147
148        return [stream.name for stream in self._connection_info.configurations.streams or []]
149
150    @property
151    def table_prefix(self) -> str:
152        """The table prefix."""
153        if not self._connection_info:
154            self._connection_info = self._fetch_connection_info()
155
156        return self._connection_info.prefix or ""
157
158    @property
159    def connection_url(self) -> str | None:
160        """The web URL to the connection."""
161        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
162
163    @property
164    def job_history_url(self) -> str | None:
165        """The URL to the job history for the connection."""
166        return f"{self.connection_url}/timeline"
167
168    # Run Sync
169
170    def run_sync(
171        self,
172        *,
173        wait: bool = True,
174        wait_timeout: int = 300,
175    ) -> SyncResult:
176        """Run a sync."""
177        connection_response = api_util.run_connection(
178            connection_id=self.connection_id,
179            api_root=self.workspace.api_root,
180            workspace_id=self.workspace.workspace_id,
181            client_id=self.workspace.client_id,
182            client_secret=self.workspace.client_secret,
183        )
184        sync_result = SyncResult(
185            workspace=self.workspace,
186            connection=self,
187            job_id=connection_response.job_id,
188        )
189
190        if wait:
191            sync_result.wait_for_completion(
192                wait_timeout=wait_timeout,
193                raise_failure=True,
194                raise_timeout=True,
195            )
196
197        return sync_result
198
199    def __repr__(self) -> str:
200        """String representation of the connection."""
201        return (
202            f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, "
203            f"destination_id={self.destination_id}, connection_url={self.connection_url})"
204        )
205
206    # Logs
207
208    def get_previous_sync_logs(
209        self,
210        *,
211        limit: int = 10,
212    ) -> list[SyncResult]:
213        """Get the previous sync logs for a connection."""
214        sync_logs: list[JobResponse] = api_util.get_job_logs(
215            connection_id=self.connection_id,
216            api_root=self.workspace.api_root,
217            workspace_id=self.workspace.workspace_id,
218            limit=limit,
219            client_id=self.workspace.client_id,
220            client_secret=self.workspace.client_secret,
221        )
222        return [
223            SyncResult(
224                workspace=self.workspace,
225                connection=self,
226                job_id=sync_log.job_id,
227                _latest_job_info=sync_log,
228            )
229            for sync_log in sync_logs
230        ]
231
232    def get_sync_result(
233        self,
234        job_id: int | None = None,
235    ) -> SyncResult | None:
236        """Get the sync result for the connection.
237
238        If `job_id` is not provided, the most recent sync job will be used.
239
240        Returns `None` if job_id is omitted and no previous jobs are found.
241        """
242        if job_id is None:
243            # Get the most recent sync job
244            results = self.get_previous_sync_logs(
245                limit=1,
246            )
247            if results:
248                return results[0]
249
250            return None
251
252        # Get the sync job by ID (lazy loaded)
253        return SyncResult(
254            workspace=self.workspace,
255            connection=self,
256            job_id=job_id,
257        )
258
259    def rename(self, name: str) -> CloudConnection:
260        """Rename the connection.
261
262        Args:
263            name: New name for the connection
264
265        Returns:
266            Updated CloudConnection object with refreshed info
267        """
268        updated_response = api_util.patch_connection(
269            connection_id=self.connection_id,
270            api_root=self.workspace.api_root,
271            client_id=self.workspace.client_id,
272            client_secret=self.workspace.client_secret,
273            name=name,
274        )
275        self._connection_info = updated_response
276        return self
277
278    def set_table_prefix(self, prefix: str) -> CloudConnection:
279        """Set the table prefix for the connection.
280
281        Args:
282            prefix: New table prefix to use when syncing to the destination
283
284        Returns:
285            Updated CloudConnection object with refreshed info
286        """
287        updated_response = api_util.patch_connection(
288            connection_id=self.connection_id,
289            api_root=self.workspace.api_root,
290            client_id=self.workspace.client_id,
291            client_secret=self.workspace.client_secret,
292            prefix=prefix,
293        )
294        self._connection_info = updated_response
295        return self
296
297    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
298        """Set the selected streams for the connection.
299
300        This is a destructive operation that can break existing connections if the
301        stream selection is changed incorrectly. Use with caution.
302
303        Args:
304            stream_names: List of stream names to sync
305
306        Returns:
307            Updated CloudConnection object with refreshed info
308        """
309        configurations = api_util.build_stream_configurations(stream_names)
310
311        updated_response = api_util.patch_connection(
312            connection_id=self.connection_id,
313            api_root=self.workspace.api_root,
314            client_id=self.workspace.client_id,
315            client_secret=self.workspace.client_secret,
316            configurations=configurations,
317        )
318        self._connection_info = updated_response
319        return self
320
321    # Deletions
322
323    def permanently_delete(
324        self,
325        *,
326        cascade_delete_source: bool = False,
327        cascade_delete_destination: bool = False,
328    ) -> None:
329        """Delete the connection.
330
331        Args:
332            cascade_delete_source: Whether to also delete the source.
333            cascade_delete_destination: Whether to also delete the destination.
334        """
335        self.workspace.permanently_delete_connection(self)
336
337        if cascade_delete_source:
338            self.workspace.permanently_delete_source(self.source_id)
339
340        if cascade_delete_destination:
341            self.workspace.permanently_delete_destination(self.destination_id)
class CloudConnection:
 20class CloudConnection:
 21    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
 22
 23    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
 24    """
 25
 26    def __init__(
 27        self,
 28        workspace: CloudWorkspace,
 29        connection_id: str,
 30        source: str | None = None,
 31        destination: str | None = None,
 32    ) -> None:
 33        """It is not recommended to create a `CloudConnection` object directly.
 34
 35        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
 36        """
 37        self.connection_id = connection_id
 38        """The ID of the connection."""
 39
 40        self.workspace = workspace
 41        """The workspace that the connection belongs to."""
 42
 43        self._source_id = source
 44        """The ID of the source."""
 45
 46        self._destination_id = destination
 47        """The ID of the destination."""
 48
 49        self._connection_info: ConnectionResponse | None = None
 50        """The connection info object. (Cached.)"""
 51
 52        self._cloud_source_object: CloudSource | None = None
 53        """The source object. (Cached.)"""
 54
 55        self._cloud_destination_object: CloudDestination | None = None
 56        """The destination object. (Cached.)"""
 57
 58    def _fetch_connection_info(self) -> ConnectionResponse:
 59        """Populate the connection with data from the API."""
 60        return api_util.get_connection(
 61            workspace_id=self.workspace.workspace_id,
 62            connection_id=self.connection_id,
 63            api_root=self.workspace.api_root,
 64            client_id=self.workspace.client_id,
 65            client_secret=self.workspace.client_secret,
 66        )
 67
 68    @classmethod
 69    def _from_connection_response(
 70        cls,
 71        workspace: CloudWorkspace,
 72        connection_response: ConnectionResponse,
 73    ) -> CloudConnection:
 74        """Create a CloudConnection from a ConnectionResponse."""
 75        result = cls(
 76            workspace=workspace,
 77            connection_id=connection_response.connection_id,
 78            source=connection_response.source_id,
 79            destination=connection_response.destination_id,
 80        )
 81        result._connection_info = connection_response  # noqa: SLF001 # Accessing Non-Public API
 82        return result
 83
 84    # Properties
 85
 86    @property
 87    def name(self) -> str | None:
 88        """Get the display name of the connection, if available.
 89
 90        E.g. "My Postgres to Snowflake", not the connection ID.
 91        """
 92        if not self._connection_info:
 93            self._connection_info = self._fetch_connection_info()
 94
 95        return self._connection_info.name
 96
 97    @property
 98    def source_id(self) -> str:
 99        """The ID of the source."""
100        if not self._source_id:
101            if not self._connection_info:
102                self._connection_info = self._fetch_connection_info()
103
104            self._source_id = self._connection_info.source_id
105
106        return self._source_id
107
108    @property
109    def source(self) -> CloudSource:
110        """Get the source object."""
111        if self._cloud_source_object:
112            return self._cloud_source_object
113
114        self._cloud_source_object = CloudSource(
115            workspace=self.workspace,
116            connector_id=self.source_id,
117        )
118        return self._cloud_source_object
119
120    @property
121    def destination_id(self) -> str:
122        """The ID of the destination."""
123        if not self._destination_id:
124            if not self._connection_info:
125                self._connection_info = self._fetch_connection_info()
126
127            self._destination_id = self._connection_info.source_id
128
129        return self._destination_id
130
131    @property
132    def destination(self) -> CloudDestination:
133        """Get the destination object."""
134        if self._cloud_destination_object:
135            return self._cloud_destination_object
136
137        self._cloud_destination_object = CloudDestination(
138            workspace=self.workspace,
139            connector_id=self.destination_id,
140        )
141        return self._cloud_destination_object
142
143    @property
144    def stream_names(self) -> list[str]:
145        """The stream names."""
146        if not self._connection_info:
147            self._connection_info = self._fetch_connection_info()
148
149        return [stream.name for stream in self._connection_info.configurations.streams or []]
150
151    @property
152    def table_prefix(self) -> str:
153        """The table prefix."""
154        if not self._connection_info:
155            self._connection_info = self._fetch_connection_info()
156
157        return self._connection_info.prefix or ""
158
159    @property
160    def connection_url(self) -> str | None:
161        """The web URL to the connection."""
162        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
163
164    @property
165    def job_history_url(self) -> str | None:
166        """The URL to the job history for the connection."""
167        return f"{self.connection_url}/timeline"
168
169    # Run Sync
170
171    def run_sync(
172        self,
173        *,
174        wait: bool = True,
175        wait_timeout: int = 300,
176    ) -> SyncResult:
177        """Run a sync."""
178        connection_response = api_util.run_connection(
179            connection_id=self.connection_id,
180            api_root=self.workspace.api_root,
181            workspace_id=self.workspace.workspace_id,
182            client_id=self.workspace.client_id,
183            client_secret=self.workspace.client_secret,
184        )
185        sync_result = SyncResult(
186            workspace=self.workspace,
187            connection=self,
188            job_id=connection_response.job_id,
189        )
190
191        if wait:
192            sync_result.wait_for_completion(
193                wait_timeout=wait_timeout,
194                raise_failure=True,
195                raise_timeout=True,
196            )
197
198        return sync_result
199
200    def __repr__(self) -> str:
201        """String representation of the connection."""
202        return (
203            f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, "
204            f"destination_id={self.destination_id}, connection_url={self.connection_url})"
205        )
206
207    # Logs
208
209    def get_previous_sync_logs(
210        self,
211        *,
212        limit: int = 10,
213    ) -> list[SyncResult]:
214        """Get the previous sync logs for a connection."""
215        sync_logs: list[JobResponse] = api_util.get_job_logs(
216            connection_id=self.connection_id,
217            api_root=self.workspace.api_root,
218            workspace_id=self.workspace.workspace_id,
219            limit=limit,
220            client_id=self.workspace.client_id,
221            client_secret=self.workspace.client_secret,
222        )
223        return [
224            SyncResult(
225                workspace=self.workspace,
226                connection=self,
227                job_id=sync_log.job_id,
228                _latest_job_info=sync_log,
229            )
230            for sync_log in sync_logs
231        ]
232
233    def get_sync_result(
234        self,
235        job_id: int | None = None,
236    ) -> SyncResult | None:
237        """Get the sync result for the connection.
238
239        If `job_id` is not provided, the most recent sync job will be used.
240
241        Returns `None` if job_id is omitted and no previous jobs are found.
242        """
243        if job_id is None:
244            # Get the most recent sync job
245            results = self.get_previous_sync_logs(
246                limit=1,
247            )
248            if results:
249                return results[0]
250
251            return None
252
253        # Get the sync job by ID (lazy loaded)
254        return SyncResult(
255            workspace=self.workspace,
256            connection=self,
257            job_id=job_id,
258        )
259
260    def rename(self, name: str) -> CloudConnection:
261        """Rename the connection.
262
263        Args:
264            name: New name for the connection
265
266        Returns:
267            Updated CloudConnection object with refreshed info
268        """
269        updated_response = api_util.patch_connection(
270            connection_id=self.connection_id,
271            api_root=self.workspace.api_root,
272            client_id=self.workspace.client_id,
273            client_secret=self.workspace.client_secret,
274            name=name,
275        )
276        self._connection_info = updated_response
277        return self
278
279    def set_table_prefix(self, prefix: str) -> CloudConnection:
280        """Set the table prefix for the connection.
281
282        Args:
283            prefix: New table prefix to use when syncing to the destination
284
285        Returns:
286            Updated CloudConnection object with refreshed info
287        """
288        updated_response = api_util.patch_connection(
289            connection_id=self.connection_id,
290            api_root=self.workspace.api_root,
291            client_id=self.workspace.client_id,
292            client_secret=self.workspace.client_secret,
293            prefix=prefix,
294        )
295        self._connection_info = updated_response
296        return self
297
298    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
299        """Set the selected streams for the connection.
300
301        This is a destructive operation that can break existing connections if the
302        stream selection is changed incorrectly. Use with caution.
303
304        Args:
305            stream_names: List of stream names to sync
306
307        Returns:
308            Updated CloudConnection object with refreshed info
309        """
310        configurations = api_util.build_stream_configurations(stream_names)
311
312        updated_response = api_util.patch_connection(
313            connection_id=self.connection_id,
314            api_root=self.workspace.api_root,
315            client_id=self.workspace.client_id,
316            client_secret=self.workspace.client_secret,
317            configurations=configurations,
318        )
319        self._connection_info = updated_response
320        return self
321
322    # Deletions
323
324    def permanently_delete(
325        self,
326        *,
327        cascade_delete_source: bool = False,
328        cascade_delete_destination: bool = False,
329    ) -> None:
330        """Delete the connection.
331
332        Args:
333            cascade_delete_source: Whether to also delete the source.
334            cascade_delete_destination: Whether to also delete the destination.
335        """
336        self.workspace.permanently_delete_connection(self)
337
338        if cascade_delete_source:
339            self.workspace.permanently_delete_source(self.source_id)
340
341        if cascade_delete_destination:
342            self.workspace.permanently_delete_destination(self.destination_id)

A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

You can use a connection object to run sync jobs, retrieve logs, and manage the connection.

CloudConnection( workspace: airbyte.cloud.CloudWorkspace, connection_id: str, source: str | None = None, destination: str | None = None)
26    def __init__(
27        self,
28        workspace: CloudWorkspace,
29        connection_id: str,
30        source: str | None = None,
31        destination: str | None = None,
32    ) -> None:
33        """It is not recommended to create a `CloudConnection` object directly.
34
35        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
36        """
37        self.connection_id = connection_id
38        """The ID of the connection."""
39
40        self.workspace = workspace
41        """The workspace that the connection belongs to."""
42
43        self._source_id = source
44        """The ID of the source."""
45
46        self._destination_id = destination
47        """The ID of the destination."""
48
49        self._connection_info: ConnectionResponse | None = None
50        """The connection info object. (Cached.)"""
51
52        self._cloud_source_object: CloudSource | None = None
53        """The source object. (Cached.)"""
54
55        self._cloud_destination_object: CloudDestination | None = None
56        """The destination object. (Cached.)"""

It is not recommended to create a CloudConnection object directly.

Instead, use CloudWorkspace.get_connection() to create a connection object.

connection_id

The ID of the connection.

workspace

The workspace that the connection belongs to.

name: str | None
86    @property
87    def name(self) -> str | None:
88        """Get the display name of the connection, if available.
89
90        E.g. "My Postgres to Snowflake", not the connection ID.
91        """
92        if not self._connection_info:
93            self._connection_info = self._fetch_connection_info()
94
95        return self._connection_info.name

Get the display name of the connection, if available.

E.g. "My Postgres to Snowflake", not the connection ID.

source_id: str
 97    @property
 98    def source_id(self) -> str:
 99        """The ID of the source."""
100        if not self._source_id:
101            if not self._connection_info:
102                self._connection_info = self._fetch_connection_info()
103
104            self._source_id = self._connection_info.source_id
105
106        return self._source_id

The ID of the source.

source: airbyte.cloud.connectors.CloudSource
108    @property
109    def source(self) -> CloudSource:
110        """Get the source object."""
111        if self._cloud_source_object:
112            return self._cloud_source_object
113
114        self._cloud_source_object = CloudSource(
115            workspace=self.workspace,
116            connector_id=self.source_id,
117        )
118        return self._cloud_source_object

Get the source object.

destination_id: str
120    @property
121    def destination_id(self) -> str:
122        """The ID of the destination."""
123        if not self._destination_id:
124            if not self._connection_info:
125                self._connection_info = self._fetch_connection_info()
126
127            self._destination_id = self._connection_info.source_id
128
129        return self._destination_id

The ID of the destination.

destination: airbyte.cloud.connectors.CloudDestination
131    @property
132    def destination(self) -> CloudDestination:
133        """Get the destination object."""
134        if self._cloud_destination_object:
135            return self._cloud_destination_object
136
137        self._cloud_destination_object = CloudDestination(
138            workspace=self.workspace,
139            connector_id=self.destination_id,
140        )
141        return self._cloud_destination_object

Get the destination object.

stream_names: list[str]
143    @property
144    def stream_names(self) -> list[str]:
145        """The stream names."""
146        if not self._connection_info:
147            self._connection_info = self._fetch_connection_info()
148
149        return [stream.name for stream in self._connection_info.configurations.streams or []]

The stream names.

table_prefix: str
151    @property
152    def table_prefix(self) -> str:
153        """The table prefix."""
154        if not self._connection_info:
155            self._connection_info = self._fetch_connection_info()
156
157        return self._connection_info.prefix or ""

The table prefix.

connection_url: str | None
159    @property
160    def connection_url(self) -> str | None:
161        """The web URL to the connection."""
162        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

The web URL to the connection.

job_history_url: str | None
164    @property
165    def job_history_url(self) -> str | None:
166        """The URL to the job history for the connection."""
167        return f"{self.connection_url}/timeline"

The URL to the job history for the connection.

def run_sync( self, *, wait: bool = True, wait_timeout: int = 300) -> airbyte.cloud.SyncResult:
171    def run_sync(
172        self,
173        *,
174        wait: bool = True,
175        wait_timeout: int = 300,
176    ) -> SyncResult:
177        """Run a sync."""
178        connection_response = api_util.run_connection(
179            connection_id=self.connection_id,
180            api_root=self.workspace.api_root,
181            workspace_id=self.workspace.workspace_id,
182            client_id=self.workspace.client_id,
183            client_secret=self.workspace.client_secret,
184        )
185        sync_result = SyncResult(
186            workspace=self.workspace,
187            connection=self,
188            job_id=connection_response.job_id,
189        )
190
191        if wait:
192            sync_result.wait_for_completion(
193                wait_timeout=wait_timeout,
194                raise_failure=True,
195                raise_timeout=True,
196            )
197
198        return sync_result

Run a sync.

def get_previous_sync_logs(self, *, limit: int = 10) -> list[airbyte.cloud.SyncResult]:
209    def get_previous_sync_logs(
210        self,
211        *,
212        limit: int = 10,
213    ) -> list[SyncResult]:
214        """Get the previous sync logs for a connection."""
215        sync_logs: list[JobResponse] = api_util.get_job_logs(
216            connection_id=self.connection_id,
217            api_root=self.workspace.api_root,
218            workspace_id=self.workspace.workspace_id,
219            limit=limit,
220            client_id=self.workspace.client_id,
221            client_secret=self.workspace.client_secret,
222        )
223        return [
224            SyncResult(
225                workspace=self.workspace,
226                connection=self,
227                job_id=sync_log.job_id,
228                _latest_job_info=sync_log,
229            )
230            for sync_log in sync_logs
231        ]

Get the previous sync logs for a connection.

def get_sync_result( self, job_id: int | None = None) -> airbyte.cloud.SyncResult | None:
233    def get_sync_result(
234        self,
235        job_id: int | None = None,
236    ) -> SyncResult | None:
237        """Get the sync result for the connection.
238
239        If `job_id` is not provided, the most recent sync job will be used.
240
241        Returns `None` if job_id is omitted and no previous jobs are found.
242        """
243        if job_id is None:
244            # Get the most recent sync job
245            results = self.get_previous_sync_logs(
246                limit=1,
247            )
248            if results:
249                return results[0]
250
251            return None
252
253        # Get the sync job by ID (lazy loaded)
254        return SyncResult(
255            workspace=self.workspace,
256            connection=self,
257            job_id=job_id,
258        )

Get the sync result for the connection.

If job_id is not provided, the most recent sync job will be used.

Returns None if job_id is omitted and no previous jobs are found.

def rename(self, name: str) -> CloudConnection:
260    def rename(self, name: str) -> CloudConnection:
261        """Rename the connection.
262
263        Args:
264            name: New name for the connection
265
266        Returns:
267            Updated CloudConnection object with refreshed info
268        """
269        updated_response = api_util.patch_connection(
270            connection_id=self.connection_id,
271            api_root=self.workspace.api_root,
272            client_id=self.workspace.client_id,
273            client_secret=self.workspace.client_secret,
274            name=name,
275        )
276        self._connection_info = updated_response
277        return self

Rename the connection.

Arguments:
  • name: New name for the connection
Returns:

Updated CloudConnection object with refreshed info

def set_table_prefix(self, prefix: str) -> CloudConnection:
279    def set_table_prefix(self, prefix: str) -> CloudConnection:
280        """Set the table prefix for the connection.
281
282        Args:
283            prefix: New table prefix to use when syncing to the destination
284
285        Returns:
286            Updated CloudConnection object with refreshed info
287        """
288        updated_response = api_util.patch_connection(
289            connection_id=self.connection_id,
290            api_root=self.workspace.api_root,
291            client_id=self.workspace.client_id,
292            client_secret=self.workspace.client_secret,
293            prefix=prefix,
294        )
295        self._connection_info = updated_response
296        return self

Set the table prefix for the connection.

Arguments:
  • prefix: New table prefix to use when syncing to the destination
Returns:

Updated CloudConnection object with refreshed info

def set_selected_streams( self, stream_names: list[str]) -> CloudConnection:
298    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
299        """Set the selected streams for the connection.
300
301        This is a destructive operation that can break existing connections if the
302        stream selection is changed incorrectly. Use with caution.
303
304        Args:
305            stream_names: List of stream names to sync
306
307        Returns:
308            Updated CloudConnection object with refreshed info
309        """
310        configurations = api_util.build_stream_configurations(stream_names)
311
312        updated_response = api_util.patch_connection(
313            connection_id=self.connection_id,
314            api_root=self.workspace.api_root,
315            client_id=self.workspace.client_id,
316            client_secret=self.workspace.client_secret,
317            configurations=configurations,
318        )
319        self._connection_info = updated_response
320        return self

Set the selected streams for the connection.

This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.

Arguments:
  • stream_names: List of stream names to sync
Returns:

Updated CloudConnection object with refreshed info

def permanently_delete( self, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False) -> None:
324    def permanently_delete(
325        self,
326        *,
327        cascade_delete_source: bool = False,
328        cascade_delete_destination: bool = False,
329    ) -> None:
330        """Delete the connection.
331
332        Args:
333            cascade_delete_source: Whether to also delete the source.
334            cascade_delete_destination: Whether to also delete the destination.
335        """
336        self.workspace.permanently_delete_connection(self)
337
338        if cascade_delete_source:
339            self.workspace.permanently_delete_source(self.source_id)
340
341        if cascade_delete_destination:
342            self.workspace.permanently_delete_destination(self.destination_id)

Delete the connection.

Arguments:
  • cascade_delete_source: Whether to also delete the source.
  • cascade_delete_destination: Whether to also delete the destination.