airbyte.cloud.workspaces

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

By overriding api_root, you can use this module to interact with self-managed Airbyte instances, both OSS and Enterprise.

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API.
  3
  4By overriding `api_root`, you can use this module to interact with self-managed Airbyte instances,
  5both OSS and Enterprise.
  6"""
  7
  8from __future__ import annotations
  9
 10from dataclasses import dataclass
 11from typing import TYPE_CHECKING
 12
 13from airbyte import exceptions as exc
 14from airbyte._util.api_util import (
 15    CLOUD_API_ROOT,
 16    create_connection,
 17    create_destination,
 18    create_source,
 19    delete_connection,
 20    delete_destination,
 21    delete_source,
 22    get_workspace,
 23)
 24from airbyte.cloud._destination_util import get_destination_config_from_cache
 25from airbyte.cloud.connections import CloudConnection
 26from airbyte.cloud.sync_results import SyncResult
 27from airbyte.sources.base import Source
 28
 29
 30if TYPE_CHECKING:
 31    from airbyte._util.api_imports import DestinationResponse
 32    from airbyte.caches.base import CacheBase
 33
 34
 35@dataclass
 36class CloudWorkspace:
 37    """A remote workspace on the Airbyte Cloud.
 38
 39    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
 40    instances, both OSS and Enterprise.
 41    """
 42
 43    workspace_id: str
 44    api_key: str
 45    api_root: str = CLOUD_API_ROOT
 46
 47    @property
 48    def workspace_url(self) -> str | None:
 49        """The URL of the workspace."""
 50        return f"{self.api_root}/workspaces/{self.workspace_id}"
 51
 52    # Test connection and creds
 53
 54    def connect(self) -> None:
 55        """Check that the workspace is reachable and raise an exception otherwise.
 56
 57        Note: It is not necessary to call this method before calling other operations. It
 58              serves primarily as a simple check to ensure that the workspace is reachable
 59              and credentials are correct.
 60        """
 61        _ = get_workspace(
 62            api_root=self.api_root,
 63            api_key=self.api_key,
 64            workspace_id=self.workspace_id,
 65        )
 66        print(f"Successfully connected to workspace: {self.workspace_url}")
 67
 68    # Deploy and delete sources
 69
 70    # TODO: Make this a public API
 71    # https://github.com/airbytehq/pyairbyte/issues/228
 72    def _deploy_source(
 73        self,
 74        source: Source,
 75    ) -> str:
 76        """Deploy a source to the workspace.
 77
 78        Returns the newly deployed source ID.
 79        """
 80        source_configuration = source.get_config().copy()
 81        source_configuration["sourceType"] = source.name.replace("source-", "")
 82
 83        deployed_source = create_source(
 84            name=f"{source.name.replace('-', ' ').title()} (Deployed by PyAirbyte)",
 85            api_root=self.api_root,
 86            api_key=self.api_key,
 87            workspace_id=self.workspace_id,
 88            config=source_configuration,
 89        )
 90
 91        # Set the deployment Ids on the source object
 92        source._deployed_api_root = self.api_root  # noqa: SLF001  # Accessing nn-public API
 93        source._deployed_workspace_id = self.workspace_id  # noqa: SLF001  # Accessing nn-public API
 94        source._deployed_source_id = deployed_source.source_id  # noqa: SLF001  # Accessing nn-public API
 95
 96        return deployed_source.source_id
 97
 98    def _permanently_delete_source(
 99        self,
100        source: str | Source,
101    ) -> None:
102        """Delete a source from the workspace.
103
104        You can pass either the source ID `str` or a deployed `Source` object.
105        """
106        if not isinstance(source, str | Source):
107            raise ValueError(f"Invalid source type: {type(source)}")  # noqa: TRY004, TRY003
108
109        if isinstance(source, Source):
110            if not source._deployed_source_id:  # noqa: SLF001
111                raise ValueError("Source has not been deployed.")  # noqa: TRY003
112
113            source_id = source._deployed_source_id  # noqa: SLF001
114
115        elif isinstance(source, str):
116            source_id = source
117
118        delete_source(
119            source_id=source_id,
120            api_root=self.api_root,
121            api_key=self.api_key,
122        )
123
124    # Deploy and delete destinations
125
126    # TODO: Make this a public API
127    # https://github.com/airbytehq/pyairbyte/issues/228
128    def _deploy_cache_as_destination(
129        self,
130        cache: CacheBase,
131    ) -> str:
132        """Deploy a cache to the workspace as a new destination.
133
134        Returns the newly deployed destination ID.
135        """
136        cache_type_name = cache.__class__.__name__.replace("Cache", "")
137
138        deployed_destination: DestinationResponse = create_destination(
139            name=f"Destination {cache_type_name} (Deployed by PyAirbyte)",
140            api_root=self.api_root,
141            api_key=self.api_key,
142            workspace_id=self.workspace_id,
143            config=get_destination_config_from_cache(cache),
144        )
145
146        # Set the deployment Ids on the source object
147        cache._deployed_api_root = self.api_root  # noqa: SLF001  # Accessing nn-public API
148        cache._deployed_workspace_id = self.workspace_id  # noqa: SLF001  # Accessing nn-public API
149        cache._deployed_destination_id = deployed_destination.destination_id  # noqa: SLF001  # Accessing nn-public API
150
151        return deployed_destination.destination_id
152
153    def _permanently_delete_destination(
154        self,
155        *,
156        destination: str | None = None,
157        cache: CacheBase | None = None,
158    ) -> None:
159        """Delete a deployed destination from the workspace.
160
161        You can pass either the `Cache` class or the deployed destination ID as a `str`.
162        """
163        if destination is None and cache is None:
164            raise ValueError("You must provide either a destination ID or a cache object.")  # noqa: TRY003
165        if destination is not None and cache is not None:
166            raise ValueError(  # noqa: TRY003
167                "You must provide either a destination ID or a cache object, not both."
168            )
169
170        if cache:
171            if not cache._deployed_destination_id:  # noqa: SLF001
172                raise ValueError("Cache has not been deployed.")  # noqa: TRY003
173
174            destination = cache._deployed_destination_id  # noqa: SLF001
175
176        if destination is None:
177            raise ValueError("No destination ID provided.")  # noqa: TRY003
178
179        delete_destination(
180            destination_id=destination,
181            api_root=self.api_root,
182            api_key=self.api_key,
183        )
184
185    # Deploy and delete connections
186
187    # TODO: Make this a public API
188    # https://github.com/airbytehq/pyairbyte/issues/228
189    def _deploy_connection(
190        self,
191        source: Source | str,
192        cache: CacheBase | None = None,
193        destination: str | None = None,
194        table_prefix: str | None = None,
195        selected_streams: list[str] | None = None,
196    ) -> CloudConnection:
197        """Deploy a source and cache to the workspace as a new connection.
198
199        Returns the newly deployed connection ID as a `str`.
200
201        Args:
202            source (Source | str): The source to deploy. You can pass either an already deployed
203                source ID `str` or a PyAirbyte `Source` object. If you pass a `Source` object,
204                it will be deployed automatically.
205            cache (CacheBase, optional): The cache to deploy as a new destination. You can provide
206                `cache` or `destination`, but not both.
207            destination (str, optional): The destination ID to use. You can provide
208                `cache` or `destination`, but not both.
209            table_prefix (str, optional): The table prefix to use for the cache. If not provided,
210                the cache's table prefix will be used.
211            selected_streams (list[str], optional): The selected stream names to use for the
212                connection. If not provided, the source's selected streams will be used.
213        """
214        # Resolve source ID
215        source_id: str
216        if isinstance(source, Source):
217            selected_streams = selected_streams or source.get_selected_streams()
218            source_id = (
219                source._deployed_source_id  # noqa: SLF001  # Access to non-public API
220                or self._deploy_source(source)
221            )
222        else:
223            source_id = source
224            if not selected_streams:
225                raise exc.PyAirbyteInputError(
226                    guidance="You must provide `selected_streams` when deploying a source ID."
227                )
228
229        # Resolve destination ID
230        destination_id: str
231        if destination:
232            destination_id = destination
233        elif cache:
234            table_prefix = table_prefix if table_prefix is not None else (cache.table_prefix or "")
235            if not cache._deployed_destination_id:  # noqa: SLF001
236                destination_id = self._deploy_cache_as_destination(cache)
237            else:
238                destination_id = cache._deployed_destination_id  # noqa: SLF001
239        else:
240            raise exc.PyAirbyteInputError(
241                guidance="You must provide either a destination ID or a cache object."
242            )
243
244        assert source_id is not None
245        assert destination_id is not None
246
247        deployed_connection = create_connection(
248            name="Connection (Deployed by PyAirbyte)",
249            source_id=source_id,
250            destination_id=destination_id,
251            api_root=self.api_root,
252            api_key=self.api_key,
253            workspace_id=self.workspace_id,
254            selected_stream_names=selected_streams,
255            prefix=table_prefix or "",
256        )
257
258        if isinstance(source, Source):
259            source._deployed_api_root = self.api_root  # noqa: SLF001
260            source._deployed_workspace_id = self.workspace_id  # noqa: SLF001
261            source._deployed_source_id = source_id  # noqa: SLF001
262        if cache:
263            cache._deployed_api_root = self.api_root  # noqa: SLF001
264            cache._deployed_workspace_id = self.workspace_id  # noqa: SLF001
265            cache._deployed_destination_id = deployed_connection.destination_id  # noqa: SLF001
266
267        return CloudConnection(
268            workspace=self,
269            connection_id=deployed_connection.connection_id,
270            source=deployed_connection.source_id,
271            destination=deployed_connection.destination_id,
272        )
273
274    def get_connection(
275        self,
276        connection_id: str,
277    ) -> CloudConnection:
278        """Get a connection by ID.
279
280        This method does not fetch data from the API. It returns a `CloudConnection` object,
281        which will be loaded lazily as needed.
282        """
283        return CloudConnection(
284            workspace=self,
285            connection_id=connection_id,
286        )
287
288    def _permanently_delete_connection(
289        self,
290        connection: str | CloudConnection,
291        *,
292        delete_source: bool = False,
293        delete_destination: bool = False,
294    ) -> None:
295        """Delete a deployed connection from the workspace."""
296        if connection is None:
297            raise ValueError("No connection ID provided.")  # noqa: TRY003
298
299        if isinstance(connection, str):
300            connection = CloudConnection(
301                workspace=self,
302                connection_id=connection,
303            )
304
305        delete_connection(
306            connection_id=connection.connection_id,
307            api_root=self.api_root,
308            api_key=self.api_key,
309            workspace_id=self.workspace_id,
310        )
311        if delete_source:
312            self._permanently_delete_source(source=connection.source_id)
313
314        if delete_destination:
315            self._permanently_delete_destination(destination=connection.destination_id)
316
317    # Run syncs
318
319    def run_sync(
320        self,
321        connection_id: str,
322        *,
323        wait: bool = True,
324        wait_timeout: int = 300,
325    ) -> SyncResult:
326        """Run a sync on a deployed connection."""
327        connection = CloudConnection(
328            workspace=self,
329            connection_id=connection_id,
330        )
331        return connection.run_sync(wait=wait, wait_timeout=wait_timeout)
332
333    # Get sync results and previous sync logs
334
335    def get_sync_result(
336        self,
337        connection_id: str,
338        job_id: str | None = None,
339    ) -> SyncResult | None:
340        """Get the sync result for a connection job.
341
342        If `job_id` is not provided, the most recent sync job will be used.
343
344        Returns `None` if job_id is omitted and no previous jobs are found.
345        """
346        connection = CloudConnection(
347            workspace=self,
348            connection_id=connection_id,
349        )
350        if job_id is None:
351            results = self.get_previous_sync_logs(
352                connection_id=connection_id,
353                limit=1,
354            )
355            if results:
356                return results[0]
357
358            return None
359        connection = CloudConnection(
360            workspace=self,
361            connection_id=connection_id,
362        )
363        return SyncResult(
364            workspace=self,
365            connection=connection,
366            job_id=job_id,
367        )
368
369    def get_previous_sync_logs(
370        self,
371        connection_id: str,
372        *,
373        limit: int = 10,
374    ) -> list[SyncResult]:
375        """Get the previous sync logs for a connection."""
376        connection = CloudConnection(
377            workspace=self,
378            connection_id=connection_id,
379        )
380        return connection.get_previous_sync_logs(
381            limit=limit,
382        )
@dataclass
class CloudWorkspace:
 36@dataclass
 37class CloudWorkspace:
 38    """A remote workspace on the Airbyte Cloud.
 39
 40    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
 41    instances, both OSS and Enterprise.
 42    """
 43
 44    workspace_id: str
 45    api_key: str
 46    api_root: str = CLOUD_API_ROOT
 47
 48    @property
 49    def workspace_url(self) -> str | None:
 50        """The URL of the workspace."""
 51        return f"{self.api_root}/workspaces/{self.workspace_id}"
 52
 53    # Test connection and creds
 54
 55    def connect(self) -> None:
 56        """Check that the workspace is reachable and raise an exception otherwise.
 57
 58        Note: It is not necessary to call this method before calling other operations. It
 59              serves primarily as a simple check to ensure that the workspace is reachable
 60              and credentials are correct.
 61        """
 62        _ = get_workspace(
 63            api_root=self.api_root,
 64            api_key=self.api_key,
 65            workspace_id=self.workspace_id,
 66        )
 67        print(f"Successfully connected to workspace: {self.workspace_url}")
 68
 69    # Deploy and delete sources
 70
 71    # TODO: Make this a public API
 72    # https://github.com/airbytehq/pyairbyte/issues/228
 73    def _deploy_source(
 74        self,
 75        source: Source,
 76    ) -> str:
 77        """Deploy a source to the workspace.
 78
 79        Returns the newly deployed source ID.
 80        """
 81        source_configuration = source.get_config().copy()
 82        source_configuration["sourceType"] = source.name.replace("source-", "")
 83
 84        deployed_source = create_source(
 85            name=f"{source.name.replace('-', ' ').title()} (Deployed by PyAirbyte)",
 86            api_root=self.api_root,
 87            api_key=self.api_key,
 88            workspace_id=self.workspace_id,
 89            config=source_configuration,
 90        )
 91
 92        # Set the deployment Ids on the source object
 93        source._deployed_api_root = self.api_root  # noqa: SLF001  # Accessing nn-public API
 94        source._deployed_workspace_id = self.workspace_id  # noqa: SLF001  # Accessing nn-public API
 95        source._deployed_source_id = deployed_source.source_id  # noqa: SLF001  # Accessing nn-public API
 96
 97        return deployed_source.source_id
 98
 99    def _permanently_delete_source(
100        self,
101        source: str | Source,
102    ) -> None:
103        """Delete a source from the workspace.
104
105        You can pass either the source ID `str` or a deployed `Source` object.
106        """
107        if not isinstance(source, str | Source):
108            raise ValueError(f"Invalid source type: {type(source)}")  # noqa: TRY004, TRY003
109
110        if isinstance(source, Source):
111            if not source._deployed_source_id:  # noqa: SLF001
112                raise ValueError("Source has not been deployed.")  # noqa: TRY003
113
114            source_id = source._deployed_source_id  # noqa: SLF001
115
116        elif isinstance(source, str):
117            source_id = source
118
119        delete_source(
120            source_id=source_id,
121            api_root=self.api_root,
122            api_key=self.api_key,
123        )
124
125    # Deploy and delete destinations
126
127    # TODO: Make this a public API
128    # https://github.com/airbytehq/pyairbyte/issues/228
129    def _deploy_cache_as_destination(
130        self,
131        cache: CacheBase,
132    ) -> str:
133        """Deploy a cache to the workspace as a new destination.
134
135        Returns the newly deployed destination ID.
136        """
137        cache_type_name = cache.__class__.__name__.replace("Cache", "")
138
139        deployed_destination: DestinationResponse = create_destination(
140            name=f"Destination {cache_type_name} (Deployed by PyAirbyte)",
141            api_root=self.api_root,
142            api_key=self.api_key,
143            workspace_id=self.workspace_id,
144            config=get_destination_config_from_cache(cache),
145        )
146
147        # Set the deployment Ids on the source object
148        cache._deployed_api_root = self.api_root  # noqa: SLF001  # Accessing nn-public API
149        cache._deployed_workspace_id = self.workspace_id  # noqa: SLF001  # Accessing nn-public API
150        cache._deployed_destination_id = deployed_destination.destination_id  # noqa: SLF001  # Accessing nn-public API
151
152        return deployed_destination.destination_id
153
154    def _permanently_delete_destination(
155        self,
156        *,
157        destination: str | None = None,
158        cache: CacheBase | None = None,
159    ) -> None:
160        """Delete a deployed destination from the workspace.
161
162        You can pass either the `Cache` class or the deployed destination ID as a `str`.
163        """
164        if destination is None and cache is None:
165            raise ValueError("You must provide either a destination ID or a cache object.")  # noqa: TRY003
166        if destination is not None and cache is not None:
167            raise ValueError(  # noqa: TRY003
168                "You must provide either a destination ID or a cache object, not both."
169            )
170
171        if cache:
172            if not cache._deployed_destination_id:  # noqa: SLF001
173                raise ValueError("Cache has not been deployed.")  # noqa: TRY003
174
175            destination = cache._deployed_destination_id  # noqa: SLF001
176
177        if destination is None:
178            raise ValueError("No destination ID provided.")  # noqa: TRY003
179
180        delete_destination(
181            destination_id=destination,
182            api_root=self.api_root,
183            api_key=self.api_key,
184        )
185
186    # Deploy and delete connections
187
188    # TODO: Make this a public API
189    # https://github.com/airbytehq/pyairbyte/issues/228
190    def _deploy_connection(
191        self,
192        source: Source | str,
193        cache: CacheBase | None = None,
194        destination: str | None = None,
195        table_prefix: str | None = None,
196        selected_streams: list[str] | None = None,
197    ) -> CloudConnection:
198        """Deploy a source and cache to the workspace as a new connection.
199
200        Returns the newly deployed connection ID as a `str`.
201
202        Args:
203            source (Source | str): The source to deploy. You can pass either an already deployed
204                source ID `str` or a PyAirbyte `Source` object. If you pass a `Source` object,
205                it will be deployed automatically.
206            cache (CacheBase, optional): The cache to deploy as a new destination. You can provide
207                `cache` or `destination`, but not both.
208            destination (str, optional): The destination ID to use. You can provide
209                `cache` or `destination`, but not both.
210            table_prefix (str, optional): The table prefix to use for the cache. If not provided,
211                the cache's table prefix will be used.
212            selected_streams (list[str], optional): The selected stream names to use for the
213                connection. If not provided, the source's selected streams will be used.
214        """
215        # Resolve source ID
216        source_id: str
217        if isinstance(source, Source):
218            selected_streams = selected_streams or source.get_selected_streams()
219            source_id = (
220                source._deployed_source_id  # noqa: SLF001  # Access to non-public API
221                or self._deploy_source(source)
222            )
223        else:
224            source_id = source
225            if not selected_streams:
226                raise exc.PyAirbyteInputError(
227                    guidance="You must provide `selected_streams` when deploying a source ID."
228                )
229
230        # Resolve destination ID
231        destination_id: str
232        if destination:
233            destination_id = destination
234        elif cache:
235            table_prefix = table_prefix if table_prefix is not None else (cache.table_prefix or "")
236            if not cache._deployed_destination_id:  # noqa: SLF001
237                destination_id = self._deploy_cache_as_destination(cache)
238            else:
239                destination_id = cache._deployed_destination_id  # noqa: SLF001
240        else:
241            raise exc.PyAirbyteInputError(
242                guidance="You must provide either a destination ID or a cache object."
243            )
244
245        assert source_id is not None
246        assert destination_id is not None
247
248        deployed_connection = create_connection(
249            name="Connection (Deployed by PyAirbyte)",
250            source_id=source_id,
251            destination_id=destination_id,
252            api_root=self.api_root,
253            api_key=self.api_key,
254            workspace_id=self.workspace_id,
255            selected_stream_names=selected_streams,
256            prefix=table_prefix or "",
257        )
258
259        if isinstance(source, Source):
260            source._deployed_api_root = self.api_root  # noqa: SLF001
261            source._deployed_workspace_id = self.workspace_id  # noqa: SLF001
262            source._deployed_source_id = source_id  # noqa: SLF001
263        if cache:
264            cache._deployed_api_root = self.api_root  # noqa: SLF001
265            cache._deployed_workspace_id = self.workspace_id  # noqa: SLF001
266            cache._deployed_destination_id = deployed_connection.destination_id  # noqa: SLF001
267
268        return CloudConnection(
269            workspace=self,
270            connection_id=deployed_connection.connection_id,
271            source=deployed_connection.source_id,
272            destination=deployed_connection.destination_id,
273        )
274
275    def get_connection(
276        self,
277        connection_id: str,
278    ) -> CloudConnection:
279        """Get a connection by ID.
280
281        This method does not fetch data from the API. It returns a `CloudConnection` object,
282        which will be loaded lazily as needed.
283        """
284        return CloudConnection(
285            workspace=self,
286            connection_id=connection_id,
287        )
288
289    def _permanently_delete_connection(
290        self,
291        connection: str | CloudConnection,
292        *,
293        delete_source: bool = False,
294        delete_destination: bool = False,
295    ) -> None:
296        """Delete a deployed connection from the workspace."""
297        if connection is None:
298            raise ValueError("No connection ID provided.")  # noqa: TRY003
299
300        if isinstance(connection, str):
301            connection = CloudConnection(
302                workspace=self,
303                connection_id=connection,
304            )
305
306        delete_connection(
307            connection_id=connection.connection_id,
308            api_root=self.api_root,
309            api_key=self.api_key,
310            workspace_id=self.workspace_id,
311        )
312        if delete_source:
313            self._permanently_delete_source(source=connection.source_id)
314
315        if delete_destination:
316            self._permanently_delete_destination(destination=connection.destination_id)
317
318    # Run syncs
319
320    def run_sync(
321        self,
322        connection_id: str,
323        *,
324        wait: bool = True,
325        wait_timeout: int = 300,
326    ) -> SyncResult:
327        """Run a sync on a deployed connection."""
328        connection = CloudConnection(
329            workspace=self,
330            connection_id=connection_id,
331        )
332        return connection.run_sync(wait=wait, wait_timeout=wait_timeout)
333
334    # Get sync results and previous sync logs
335
336    def get_sync_result(
337        self,
338        connection_id: str,
339        job_id: str | None = None,
340    ) -> SyncResult | None:
341        """Get the sync result for a connection job.
342
343        If `job_id` is not provided, the most recent sync job will be used.
344
345        Returns `None` if job_id is omitted and no previous jobs are found.
346        """
347        connection = CloudConnection(
348            workspace=self,
349            connection_id=connection_id,
350        )
351        if job_id is None:
352            results = self.get_previous_sync_logs(
353                connection_id=connection_id,
354                limit=1,
355            )
356            if results:
357                return results[0]
358
359            return None
360        connection = CloudConnection(
361            workspace=self,
362            connection_id=connection_id,
363        )
364        return SyncResult(
365            workspace=self,
366            connection=connection,
367            job_id=job_id,
368        )
369
370    def get_previous_sync_logs(
371        self,
372        connection_id: str,
373        *,
374        limit: int = 10,
375    ) -> list[SyncResult]:
376        """Get the previous sync logs for a connection."""
377        connection = CloudConnection(
378            workspace=self,
379            connection_id=connection_id,
380        )
381        return connection.get_previous_sync_logs(
382            limit=limit,
383        )

A remote workspace on the Airbyte Cloud.

By overriding api_root, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise.

CloudWorkspace( workspace_id: str, api_key: str, api_root: str = 'https://api.airbyte.com/v1')
workspace_id: str
api_key: str
api_root: str = 'https://api.airbyte.com/v1'
workspace_url: str | None
48    @property
49    def workspace_url(self) -> str | None:
50        """The URL of the workspace."""
51        return f"{self.api_root}/workspaces/{self.workspace_id}"

The URL of the workspace.

def connect(self) -> None:
55    def connect(self) -> None:
56        """Check that the workspace is reachable and raise an exception otherwise.
57
58        Note: It is not necessary to call this method before calling other operations. It
59              serves primarily as a simple check to ensure that the workspace is reachable
60              and credentials are correct.
61        """
62        _ = get_workspace(
63            api_root=self.api_root,
64            api_key=self.api_key,
65            workspace_id=self.workspace_id,
66        )
67        print(f"Successfully connected to workspace: {self.workspace_url}")

Check that the workspace is reachable and raise an exception otherwise.

Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.

def get_connection(self, connection_id: str) -> airbyte.cloud.CloudConnection:
275    def get_connection(
276        self,
277        connection_id: str,
278    ) -> CloudConnection:
279        """Get a connection by ID.
280
281        This method does not fetch data from the API. It returns a `CloudConnection` object,
282        which will be loaded lazily as needed.
283        """
284        return CloudConnection(
285            workspace=self,
286            connection_id=connection_id,
287        )

Get a connection by ID.

This method does not fetch data from the API. It returns a CloudConnection object, which will be loaded lazily as needed.

def run_sync( self, connection_id: str, *, wait: bool = True, wait_timeout: int = 300) -> airbyte.cloud.SyncResult:
320    def run_sync(
321        self,
322        connection_id: str,
323        *,
324        wait: bool = True,
325        wait_timeout: int = 300,
326    ) -> SyncResult:
327        """Run a sync on a deployed connection."""
328        connection = CloudConnection(
329            workspace=self,
330            connection_id=connection_id,
331        )
332        return connection.run_sync(wait=wait, wait_timeout=wait_timeout)

Run a sync on a deployed connection.

def get_sync_result( self, connection_id: str, job_id: str | None = None) -> airbyte.cloud.SyncResult | None:
336    def get_sync_result(
337        self,
338        connection_id: str,
339        job_id: str | None = None,
340    ) -> SyncResult | None:
341        """Get the sync result for a connection job.
342
343        If `job_id` is not provided, the most recent sync job will be used.
344
345        Returns `None` if job_id is omitted and no previous jobs are found.
346        """
347        connection = CloudConnection(
348            workspace=self,
349            connection_id=connection_id,
350        )
351        if job_id is None:
352            results = self.get_previous_sync_logs(
353                connection_id=connection_id,
354                limit=1,
355            )
356            if results:
357                return results[0]
358
359            return None
360        connection = CloudConnection(
361            workspace=self,
362            connection_id=connection_id,
363        )
364        return SyncResult(
365            workspace=self,
366            connection=connection,
367            job_id=job_id,
368        )

Get the sync result for a connection job.

If job_id is not provided, the most recent sync job will be used.

Returns None if job_id is omitted and no previous jobs are found.

def get_previous_sync_logs( self, connection_id: str, *, limit: int = 10) -> list[airbyte.cloud.SyncResult]:
370    def get_previous_sync_logs(
371        self,
372        connection_id: str,
373        *,
374        limit: int = 10,
375    ) -> list[SyncResult]:
376        """Get the previous sync logs for a connection."""
377        connection = CloudConnection(
378            workspace=self,
379            connection_id=connection_id,
380        )
381        return connection.get_previous_sync_logs(
382            limit=limit,
383        )

Get the previous sync logs for a connection.