airbyte.cloud.workspaces
PyAirbyte classes and methods for interacting with the Airbyte Cloud API.
By overriding `api_root`, you can use this module to interact with self-managed Airbyte instances,
both OSS and Enterprise.
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API. 3 4By overriding `api_root`, you can use this module to interact with self-managed Airbyte instances, 5both OSS and Enterprise. 6""" 7 8from __future__ import annotations 9 10from dataclasses import dataclass 11from typing import TYPE_CHECKING 12 13from airbyte import exceptions as exc 14from airbyte._util.api_util import ( 15 CLOUD_API_ROOT, 16 create_connection, 17 create_destination, 18 create_source, 19 delete_connection, 20 delete_destination, 21 delete_source, 22 get_workspace, 23) 24from airbyte.cloud._destination_util import get_destination_config_from_cache 25from airbyte.cloud.connections import CloudConnection 26from airbyte.cloud.sync_results import SyncResult 27from airbyte.sources.base import Source 28 29 30if TYPE_CHECKING: 31 from airbyte._util.api_imports import DestinationResponse 32 from airbyte.caches.base import CacheBase 33 34 35@dataclass 36class CloudWorkspace: 37 """A remote workspace on the Airbyte Cloud. 38 39 By overriding `api_root`, you can use this class to interact with self-managed Airbyte 40 instances, both OSS and Enterprise. 41 """ 42 43 workspace_id: str 44 api_key: str 45 api_root: str = CLOUD_API_ROOT 46 47 @property 48 def workspace_url(self) -> str | None: 49 """The URL of the workspace.""" 50 return f"{self.api_root}/workspaces/{self.workspace_id}" 51 52 # Test connection and creds 53 54 def connect(self) -> None: 55 """Check that the workspace is reachable and raise an exception otherwise. 56 57 Note: It is not necessary to call this method before calling other operations. It 58 serves primarily as a simple check to ensure that the workspace is reachable 59 and credentials are correct. 
60 """ 61 _ = get_workspace( 62 api_root=self.api_root, 63 api_key=self.api_key, 64 workspace_id=self.workspace_id, 65 ) 66 print(f"Successfully connected to workspace: {self.workspace_url}") 67 68 # Deploy and delete sources 69 70 # TODO: Make this a public API 71 # https://github.com/airbytehq/pyairbyte/issues/228 72 def _deploy_source( 73 self, 74 source: Source, 75 ) -> str: 76 """Deploy a source to the workspace. 77 78 Returns the newly deployed source ID. 79 """ 80 source_configuration = source.get_config().copy() 81 source_configuration["sourceType"] = source.name.replace("source-", "") 82 83 deployed_source = create_source( 84 name=f"{source.name.replace('-', ' ').title()} (Deployed by PyAirbyte)", 85 api_root=self.api_root, 86 api_key=self.api_key, 87 workspace_id=self.workspace_id, 88 config=source_configuration, 89 ) 90 91 # Set the deployment Ids on the source object 92 source._deployed_api_root = self.api_root # noqa: SLF001 # Accessing nn-public API 93 source._deployed_workspace_id = self.workspace_id # noqa: SLF001 # Accessing nn-public API 94 source._deployed_source_id = deployed_source.source_id # noqa: SLF001 # Accessing nn-public API 95 96 return deployed_source.source_id 97 98 def _permanently_delete_source( 99 self, 100 source: str | Source, 101 ) -> None: 102 """Delete a source from the workspace. 103 104 You can pass either the source ID `str` or a deployed `Source` object. 
105 """ 106 if not isinstance(source, str | Source): 107 raise ValueError(f"Invalid source type: {type(source)}") # noqa: TRY004, TRY003 108 109 if isinstance(source, Source): 110 if not source._deployed_source_id: # noqa: SLF001 111 raise ValueError("Source has not been deployed.") # noqa: TRY003 112 113 source_id = source._deployed_source_id # noqa: SLF001 114 115 elif isinstance(source, str): 116 source_id = source 117 118 delete_source( 119 source_id=source_id, 120 api_root=self.api_root, 121 api_key=self.api_key, 122 ) 123 124 # Deploy and delete destinations 125 126 # TODO: Make this a public API 127 # https://github.com/airbytehq/pyairbyte/issues/228 128 def _deploy_cache_as_destination( 129 self, 130 cache: CacheBase, 131 ) -> str: 132 """Deploy a cache to the workspace as a new destination. 133 134 Returns the newly deployed destination ID. 135 """ 136 cache_type_name = cache.__class__.__name__.replace("Cache", "") 137 138 deployed_destination: DestinationResponse = create_destination( 139 name=f"Destination {cache_type_name} (Deployed by PyAirbyte)", 140 api_root=self.api_root, 141 api_key=self.api_key, 142 workspace_id=self.workspace_id, 143 config=get_destination_config_from_cache(cache), 144 ) 145 146 # Set the deployment Ids on the source object 147 cache._deployed_api_root = self.api_root # noqa: SLF001 # Accessing nn-public API 148 cache._deployed_workspace_id = self.workspace_id # noqa: SLF001 # Accessing nn-public API 149 cache._deployed_destination_id = deployed_destination.destination_id # noqa: SLF001 # Accessing nn-public API 150 151 return deployed_destination.destination_id 152 153 def _permanently_delete_destination( 154 self, 155 *, 156 destination: str | None = None, 157 cache: CacheBase | None = None, 158 ) -> None: 159 """Delete a deployed destination from the workspace. 160 161 You can pass either the `Cache` class or the deployed destination ID as a `str`. 
162 """ 163 if destination is None and cache is None: 164 raise ValueError("You must provide either a destination ID or a cache object.") # noqa: TRY003 165 if destination is not None and cache is not None: 166 raise ValueError( # noqa: TRY003 167 "You must provide either a destination ID or a cache object, not both." 168 ) 169 170 if cache: 171 if not cache._deployed_destination_id: # noqa: SLF001 172 raise ValueError("Cache has not been deployed.") # noqa: TRY003 173 174 destination = cache._deployed_destination_id # noqa: SLF001 175 176 if destination is None: 177 raise ValueError("No destination ID provided.") # noqa: TRY003 178 179 delete_destination( 180 destination_id=destination, 181 api_root=self.api_root, 182 api_key=self.api_key, 183 ) 184 185 # Deploy and delete connections 186 187 # TODO: Make this a public API 188 # https://github.com/airbytehq/pyairbyte/issues/228 189 def _deploy_connection( 190 self, 191 source: Source | str, 192 cache: CacheBase | None = None, 193 destination: str | None = None, 194 table_prefix: str | None = None, 195 selected_streams: list[str] | None = None, 196 ) -> CloudConnection: 197 """Deploy a source and cache to the workspace as a new connection. 198 199 Returns the newly deployed connection ID as a `str`. 200 201 Args: 202 source (Source | str): The source to deploy. You can pass either an already deployed 203 source ID `str` or a PyAirbyte `Source` object. If you pass a `Source` object, 204 it will be deployed automatically. 205 cache (CacheBase, optional): The cache to deploy as a new destination. You can provide 206 `cache` or `destination`, but not both. 207 destination (str, optional): The destination ID to use. You can provide 208 `cache` or `destination`, but not both. 209 table_prefix (str, optional): The table prefix to use for the cache. If not provided, 210 the cache's table prefix will be used. 211 selected_streams (list[str], optional): The selected stream names to use for the 212 connection. 
If not provided, the source's selected streams will be used. 213 """ 214 # Resolve source ID 215 source_id: str 216 if isinstance(source, Source): 217 selected_streams = selected_streams or source.get_selected_streams() 218 source_id = ( 219 source._deployed_source_id # noqa: SLF001 # Access to non-public API 220 or self._deploy_source(source) 221 ) 222 else: 223 source_id = source 224 if not selected_streams: 225 raise exc.PyAirbyteInputError( 226 guidance="You must provide `selected_streams` when deploying a source ID." 227 ) 228 229 # Resolve destination ID 230 destination_id: str 231 if destination: 232 destination_id = destination 233 elif cache: 234 table_prefix = table_prefix if table_prefix is not None else (cache.table_prefix or "") 235 if not cache._deployed_destination_id: # noqa: SLF001 236 destination_id = self._deploy_cache_as_destination(cache) 237 else: 238 destination_id = cache._deployed_destination_id # noqa: SLF001 239 else: 240 raise exc.PyAirbyteInputError( 241 guidance="You must provide either a destination ID or a cache object." 
242 ) 243 244 assert source_id is not None 245 assert destination_id is not None 246 247 deployed_connection = create_connection( 248 name="Connection (Deployed by PyAirbyte)", 249 source_id=source_id, 250 destination_id=destination_id, 251 api_root=self.api_root, 252 api_key=self.api_key, 253 workspace_id=self.workspace_id, 254 selected_stream_names=selected_streams, 255 prefix=table_prefix or "", 256 ) 257 258 if isinstance(source, Source): 259 source._deployed_api_root = self.api_root # noqa: SLF001 260 source._deployed_workspace_id = self.workspace_id # noqa: SLF001 261 source._deployed_source_id = source_id # noqa: SLF001 262 if cache: 263 cache._deployed_api_root = self.api_root # noqa: SLF001 264 cache._deployed_workspace_id = self.workspace_id # noqa: SLF001 265 cache._deployed_destination_id = deployed_connection.destination_id # noqa: SLF001 266 267 return CloudConnection( 268 workspace=self, 269 connection_id=deployed_connection.connection_id, 270 source=deployed_connection.source_id, 271 destination=deployed_connection.destination_id, 272 ) 273 274 def get_connection( 275 self, 276 connection_id: str, 277 ) -> CloudConnection: 278 """Get a connection by ID. 279 280 This method does not fetch data from the API. It returns a `CloudConnection` object, 281 which will be loaded lazily as needed. 
282 """ 283 return CloudConnection( 284 workspace=self, 285 connection_id=connection_id, 286 ) 287 288 def _permanently_delete_connection( 289 self, 290 connection: str | CloudConnection, 291 *, 292 delete_source: bool = False, 293 delete_destination: bool = False, 294 ) -> None: 295 """Delete a deployed connection from the workspace.""" 296 if connection is None: 297 raise ValueError("No connection ID provided.") # noqa: TRY003 298 299 if isinstance(connection, str): 300 connection = CloudConnection( 301 workspace=self, 302 connection_id=connection, 303 ) 304 305 delete_connection( 306 connection_id=connection.connection_id, 307 api_root=self.api_root, 308 api_key=self.api_key, 309 workspace_id=self.workspace_id, 310 ) 311 if delete_source: 312 self._permanently_delete_source(source=connection.source_id) 313 314 if delete_destination: 315 self._permanently_delete_destination(destination=connection.destination_id) 316 317 # Run syncs 318 319 def run_sync( 320 self, 321 connection_id: str, 322 *, 323 wait: bool = True, 324 wait_timeout: int = 300, 325 ) -> SyncResult: 326 """Run a sync on a deployed connection.""" 327 connection = CloudConnection( 328 workspace=self, 329 connection_id=connection_id, 330 ) 331 return connection.run_sync(wait=wait, wait_timeout=wait_timeout) 332 333 # Get sync results and previous sync logs 334 335 def get_sync_result( 336 self, 337 connection_id: str, 338 job_id: str | None = None, 339 ) -> SyncResult | None: 340 """Get the sync result for a connection job. 341 342 If `job_id` is not provided, the most recent sync job will be used. 343 344 Returns `None` if job_id is omitted and no previous jobs are found. 
345 """ 346 connection = CloudConnection( 347 workspace=self, 348 connection_id=connection_id, 349 ) 350 if job_id is None: 351 results = self.get_previous_sync_logs( 352 connection_id=connection_id, 353 limit=1, 354 ) 355 if results: 356 return results[0] 357 358 return None 359 connection = CloudConnection( 360 workspace=self, 361 connection_id=connection_id, 362 ) 363 return SyncResult( 364 workspace=self, 365 connection=connection, 366 job_id=job_id, 367 ) 368 369 def get_previous_sync_logs( 370 self, 371 connection_id: str, 372 *, 373 limit: int = 10, 374 ) -> list[SyncResult]: 375 """Get the previous sync logs for a connection.""" 376 connection = CloudConnection( 377 workspace=self, 378 connection_id=connection_id, 379 ) 380 return connection.get_previous_sync_logs( 381 limit=limit, 382 )
@dataclass
class CloudWorkspace:
    """A remote workspace on the Airbyte Cloud.

    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
    instances, both OSS and Enterprise.
    """

    workspace_id: str
    api_key: str
    api_root: str = CLOUD_API_ROOT

    @property
    def workspace_url(self) -> str | None:
        """The URL of the workspace."""
        return f"{self.api_root}/workspaces/{self.workspace_id}"

    # Test connection and creds

    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
              serves primarily as a simple check to ensure that the workspace is reachable
              and credentials are correct.
        """
        _ = get_workspace(
            api_root=self.api_root,
            api_key=self.api_key,
            workspace_id=self.workspace_id,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")

    # Deploy and delete sources

    # TODO: Make this a public API
    # https://github.com/airbytehq/pyairbyte/issues/228
    def _deploy_source(
        self,
        source: Source,
    ) -> str:
        """Deploy a source to the workspace.

        The source's config is copied and a `sourceType` key (the source name with
        `source-` removed) is added before it is sent to `create_source`.

        Returns the newly deployed source ID.
        """
        source_configuration = source.get_config().copy()
        source_configuration["sourceType"] = source.name.replace("source-", "")

        deployed_source = create_source(
            name=f"{source.name.replace('-', ' ').title()} (Deployed by PyAirbyte)",
            api_root=self.api_root,
            api_key=self.api_key,
            workspace_id=self.workspace_id,
            config=source_configuration,
        )

        # Set the deployment IDs on the source object
        source._deployed_api_root = self.api_root  # noqa: SLF001  # Accessing non-public API
        source._deployed_workspace_id = self.workspace_id  # noqa: SLF001  # Accessing non-public API
        source._deployed_source_id = deployed_source.source_id  # noqa: SLF001  # Accessing non-public API

        return deployed_source.source_id

    def _permanently_delete_source(
        self,
        source: str | Source,
    ) -> None:
        """Delete a source from the workspace.

        You can pass either the source ID `str` or a deployed `Source` object.

        Raises:
            ValueError: If `source` is neither a `str` nor a `Source`, or if a `Source`
                object is passed that has no deployed source ID.
        """
        if not isinstance(source, str | Source):
            raise ValueError(f"Invalid source type: {type(source)}")  # noqa: TRY004, TRY003

        if isinstance(source, Source):
            if not source._deployed_source_id:  # noqa: SLF001
                raise ValueError("Source has not been deployed.")  # noqa: TRY003

            source_id = source._deployed_source_id  # noqa: SLF001
        else:
            # The type check above guarantees this is a `str` source ID.
            source_id = source

        delete_source(
            source_id=source_id,
            api_root=self.api_root,
            api_key=self.api_key,
        )

    # Deploy and delete destinations

    # TODO: Make this a public API
    # https://github.com/airbytehq/pyairbyte/issues/228
    def _deploy_cache_as_destination(
        self,
        cache: CacheBase,
    ) -> str:
        """Deploy a cache to the workspace as a new destination.

        Returns the newly deployed destination ID.
        """
        cache_type_name = cache.__class__.__name__.replace("Cache", "")

        deployed_destination: DestinationResponse = create_destination(
            name=f"Destination {cache_type_name} (Deployed by PyAirbyte)",
            api_root=self.api_root,
            api_key=self.api_key,
            workspace_id=self.workspace_id,
            config=get_destination_config_from_cache(cache),
        )

        # Set the deployment IDs on the cache object
        cache._deployed_api_root = self.api_root  # noqa: SLF001  # Accessing non-public API
        cache._deployed_workspace_id = self.workspace_id  # noqa: SLF001  # Accessing non-public API
        cache._deployed_destination_id = deployed_destination.destination_id  # noqa: SLF001  # Accessing non-public API

        return deployed_destination.destination_id

    def _permanently_delete_destination(
        self,
        *,
        destination: str | None = None,
        cache: CacheBase | None = None,
    ) -> None:
        """Delete a deployed destination from the workspace.

        You can pass either the `Cache` class or the deployed destination ID as a `str`.

        Raises:
            ValueError: If neither or both of `destination` and `cache` are provided, or if
                the given cache has no deployed destination ID.
        """
        if destination is None and cache is None:
            raise ValueError("You must provide either a destination ID or a cache object.")  # noqa: TRY003
        if destination is not None and cache is not None:
            raise ValueError(  # noqa: TRY003
                "You must provide either a destination ID or a cache object, not both."
            )

        if cache:
            if not cache._deployed_destination_id:  # noqa: SLF001
                raise ValueError("Cache has not been deployed.")  # noqa: TRY003

            destination = cache._deployed_destination_id  # noqa: SLF001

        if destination is None:
            raise ValueError("No destination ID provided.")  # noqa: TRY003

        delete_destination(
            destination_id=destination,
            api_root=self.api_root,
            api_key=self.api_key,
        )

    # Deploy and delete connections

    # TODO: Make this a public API
    # https://github.com/airbytehq/pyairbyte/issues/228
    def _deploy_connection(
        self,
        source: Source | str,
        cache: CacheBase | None = None,
        destination: str | None = None,
        table_prefix: str | None = None,
        selected_streams: list[str] | None = None,
    ) -> CloudConnection:
        """Deploy a source and cache to the workspace as a new connection.

        Returns a `CloudConnection` object for the newly deployed connection.

        Args:
            source (Source | str): The source to deploy. You can pass either an already deployed
                source ID `str` or a PyAirbyte `Source` object. If you pass a `Source` object,
                it will be deployed automatically.
            cache (CacheBase, optional): The cache to deploy as a new destination. You can provide
                `cache` or `destination`, but not both.
            destination (str, optional): The destination ID to use. You can provide
                `cache` or `destination`, but not both.
            table_prefix (str, optional): The table prefix to use for the cache. If not provided,
                the cache's table prefix will be used.
            selected_streams (list[str], optional): The selected stream names to use for the
                connection. If not provided, the source's selected streams will be used.

        Raises:
            PyAirbyteInputError: If a source ID is given without `selected_streams`, or if
                neither `destination` nor `cache` is provided.
        """
        # Resolve source ID
        source_id: str
        if isinstance(source, Source):
            selected_streams = selected_streams or source.get_selected_streams()
            # Reuse the existing deployment if there is one; otherwise deploy now.
            source_id = (
                source._deployed_source_id  # noqa: SLF001  # Access to non-public API
                or self._deploy_source(source)
            )
        else:
            source_id = source
            if not selected_streams:
                raise exc.PyAirbyteInputError(
                    guidance="You must provide `selected_streams` when deploying a source ID."
                )

        # Resolve destination ID
        # NOTE: `destination` is checked first, so it wins if both arguments are passed.
        destination_id: str
        if destination:
            destination_id = destination
        elif cache:
            table_prefix = table_prefix if table_prefix is not None else (cache.table_prefix or "")
            if not cache._deployed_destination_id:  # noqa: SLF001
                destination_id = self._deploy_cache_as_destination(cache)
            else:
                destination_id = cache._deployed_destination_id  # noqa: SLF001
        else:
            raise exc.PyAirbyteInputError(
                guidance="You must provide either a destination ID or a cache object."
            )

        assert source_id is not None
        assert destination_id is not None

        deployed_connection = create_connection(
            name="Connection (Deployed by PyAirbyte)",
            source_id=source_id,
            destination_id=destination_id,
            api_root=self.api_root,
            api_key=self.api_key,
            workspace_id=self.workspace_id,
            selected_stream_names=selected_streams,
            prefix=table_prefix or "",
        )

        # Record the deployment details on the local objects that were passed in.
        if isinstance(source, Source):
            source._deployed_api_root = self.api_root  # noqa: SLF001
            source._deployed_workspace_id = self.workspace_id  # noqa: SLF001
            source._deployed_source_id = source_id  # noqa: SLF001
        if cache:
            cache._deployed_api_root = self.api_root  # noqa: SLF001
            cache._deployed_workspace_id = self.workspace_id  # noqa: SLF001
            cache._deployed_destination_id = deployed_connection.destination_id  # noqa: SLF001

        return CloudConnection(
            workspace=self,
            connection_id=deployed_connection.connection_id,
            source=deployed_connection.source_id,
            destination=deployed_connection.destination_id,
        )

    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )

    def _permanently_delete_connection(
        self,
        connection: str | CloudConnection,
        *,
        delete_source: bool = False,
        delete_destination: bool = False,
    ) -> None:
        """Delete a deployed connection from the workspace.

        Args:
            connection: The connection ID `str` or a `CloudConnection` object.
            delete_source: If true, also delete the connection's source afterwards.
            delete_destination: If true, also delete the connection's destination afterwards.

        Raises:
            ValueError: If `connection` is `None`.
        """
        if connection is None:
            raise ValueError("No connection ID provided.")  # noqa: TRY003

        if isinstance(connection, str):
            connection = CloudConnection(
                workspace=self,
                connection_id=connection,
            )

        # The connection is deleted first, then (optionally) its source and destination.
        delete_connection(
            connection_id=connection.connection_id,
            api_root=self.api_root,
            api_key=self.api_key,
            workspace_id=self.workspace_id,
        )
        if delete_source:
            self._permanently_delete_source(source=connection.source_id)

        if delete_destination:
            self._permanently_delete_destination(destination=connection.destination_id)

    # Run syncs

    def run_sync(
        self,
        connection_id: str,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync on a deployed connection.

        `wait` and `wait_timeout` are passed through to `CloudConnection.run_sync`.
        """
        connection = CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )
        return connection.run_sync(wait=wait, wait_timeout=wait_timeout)

    # Get sync results and previous sync logs

    def get_sync_result(
        self,
        connection_id: str,
        job_id: str | None = None,
    ) -> SyncResult | None:
        """Get the sync result for a connection job.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if job_id is omitted and no previous jobs are found.
        """
        if job_id is None:
            results = self.get_previous_sync_logs(
                connection_id=connection_id,
                limit=1,
            )
            if results:
                return results[0]

            return None

        # The connection object is only needed when a specific job is requested.
        connection = CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )
        return SyncResult(
            workspace=self,
            connection=connection,
            job_id=job_id,
        )

    def get_previous_sync_logs(
        self,
        connection_id: str,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync logs for a connection.

        `limit` is passed through to `CloudConnection.get_previous_sync_logs`.
        """
        connection = CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )
        return connection.get_previous_sync_logs(
            limit=limit,
        )
A remote workspace on the Airbyte Cloud.
By overriding `api_root`, you can use this class to interact with self-managed Airbyte
instances, both OSS and Enterprise.
@property
def workspace_url(self) -> str | None:
    """Return the workspace URL, built as `<api_root>/workspaces/<workspace_id>`."""
    root = self.api_root
    workspace = self.workspace_id
    return f"{root}/workspaces/{workspace}"
The URL of the workspace.
def connect(self) -> None:
    """Check that the workspace is reachable and raise an exception otherwise.

    Note: Calling this before other operations is optional. It is mainly a quick way to
          confirm that the workspace can be reached and that the credentials are correct.
    """
    # The fetched workspace is not used; the call is made only so that it fails
    # if the workspace cannot be retrieved.
    get_workspace(
        workspace_id=self.workspace_id,
        api_key=self.api_key,
        api_root=self.api_root,
    )
    url = self.workspace_url
    print(f"Successfully connected to workspace: {url}")
Check that the workspace is reachable and raise an exception otherwise.
Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
def get_connection(self, connection_id: str) -> CloudConnection:
    """Get a connection by ID.

    No data is fetched from the API here. The returned `CloudConnection` object
    is loaded lazily as needed.
    """
    lazy_connection = CloudConnection(connection_id=connection_id, workspace=self)
    return lazy_connection
Get a connection by ID.
This method does not fetch data from the API. It returns a `CloudConnection` object,
which will be loaded lazily as needed.
def run_sync(
    self,
    connection_id: str,
    *,
    wait: bool = True,
    wait_timeout: int = 300,
) -> SyncResult:
    """Run a sync on a deployed connection.

    A `CloudConnection` is built for `connection_id`, and `wait` and `wait_timeout`
    are passed through to its `run_sync` method.
    """
    target = CloudConnection(connection_id=connection_id, workspace=self)
    sync_result = target.run_sync(wait=wait, wait_timeout=wait_timeout)
    return sync_result
Run a sync on a deployed connection.
def get_sync_result(
    self,
    connection_id: str,
    job_id: str | None = None,
) -> SyncResult | None:
    """Get the sync result for a connection job.

    If `job_id` is not provided, the most recent sync job will be used.

    Returns `None` if job_id is omitted and no previous jobs are found.
    """
    if job_id is None:
        # Only the newest entry is needed, so ask for a single log.
        results = self.get_previous_sync_logs(
            connection_id=connection_id,
            limit=1,
        )
        if results:
            return results[0]

        return None

    # The connection object is only needed when a specific job is requested, so it is
    # built once here rather than before the `job_id` check.
    connection = CloudConnection(
        workspace=self,
        connection_id=connection_id,
    )
    return SyncResult(
        workspace=self,
        connection=connection,
        job_id=job_id,
    )
Get the sync result for a connection job.
If `job_id` is not provided, the most recent sync job will be used.
Returns `None` if `job_id` is omitted and no previous jobs are found.
def get_previous_sync_logs(
    self,
    connection_id: str,
    *,
    limit: int = 10,
) -> list[SyncResult]:
    """Get the previous sync logs for a connection.

    A `CloudConnection` is built for `connection_id`, and `limit` is passed through
    to its `get_previous_sync_logs` method.
    """
    target = CloudConnection(connection_id=connection_id, workspace=self)
    logs = target.get_previous_sync_logs(limit=limit)
    return logs
Get the previous sync logs for a connection.