airbyte.cloud.workspaces

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

By overriding api_root, you can use this module to interact with self-managed Airbyte instances, both OSS and Enterprise.

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API.
  3
  4By overriding `api_root`, you can use this module to interact with self-managed Airbyte instances,
  5both OSS and Enterprise.
  6"""
  7
  8from __future__ import annotations
  9
 10from dataclasses import dataclass
 11from typing import TYPE_CHECKING
 12
 13from airbyte import exceptions as exc
 14from airbyte._util.api_util import (
 15    CLOUD_API_ROOT,
 16    create_connection,
 17    create_destination,
 18    create_source,
 19    delete_connection,
 20    delete_destination,
 21    delete_source,
 22    get_workspace,
 23)
 24from airbyte.cloud._destination_util import get_destination_config_from_cache
 25from airbyte.cloud.connections import CloudConnection
 26from airbyte.cloud.sync_results import SyncResult
 27from airbyte.sources.base import Source
 28
 29
 30if TYPE_CHECKING:
 31    from airbyte._util.api_imports import DestinationResponse
 32    from airbyte.caches.base import CacheBase
 33
 34
 35@dataclass
 36class CloudWorkspace:
 37    """A remote workspace on the Airbyte Cloud.
 38
 39    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
 40    instances, both OSS and Enterprise.
 41    """
 42
 43    workspace_id: str
 44    api_key: str
 45    api_root: str = CLOUD_API_ROOT
 46
 47    @property
 48    def workspace_url(self) -> str | None:
 49        return f"{self.api_root}/workspaces/{self.workspace_id}"
 50
 51    # Test connection and creds
 52
 53    def connect(self) -> None:
 54        """Check that the workspace is reachable and raise an exception otherwise.
 55
 56        Note: It is not necessary to call this method before calling other operations. It
 57              serves primarily as a simple check to ensure that the workspace is reachable
 58              and credentials are correct.
 59        """
 60        _ = get_workspace(
 61            api_root=self.api_root,
 62            api_key=self.api_key,
 63            workspace_id=self.workspace_id,
 64        )
 65        print(f"Successfully connected to workspace: {self.workspace_url}")
 66
 67    # Deploy and delete sources
 68
 69    # TODO: Make this a public API
 70    def _deploy_source(
 71        self,
 72        source: Source,
 73    ) -> str:
 74        """Deploy a source to the workspace.
 75
 76        Returns the newly deployed source ID.
 77        """
 78        source_configuration = source.get_config().copy()
 79        source_configuration["sourceType"] = source.name.replace("source-", "")
 80
 81        deployed_source = create_source(
 82            name=f"{source.name.replace('-', ' ').title()} (Deployed by PyAirbyte)",
 83            api_root=self.api_root,
 84            api_key=self.api_key,
 85            workspace_id=self.workspace_id,
 86            config=source_configuration,
 87        )
 88
 89        # Set the deployment Ids on the source object
 90        source._deployed_api_root = self.api_root  # noqa: SLF001  # Accessing nn-public API
 91        source._deployed_workspace_id = self.workspace_id  # noqa: SLF001  # Accessing nn-public API
 92        source._deployed_source_id = deployed_source.source_id  # noqa: SLF001  # Accessing nn-public API
 93
 94        return deployed_source.source_id
 95
 96    def _permanently_delete_source(
 97        self,
 98        source: str | Source,
 99    ) -> None:
100        """Delete a source from the workspace.
101
102        You can pass either the source ID `str` or a deployed `Source` object.
103        """
104        if not isinstance(source, (str, Source)):
105            raise ValueError(f"Invalid source type: {type(source)}")  # noqa: TRY004, TRY003
106
107        if isinstance(source, Source):
108            if not source._deployed_source_id:  # noqa: SLF001
109                raise ValueError("Source has not been deployed.")  # noqa: TRY003
110
111            source_id = source._deployed_source_id  # noqa: SLF001
112
113        elif isinstance(source, str):
114            source_id = source
115
116        delete_source(
117            source_id=source_id,
118            api_root=self.api_root,
119            api_key=self.api_key,
120        )
121
122    # Deploy and delete destinations
123
124    # TODO: Make this a public API
125    def _deploy_cache_as_destination(
126        self,
127        cache: CacheBase,
128    ) -> str:
129        """Deploy a cache to the workspace as a new destination.
130
131        Returns the newly deployed destination ID.
132        """
133        cache_type_name = cache.__class__.__name__.replace("Cache", "")
134
135        deployed_destination: DestinationResponse = create_destination(
136            name=f"Destination {cache_type_name} (Deployed by PyAirbyte)",
137            api_root=self.api_root,
138            api_key=self.api_key,
139            workspace_id=self.workspace_id,
140            config=get_destination_config_from_cache(cache),
141        )
142
143        # Set the deployment Ids on the source object
144        cache._deployed_api_root = self.api_root  # noqa: SLF001  # Accessing nn-public API
145        cache._deployed_workspace_id = self.workspace_id  # noqa: SLF001  # Accessing nn-public API
146        cache._deployed_destination_id = deployed_destination.destination_id  # noqa: SLF001  # Accessing nn-public API
147
148        return deployed_destination.destination_id
149
150    def _permanently_delete_destination(
151        self,
152        *,
153        destination: str | None = None,
154        cache: CacheBase | None = None,
155    ) -> None:
156        """Delete a deployed destination from the workspace.
157
158        You can pass either the `Cache` class or the deployed destination ID as a `str`.
159        """
160        if destination is None and cache is None:
161            raise ValueError("You must provide either a destination ID or a cache object.")  # noqa: TRY003
162        if destination is not None and cache is not None:
163            raise ValueError(  # noqa: TRY003
164                "You must provide either a destination ID or a cache object, not both."
165            )
166
167        if cache:
168            if not cache._deployed_destination_id:  # noqa: SLF001
169                raise ValueError("Cache has not been deployed.")  # noqa: TRY003
170
171            destination = cache._deployed_destination_id  # noqa: SLF001
172
173        if destination is None:
174            raise ValueError("No destination ID provided.")  # noqa: TRY003
175
176        delete_destination(
177            destination_id=destination,
178            api_root=self.api_root,
179            api_key=self.api_key,
180        )
181
182    # Deploy and delete connections
183
184    # TODO: Make this a public API
185    def _deploy_connection(
186        self,
187        source: Source | str,
188        cache: CacheBase | None = None,
189        destination: str | None = None,
190        table_prefix: str | None = None,
191        selected_streams: list[str] | None = None,
192    ) -> CloudConnection:
193        """Deploy a source and cache to the workspace as a new connection.
194
195        Returns the newly deployed connection ID as a `str`.
196
197        Args:
198            source (Source | str): The source to deploy. You can pass either an already deployed
199                source ID `str` or a PyAirbyte `Source` object. If you pass a `Source` object,
200                it will be deployed automatically.
201            cache (CacheBase, optional): The cache to deploy as a new destination. You can provide
202                `cache` or `destination`, but not both.
203            destination (str, optional): The destination ID to use. You can provide
204                `cache` or `destination`, but not both.
205        """
206        # Resolve source ID
207        source_id: str
208        if isinstance(source, Source):
209            selected_streams = selected_streams or source.get_selected_streams()
210            if source._deployed_source_id:  # noqa: SLF001
211                source_id = source._deployed_source_id  # noqa: SLF001
212            else:
213                source_id = self._deploy_source(source)
214        else:
215            source_id = source
216            if not selected_streams:
217                raise exc.PyAirbyteInputError(
218                    guidance="You must provide `selected_streams` when deploying a source ID."
219                )
220
221        # Resolve destination ID
222        destination_id: str
223        if destination:
224            destination_id = destination
225        elif cache:
226            table_prefix = table_prefix if table_prefix is not None else (cache.table_prefix or "")
227            if not cache._deployed_destination_id:  # noqa: SLF001
228                destination_id = self._deploy_cache_as_destination(cache)
229            else:
230                destination_id = cache._deployed_destination_id  # noqa: SLF001
231        else:
232            raise exc.PyAirbyteInputError(
233                guidance="You must provide either a destination ID or a cache object."
234            )
235
236        assert source_id is not None
237        assert destination_id is not None
238
239        deployed_connection = create_connection(
240            name="Connection (Deployed by PyAirbyte)",
241            source_id=source_id,
242            destination_id=destination_id,
243            api_root=self.api_root,
244            api_key=self.api_key,
245            workspace_id=self.workspace_id,
246            selected_stream_names=selected_streams,
247            prefix=table_prefix or "",
248        )
249
250        if isinstance(source, Source):
251            source._deployed_api_root = self.api_root  # noqa: SLF001
252            source._deployed_workspace_id = self.workspace_id  # noqa: SLF001
253            source._deployed_source_id = source_id  # noqa: SLF001
254        if cache:
255            cache._deployed_api_root = self.api_root  # noqa: SLF001
256            cache._deployed_workspace_id = self.workspace_id  # noqa: SLF001
257            cache._deployed_destination_id = deployed_connection.destination_id  # noqa: SLF001
258
259        return CloudConnection(
260            workspace=self,
261            connection_id=deployed_connection.connection_id,
262            source=deployed_connection.source_id,
263            destination=deployed_connection.destination_id,
264        )
265
266    def get_connection(
267        self,
268        connection_id: str,
269    ) -> CloudConnection:
270        """Get a connection by ID.
271
272        This method does not fetch data from the API. It returns a `CloudConnection` object,
273        which will be loaded lazily as needed.
274        """
275        return CloudConnection(
276            workspace=self,
277            connection_id=connection_id,
278        )
279
280    def _permanently_delete_connection(
281        self,
282        connection: str | CloudConnection,
283        *,
284        delete_source: bool = False,
285        delete_destination: bool = False,
286    ) -> None:
287        """Delete a deployed connection from the workspace."""
288        if connection is None:
289            raise ValueError("No connection ID provided.")  # noqa: TRY003
290
291        if isinstance(connection, str):
292            connection = CloudConnection(
293                workspace=self,
294                connection_id=connection,
295            )
296
297        delete_connection(
298            connection_id=connection.connection_id,
299            api_root=self.api_root,
300            api_key=self.api_key,
301            workspace_id=self.workspace_id,
302        )
303        if delete_source:
304            self._permanently_delete_source(source=connection.source_id)
305
306        if delete_destination:
307            self._permanently_delete_destination(destination=connection.destination_id)
308
309    # Run syncs
310
311    def run_sync(
312        self,
313        connection_id: str,
314        *,
315        wait: bool = True,
316        wait_timeout: int = 300,
317    ) -> SyncResult:
318        """Run a sync on a deployed connection."""
319        connection = CloudConnection(
320            workspace=self,
321            connection_id=connection_id,
322        )
323        return connection.run_sync(wait=wait, wait_timeout=wait_timeout)
324
325    # Get sync results and previous sync logs
326
327    def get_sync_result(
328        self,
329        connection_id: str,
330        job_id: str | None = None,
331    ) -> SyncResult | None:
332        """Get the sync result for a connection job.
333
334        If `job_id` is not provided, the most recent sync job will be used.
335
336        Returns `None` if job_id is omitted and no previous jobs are found.
337        """
338        connection = CloudConnection(
339            workspace=self,
340            connection_id=connection_id,
341        )
342        if job_id is None:
343            results = self.get_previous_sync_logs(
344                connection_id=connection_id,
345                limit=1,
346            )
347            if results:
348                return results[0]
349
350            return None
351        connection = CloudConnection(
352            workspace=self,
353            connection_id=connection_id,
354        )
355        return SyncResult(
356            workspace=self,
357            connection=connection,
358            job_id=job_id,
359        )
360
361    def get_previous_sync_logs(
362        self,
363        connection_id: str,
364        *,
365        limit: int = 10,
366    ) -> list[SyncResult]:
367        """Get the previous sync logs for a connection."""
368        connection = CloudConnection(
369            workspace=self,
370            connection_id=connection_id,
371        )
372        return connection.get_previous_sync_logs(
373            limit=limit,
374        )
@dataclass
class CloudWorkspace:
 36@dataclass
 37class CloudWorkspace:
 38    """A remote workspace on the Airbyte Cloud.
 39
 40    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
 41    instances, both OSS and Enterprise.
 42    """
 43
 44    workspace_id: str
 45    api_key: str
 46    api_root: str = CLOUD_API_ROOT
 47
 48    @property
 49    def workspace_url(self) -> str | None:
 50        return f"{self.api_root}/workspaces/{self.workspace_id}"
 51
 52    # Test connection and creds
 53
 54    def connect(self) -> None:
 55        """Check that the workspace is reachable and raise an exception otherwise.
 56
 57        Note: It is not necessary to call this method before calling other operations. It
 58              serves primarily as a simple check to ensure that the workspace is reachable
 59              and credentials are correct.
 60        """
 61        _ = get_workspace(
 62            api_root=self.api_root,
 63            api_key=self.api_key,
 64            workspace_id=self.workspace_id,
 65        )
 66        print(f"Successfully connected to workspace: {self.workspace_url}")
 67
 68    # Deploy and delete sources
 69
 70    # TODO: Make this a public API
 71    def _deploy_source(
 72        self,
 73        source: Source,
 74    ) -> str:
 75        """Deploy a source to the workspace.
 76
 77        Returns the newly deployed source ID.
 78        """
 79        source_configuration = source.get_config().copy()
 80        source_configuration["sourceType"] = source.name.replace("source-", "")
 81
 82        deployed_source = create_source(
 83            name=f"{source.name.replace('-', ' ').title()} (Deployed by PyAirbyte)",
 84            api_root=self.api_root,
 85            api_key=self.api_key,
 86            workspace_id=self.workspace_id,
 87            config=source_configuration,
 88        )
 89
 90        # Set the deployment Ids on the source object
 91        source._deployed_api_root = self.api_root  # noqa: SLF001  # Accessing nn-public API
 92        source._deployed_workspace_id = self.workspace_id  # noqa: SLF001  # Accessing nn-public API
 93        source._deployed_source_id = deployed_source.source_id  # noqa: SLF001  # Accessing nn-public API
 94
 95        return deployed_source.source_id
 96
 97    def _permanently_delete_source(
 98        self,
 99        source: str | Source,
100    ) -> None:
101        """Delete a source from the workspace.
102
103        You can pass either the source ID `str` or a deployed `Source` object.
104        """
105        if not isinstance(source, (str, Source)):
106            raise ValueError(f"Invalid source type: {type(source)}")  # noqa: TRY004, TRY003
107
108        if isinstance(source, Source):
109            if not source._deployed_source_id:  # noqa: SLF001
110                raise ValueError("Source has not been deployed.")  # noqa: TRY003
111
112            source_id = source._deployed_source_id  # noqa: SLF001
113
114        elif isinstance(source, str):
115            source_id = source
116
117        delete_source(
118            source_id=source_id,
119            api_root=self.api_root,
120            api_key=self.api_key,
121        )
122
123    # Deploy and delete destinations
124
125    # TODO: Make this a public API
126    def _deploy_cache_as_destination(
127        self,
128        cache: CacheBase,
129    ) -> str:
130        """Deploy a cache to the workspace as a new destination.
131
132        Returns the newly deployed destination ID.
133        """
134        cache_type_name = cache.__class__.__name__.replace("Cache", "")
135
136        deployed_destination: DestinationResponse = create_destination(
137            name=f"Destination {cache_type_name} (Deployed by PyAirbyte)",
138            api_root=self.api_root,
139            api_key=self.api_key,
140            workspace_id=self.workspace_id,
141            config=get_destination_config_from_cache(cache),
142        )
143
144        # Set the deployment Ids on the source object
145        cache._deployed_api_root = self.api_root  # noqa: SLF001  # Accessing nn-public API
146        cache._deployed_workspace_id = self.workspace_id  # noqa: SLF001  # Accessing nn-public API
147        cache._deployed_destination_id = deployed_destination.destination_id  # noqa: SLF001  # Accessing nn-public API
148
149        return deployed_destination.destination_id
150
151    def _permanently_delete_destination(
152        self,
153        *,
154        destination: str | None = None,
155        cache: CacheBase | None = None,
156    ) -> None:
157        """Delete a deployed destination from the workspace.
158
159        You can pass either the `Cache` class or the deployed destination ID as a `str`.
160        """
161        if destination is None and cache is None:
162            raise ValueError("You must provide either a destination ID or a cache object.")  # noqa: TRY003
163        if destination is not None and cache is not None:
164            raise ValueError(  # noqa: TRY003
165                "You must provide either a destination ID or a cache object, not both."
166            )
167
168        if cache:
169            if not cache._deployed_destination_id:  # noqa: SLF001
170                raise ValueError("Cache has not been deployed.")  # noqa: TRY003
171
172            destination = cache._deployed_destination_id  # noqa: SLF001
173
174        if destination is None:
175            raise ValueError("No destination ID provided.")  # noqa: TRY003
176
177        delete_destination(
178            destination_id=destination,
179            api_root=self.api_root,
180            api_key=self.api_key,
181        )
182
183    # Deploy and delete connections
184
185    # TODO: Make this a public API
186    def _deploy_connection(
187        self,
188        source: Source | str,
189        cache: CacheBase | None = None,
190        destination: str | None = None,
191        table_prefix: str | None = None,
192        selected_streams: list[str] | None = None,
193    ) -> CloudConnection:
194        """Deploy a source and cache to the workspace as a new connection.
195
196        Returns the newly deployed connection ID as a `str`.
197
198        Args:
199            source (Source | str): The source to deploy. You can pass either an already deployed
200                source ID `str` or a PyAirbyte `Source` object. If you pass a `Source` object,
201                it will be deployed automatically.
202            cache (CacheBase, optional): The cache to deploy as a new destination. You can provide
203                `cache` or `destination`, but not both.
204            destination (str, optional): The destination ID to use. You can provide
205                `cache` or `destination`, but not both.
206        """
207        # Resolve source ID
208        source_id: str
209        if isinstance(source, Source):
210            selected_streams = selected_streams or source.get_selected_streams()
211            if source._deployed_source_id:  # noqa: SLF001
212                source_id = source._deployed_source_id  # noqa: SLF001
213            else:
214                source_id = self._deploy_source(source)
215        else:
216            source_id = source
217            if not selected_streams:
218                raise exc.PyAirbyteInputError(
219                    guidance="You must provide `selected_streams` when deploying a source ID."
220                )
221
222        # Resolve destination ID
223        destination_id: str
224        if destination:
225            destination_id = destination
226        elif cache:
227            table_prefix = table_prefix if table_prefix is not None else (cache.table_prefix or "")
228            if not cache._deployed_destination_id:  # noqa: SLF001
229                destination_id = self._deploy_cache_as_destination(cache)
230            else:
231                destination_id = cache._deployed_destination_id  # noqa: SLF001
232        else:
233            raise exc.PyAirbyteInputError(
234                guidance="You must provide either a destination ID or a cache object."
235            )
236
237        assert source_id is not None
238        assert destination_id is not None
239
240        deployed_connection = create_connection(
241            name="Connection (Deployed by PyAirbyte)",
242            source_id=source_id,
243            destination_id=destination_id,
244            api_root=self.api_root,
245            api_key=self.api_key,
246            workspace_id=self.workspace_id,
247            selected_stream_names=selected_streams,
248            prefix=table_prefix or "",
249        )
250
251        if isinstance(source, Source):
252            source._deployed_api_root = self.api_root  # noqa: SLF001
253            source._deployed_workspace_id = self.workspace_id  # noqa: SLF001
254            source._deployed_source_id = source_id  # noqa: SLF001
255        if cache:
256            cache._deployed_api_root = self.api_root  # noqa: SLF001
257            cache._deployed_workspace_id = self.workspace_id  # noqa: SLF001
258            cache._deployed_destination_id = deployed_connection.destination_id  # noqa: SLF001
259
260        return CloudConnection(
261            workspace=self,
262            connection_id=deployed_connection.connection_id,
263            source=deployed_connection.source_id,
264            destination=deployed_connection.destination_id,
265        )
266
267    def get_connection(
268        self,
269        connection_id: str,
270    ) -> CloudConnection:
271        """Get a connection by ID.
272
273        This method does not fetch data from the API. It returns a `CloudConnection` object,
274        which will be loaded lazily as needed.
275        """
276        return CloudConnection(
277            workspace=self,
278            connection_id=connection_id,
279        )
280
281    def _permanently_delete_connection(
282        self,
283        connection: str | CloudConnection,
284        *,
285        delete_source: bool = False,
286        delete_destination: bool = False,
287    ) -> None:
288        """Delete a deployed connection from the workspace."""
289        if connection is None:
290            raise ValueError("No connection ID provided.")  # noqa: TRY003
291
292        if isinstance(connection, str):
293            connection = CloudConnection(
294                workspace=self,
295                connection_id=connection,
296            )
297
298        delete_connection(
299            connection_id=connection.connection_id,
300            api_root=self.api_root,
301            api_key=self.api_key,
302            workspace_id=self.workspace_id,
303        )
304        if delete_source:
305            self._permanently_delete_source(source=connection.source_id)
306
307        if delete_destination:
308            self._permanently_delete_destination(destination=connection.destination_id)
309
310    # Run syncs
311
312    def run_sync(
313        self,
314        connection_id: str,
315        *,
316        wait: bool = True,
317        wait_timeout: int = 300,
318    ) -> SyncResult:
319        """Run a sync on a deployed connection."""
320        connection = CloudConnection(
321            workspace=self,
322            connection_id=connection_id,
323        )
324        return connection.run_sync(wait=wait, wait_timeout=wait_timeout)
325
326    # Get sync results and previous sync logs
327
328    def get_sync_result(
329        self,
330        connection_id: str,
331        job_id: str | None = None,
332    ) -> SyncResult | None:
333        """Get the sync result for a connection job.
334
335        If `job_id` is not provided, the most recent sync job will be used.
336
337        Returns `None` if job_id is omitted and no previous jobs are found.
338        """
339        connection = CloudConnection(
340            workspace=self,
341            connection_id=connection_id,
342        )
343        if job_id is None:
344            results = self.get_previous_sync_logs(
345                connection_id=connection_id,
346                limit=1,
347            )
348            if results:
349                return results[0]
350
351            return None
352        connection = CloudConnection(
353            workspace=self,
354            connection_id=connection_id,
355        )
356        return SyncResult(
357            workspace=self,
358            connection=connection,
359            job_id=job_id,
360        )
361
362    def get_previous_sync_logs(
363        self,
364        connection_id: str,
365        *,
366        limit: int = 10,
367    ) -> list[SyncResult]:
368        """Get the previous sync logs for a connection."""
369        connection = CloudConnection(
370            workspace=self,
371            connection_id=connection_id,
372        )
373        return connection.get_previous_sync_logs(
374            limit=limit,
375        )

A remote workspace on the Airbyte Cloud.

By overriding api_root, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise.

CloudWorkspace( workspace_id: str, api_key: str, api_root: str = 'https://api.airbyte.com/v1')
workspace_id: str
api_key: str
api_root: str = 'https://api.airbyte.com/v1'
workspace_url: str | None
48    @property
49    def workspace_url(self) -> str | None:
50        return f"{self.api_root}/workspaces/{self.workspace_id}"
def connect(self) -> None:
54    def connect(self) -> None:
55        """Check that the workspace is reachable and raise an exception otherwise.
56
57        Note: It is not necessary to call this method before calling other operations. It
58              serves primarily as a simple check to ensure that the workspace is reachable
59              and credentials are correct.
60        """
61        _ = get_workspace(
62            api_root=self.api_root,
63            api_key=self.api_key,
64            workspace_id=self.workspace_id,
65        )
66        print(f"Successfully connected to workspace: {self.workspace_url}")

Check that the workspace is reachable and raise an exception otherwise.

Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.

def get_connection(self, connection_id: str) -> airbyte.cloud.connections.CloudConnection:
267    def get_connection(
268        self,
269        connection_id: str,
270    ) -> CloudConnection:
271        """Get a connection by ID.
272
273        This method does not fetch data from the API. It returns a `CloudConnection` object,
274        which will be loaded lazily as needed.
275        """
276        return CloudConnection(
277            workspace=self,
278            connection_id=connection_id,
279        )

Get a connection by ID.

This method does not fetch data from the API. It returns a CloudConnection object, which will be loaded lazily as needed.

def run_sync( self, connection_id: str, *, wait: bool = True, wait_timeout: int = 300) -> airbyte.cloud.sync_results.SyncResult:
312    def run_sync(
313        self,
314        connection_id: str,
315        *,
316        wait: bool = True,
317        wait_timeout: int = 300,
318    ) -> SyncResult:
319        """Run a sync on a deployed connection."""
320        connection = CloudConnection(
321            workspace=self,
322            connection_id=connection_id,
323        )
324        return connection.run_sync(wait=wait, wait_timeout=wait_timeout)

Run a sync on a deployed connection.

def get_sync_result( self, connection_id: str, job_id: str | None = None) -> airbyte.cloud.sync_results.SyncResult | None:
328    def get_sync_result(
329        self,
330        connection_id: str,
331        job_id: str | None = None,
332    ) -> SyncResult | None:
333        """Get the sync result for a connection job.
334
335        If `job_id` is not provided, the most recent sync job will be used.
336
337        Returns `None` if job_id is omitted and no previous jobs are found.
338        """
339        connection = CloudConnection(
340            workspace=self,
341            connection_id=connection_id,
342        )
343        if job_id is None:
344            results = self.get_previous_sync_logs(
345                connection_id=connection_id,
346                limit=1,
347            )
348            if results:
349                return results[0]
350
351            return None
352        connection = CloudConnection(
353            workspace=self,
354            connection_id=connection_id,
355        )
356        return SyncResult(
357            workspace=self,
358            connection=connection,
359            job_id=job_id,
360        )

Get the sync result for a connection job.

If job_id is not provided, the most recent sync job will be used.

Returns None if job_id is omitted and no previous jobs are found.

def get_previous_sync_logs( self, connection_id: str, *, limit: int = 10) -> list[airbyte.cloud.sync_results.SyncResult]:
362    def get_previous_sync_logs(
363        self,
364        connection_id: str,
365        *,
366        limit: int = 10,
367    ) -> list[SyncResult]:
368        """Get the previous sync logs for a connection."""
369        connection = CloudConnection(
370            workspace=self,
371            connection_id=connection_id,
372        )
373        return connection.get_previous_sync_logs(
374            limit=limit,
375        )

Get the previous sync logs for a connection.