airbyte.cloud.workspaces
PyAirbyte classes and methods for interacting with the Airbyte Cloud API.
By overriding api_root
, you can use this module to interact with self-managed Airbyte instances,
both OSS and Enterprise.
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2"""PyAirbyte classes and methods for interacting with the Airbyte Cloud API. 3 4By overriding `api_root`, you can use this module to interact with self-managed Airbyte instances, 5both OSS and Enterprise. 6""" 7 8from __future__ import annotations 9 10from dataclasses import dataclass 11from typing import TYPE_CHECKING 12 13from airbyte import exceptions as exc 14from airbyte._util.api_util import ( 15 CLOUD_API_ROOT, 16 create_connection, 17 create_destination, 18 create_source, 19 delete_connection, 20 delete_destination, 21 delete_source, 22 get_workspace, 23) 24from airbyte.cloud._destination_util import get_destination_config_from_cache 25from airbyte.cloud.connections import CloudConnection 26from airbyte.cloud.sync_results import SyncResult 27from airbyte.sources.base import Source 28 29 30if TYPE_CHECKING: 31 from airbyte._util.api_imports import DestinationResponse 32 from airbyte.caches.base import CacheBase 33 34 35@dataclass 36class CloudWorkspace: 37 """A remote workspace on the Airbyte Cloud. 38 39 By overriding `api_root`, you can use this class to interact with self-managed Airbyte 40 instances, both OSS and Enterprise. 41 """ 42 43 workspace_id: str 44 api_key: str 45 api_root: str = CLOUD_API_ROOT 46 47 @property 48 def workspace_url(self) -> str | None: 49 return f"{self.api_root}/workspaces/{self.workspace_id}" 50 51 # Test connection and creds 52 53 def connect(self) -> None: 54 """Check that the workspace is reachable and raise an exception otherwise. 55 56 Note: It is not necessary to call this method before calling other operations. It 57 serves primarily as a simple check to ensure that the workspace is reachable 58 and credentials are correct. 59 """ 60 _ = get_workspace( 61 api_root=self.api_root, 62 api_key=self.api_key, 63 workspace_id=self.workspace_id, 64 ) 65 print(f"Successfully connected to workspace: {self.workspace_url}") 66 67 # Deploy and delete sources 68 69 # TODO: Make this a public API 70 def _deploy_source( 71 self, 72 source: Source, 73 ) -> str: 74 """Deploy a source to the workspace. 75 76 Returns the newly deployed source ID. 77 """ 78 source_configuration = source.get_config().copy() 79 source_configuration["sourceType"] = source.name.replace("source-", "") 80 81 deployed_source = create_source( 82 name=f"{source.name.replace('-', ' ').title()} (Deployed by PyAirbyte)", 83 api_root=self.api_root, 84 api_key=self.api_key, 85 workspace_id=self.workspace_id, 86 config=source_configuration, 87 ) 88 89 # Set the deployment Ids on the source object 90 source._deployed_api_root = self.api_root # noqa: SLF001 # Accessing nn-public API 91 source._deployed_workspace_id = self.workspace_id # noqa: SLF001 # Accessing nn-public API 92 source._deployed_source_id = deployed_source.source_id # noqa: SLF001 # Accessing nn-public API 93 94 return deployed_source.source_id 95 96 def _permanently_delete_source( 97 self, 98 source: str | Source, 99 ) -> None: 100 """Delete a source from the workspace. 101 102 You can pass either the source ID `str` or a deployed `Source` object. 103 """ 104 if not isinstance(source, (str, Source)): 105 raise ValueError(f"Invalid source type: {type(source)}") # noqa: TRY004, TRY003 106 107 if isinstance(source, Source): 108 if not source._deployed_source_id: # noqa: SLF001 109 raise ValueError("Source has not been deployed.") # noqa: TRY003 110 111 source_id = source._deployed_source_id # noqa: SLF001 112 113 elif isinstance(source, str): 114 source_id = source 115 116 delete_source( 117 source_id=source_id, 118 api_root=self.api_root, 119 api_key=self.api_key, 120 ) 121 122 # Deploy and delete destinations 123 124 # TODO: Make this a public API 125 def _deploy_cache_as_destination( 126 self, 127 cache: CacheBase, 128 ) -> str: 129 """Deploy a cache to the workspace as a new destination. 130 131 Returns the newly deployed destination ID. 132 """ 133 cache_type_name = cache.__class__.__name__.replace("Cache", "") 134 135 deployed_destination: DestinationResponse = create_destination( 136 name=f"Destination {cache_type_name} (Deployed by PyAirbyte)", 137 api_root=self.api_root, 138 api_key=self.api_key, 139 workspace_id=self.workspace_id, 140 config=get_destination_config_from_cache(cache), 141 ) 142 143 # Set the deployment Ids on the source object 144 cache._deployed_api_root = self.api_root # noqa: SLF001 # Accessing nn-public API 145 cache._deployed_workspace_id = self.workspace_id # noqa: SLF001 # Accessing nn-public API 146 cache._deployed_destination_id = deployed_destination.destination_id # noqa: SLF001 # Accessing nn-public API 147 148 return deployed_destination.destination_id 149 150 def _permanently_delete_destination( 151 self, 152 *, 153 destination: str | None = None, 154 cache: CacheBase | None = None, 155 ) -> None: 156 """Delete a deployed destination from the workspace. 157 158 You can pass either the `Cache` class or the deployed destination ID as a `str`. 159 """ 160 if destination is None and cache is None: 161 raise ValueError("You must provide either a destination ID or a cache object.") # noqa: TRY003 162 if destination is not None and cache is not None: 163 raise ValueError( # noqa: TRY003 164 "You must provide either a destination ID or a cache object, not both." 165 ) 166 167 if cache: 168 if not cache._deployed_destination_id: # noqa: SLF001 169 raise ValueError("Cache has not been deployed.") # noqa: TRY003 170 171 destination = cache._deployed_destination_id # noqa: SLF001 172 173 if destination is None: 174 raise ValueError("No destination ID provided.") # noqa: TRY003 175 176 delete_destination( 177 destination_id=destination, 178 api_root=self.api_root, 179 api_key=self.api_key, 180 ) 181 182 # Deploy and delete connections 183 184 # TODO: Make this a public API 185 def _deploy_connection( 186 self, 187 source: Source | str, 188 cache: CacheBase | None = None, 189 destination: str | None = None, 190 table_prefix: str | None = None, 191 selected_streams: list[str] | None = None, 192 ) -> CloudConnection: 193 """Deploy a source and cache to the workspace as a new connection. 194 195 Returns the newly deployed connection ID as a `str`. 196 197 Args: 198 source (Source | str): The source to deploy. You can pass either an already deployed 199 source ID `str` or a PyAirbyte `Source` object. If you pass a `Source` object, 200 it will be deployed automatically. 201 cache (CacheBase, optional): The cache to deploy as a new destination. You can provide 202 `cache` or `destination`, but not both. 203 destination (str, optional): The destination ID to use. You can provide 204 `cache` or `destination`, but not both. 205 """ 206 # Resolve source ID 207 source_id: str 208 if isinstance(source, Source): 209 selected_streams = selected_streams or source.get_selected_streams() 210 if source._deployed_source_id: # noqa: SLF001 211 source_id = source._deployed_source_id # noqa: SLF001 212 else: 213 source_id = self._deploy_source(source) 214 else: 215 source_id = source 216 if not selected_streams: 217 raise exc.PyAirbyteInputError( 218 guidance="You must provide `selected_streams` when deploying a source ID." 219 ) 220 221 # Resolve destination ID 222 destination_id: str 223 if destination: 224 destination_id = destination 225 elif cache: 226 table_prefix = table_prefix if table_prefix is not None else (cache.table_prefix or "") 227 if not cache._deployed_destination_id: # noqa: SLF001 228 destination_id = self._deploy_cache_as_destination(cache) 229 else: 230 destination_id = cache._deployed_destination_id # noqa: SLF001 231 else: 232 raise exc.PyAirbyteInputError( 233 guidance="You must provide either a destination ID or a cache object." 234 ) 235 236 assert source_id is not None 237 assert destination_id is not None 238 239 deployed_connection = create_connection( 240 name="Connection (Deployed by PyAirbyte)", 241 source_id=source_id, 242 destination_id=destination_id, 243 api_root=self.api_root, 244 api_key=self.api_key, 245 workspace_id=self.workspace_id, 246 selected_stream_names=selected_streams, 247 prefix=table_prefix or "", 248 ) 249 250 if isinstance(source, Source): 251 source._deployed_api_root = self.api_root # noqa: SLF001 252 source._deployed_workspace_id = self.workspace_id # noqa: SLF001 253 source._deployed_source_id = source_id # noqa: SLF001 254 if cache: 255 cache._deployed_api_root = self.api_root # noqa: SLF001 256 cache._deployed_workspace_id = self.workspace_id # noqa: SLF001 257 cache._deployed_destination_id = deployed_connection.destination_id # noqa: SLF001 258 259 return CloudConnection( 260 workspace=self, 261 connection_id=deployed_connection.connection_id, 262 source=deployed_connection.source_id, 263 destination=deployed_connection.destination_id, 264 ) 265 266 def get_connection( 267 self, 268 connection_id: str, 269 ) -> CloudConnection: 270 """Get a connection by ID. 271 272 This method does not fetch data from the API. It returns a `CloudConnection` object, 273 which will be loaded lazily as needed. 274 """ 275 return CloudConnection( 276 workspace=self, 277 connection_id=connection_id, 278 ) 279 280 def _permanently_delete_connection( 281 self, 282 connection: str | CloudConnection, 283 *, 284 delete_source: bool = False, 285 delete_destination: bool = False, 286 ) -> None: 287 """Delete a deployed connection from the workspace.""" 288 if connection is None: 289 raise ValueError("No connection ID provided.") # noqa: TRY003 290 291 if isinstance(connection, str): 292 connection = CloudConnection( 293 workspace=self, 294 connection_id=connection, 295 ) 296 297 delete_connection( 298 connection_id=connection.connection_id, 299 api_root=self.api_root, 300 api_key=self.api_key, 301 workspace_id=self.workspace_id, 302 ) 303 if delete_source: 304 self._permanently_delete_source(source=connection.source_id) 305 306 if delete_destination: 307 self._permanently_delete_destination(destination=connection.destination_id) 308 309 # Run syncs 310 311 def run_sync( 312 self, 313 connection_id: str, 314 *, 315 wait: bool = True, 316 wait_timeout: int = 300, 317 ) -> SyncResult: 318 """Run a sync on a deployed connection.""" 319 connection = CloudConnection( 320 workspace=self, 321 connection_id=connection_id, 322 ) 323 return connection.run_sync(wait=wait, wait_timeout=wait_timeout) 324 325 # Get sync results and previous sync logs 326 327 def get_sync_result( 328 self, 329 connection_id: str, 330 job_id: str | None = None, 331 ) -> SyncResult | None: 332 """Get the sync result for a connection job. 333 334 If `job_id` is not provided, the most recent sync job will be used. 335 336 Returns `None` if job_id is omitted and no previous jobs are found. 337 """ 338 connection = CloudConnection( 339 workspace=self, 340 connection_id=connection_id, 341 ) 342 if job_id is None: 343 results = self.get_previous_sync_logs( 344 connection_id=connection_id, 345 limit=1, 346 ) 347 if results: 348 return results[0] 349 350 return None 351 connection = CloudConnection( 352 workspace=self, 353 connection_id=connection_id, 354 ) 355 return SyncResult( 356 workspace=self, 357 connection=connection, 358 job_id=job_id, 359 ) 360 361 def get_previous_sync_logs( 362 self, 363 connection_id: str, 364 *, 365 limit: int = 10, 366 ) -> list[SyncResult]: 367 """Get the previous sync logs for a connection.""" 368 connection = CloudConnection( 369 workspace=self, 370 connection_id=connection_id, 371 ) 372 return connection.get_previous_sync_logs( 373 limit=limit, 374 )
36@dataclass 37class CloudWorkspace: 38 """A remote workspace on the Airbyte Cloud. 39 40 By overriding `api_root`, you can use this class to interact with self-managed Airbyte 41 instances, both OSS and Enterprise. 42 """ 43 44 workspace_id: str 45 api_key: str 46 api_root: str = CLOUD_API_ROOT 47 48 @property 49 def workspace_url(self) -> str | None: 50 return f"{self.api_root}/workspaces/{self.workspace_id}" 51 52 # Test connection and creds 53 54 def connect(self) -> None: 55 """Check that the workspace is reachable and raise an exception otherwise. 56 57 Note: It is not necessary to call this method before calling other operations. It 58 serves primarily as a simple check to ensure that the workspace is reachable 59 and credentials are correct. 60 """ 61 _ = get_workspace( 62 api_root=self.api_root, 63 api_key=self.api_key, 64 workspace_id=self.workspace_id, 65 ) 66 print(f"Successfully connected to workspace: {self.workspace_url}") 67 68 # Deploy and delete sources 69 70 # TODO: Make this a public API 71 def _deploy_source( 72 self, 73 source: Source, 74 ) -> str: 75 """Deploy a source to the workspace. 76 77 Returns the newly deployed source ID. 78 """ 79 source_configuration = source.get_config().copy() 80 source_configuration["sourceType"] = source.name.replace("source-", "") 81 82 deployed_source = create_source( 83 name=f"{source.name.replace('-', ' ').title()} (Deployed by PyAirbyte)", 84 api_root=self.api_root, 85 api_key=self.api_key, 86 workspace_id=self.workspace_id, 87 config=source_configuration, 88 ) 89 90 # Set the deployment Ids on the source object 91 source._deployed_api_root = self.api_root # noqa: SLF001 # Accessing nn-public API 92 source._deployed_workspace_id = self.workspace_id # noqa: SLF001 # Accessing nn-public API 93 source._deployed_source_id = deployed_source.source_id # noqa: SLF001 # Accessing nn-public API 94 95 return deployed_source.source_id 96 97 def _permanently_delete_source( 98 self, 99 source: str | Source, 100 ) -> None: 101 """Delete a source from the workspace. 102 103 You can pass either the source ID `str` or a deployed `Source` object. 104 """ 105 if not isinstance(source, (str, Source)): 106 raise ValueError(f"Invalid source type: {type(source)}") # noqa: TRY004, TRY003 107 108 if isinstance(source, Source): 109 if not source._deployed_source_id: # noqa: SLF001 110 raise ValueError("Source has not been deployed.") # noqa: TRY003 111 112 source_id = source._deployed_source_id # noqa: SLF001 113 114 elif isinstance(source, str): 115 source_id = source 116 117 delete_source( 118 source_id=source_id, 119 api_root=self.api_root, 120 api_key=self.api_key, 121 ) 122 123 # Deploy and delete destinations 124 125 # TODO: Make this a public API 126 def _deploy_cache_as_destination( 127 self, 128 cache: CacheBase, 129 ) -> str: 130 """Deploy a cache to the workspace as a new destination. 131 132 Returns the newly deployed destination ID. 133 """ 134 cache_type_name = cache.__class__.__name__.replace("Cache", "") 135 136 deployed_destination: DestinationResponse = create_destination( 137 name=f"Destination {cache_type_name} (Deployed by PyAirbyte)", 138 api_root=self.api_root, 139 api_key=self.api_key, 140 workspace_id=self.workspace_id, 141 config=get_destination_config_from_cache(cache), 142 ) 143 144 # Set the deployment Ids on the source object 145 cache._deployed_api_root = self.api_root # noqa: SLF001 # Accessing nn-public API 146 cache._deployed_workspace_id = self.workspace_id # noqa: SLF001 # Accessing nn-public API 147 cache._deployed_destination_id = deployed_destination.destination_id # noqa: SLF001 # Accessing nn-public API 148 149 return deployed_destination.destination_id 150 151 def _permanently_delete_destination( 152 self, 153 *, 154 destination: str | None = None, 155 cache: CacheBase | None = None, 156 ) -> None: 157 """Delete a deployed destination from the workspace. 158 159 You can pass either the `Cache` class or the deployed destination ID as a `str`. 160 """ 161 if destination is None and cache is None: 162 raise ValueError("You must provide either a destination ID or a cache object.") # noqa: TRY003 163 if destination is not None and cache is not None: 164 raise ValueError( # noqa: TRY003 165 "You must provide either a destination ID or a cache object, not both." 166 ) 167 168 if cache: 169 if not cache._deployed_destination_id: # noqa: SLF001 170 raise ValueError("Cache has not been deployed.") # noqa: TRY003 171 172 destination = cache._deployed_destination_id # noqa: SLF001 173 174 if destination is None: 175 raise ValueError("No destination ID provided.") # noqa: TRY003 176 177 delete_destination( 178 destination_id=destination, 179 api_root=self.api_root, 180 api_key=self.api_key, 181 ) 182 183 # Deploy and delete connections 184 185 # TODO: Make this a public API 186 def _deploy_connection( 187 self, 188 source: Source | str, 189 cache: CacheBase | None = None, 190 destination: str | None = None, 191 table_prefix: str | None = None, 192 selected_streams: list[str] | None = None, 193 ) -> CloudConnection: 194 """Deploy a source and cache to the workspace as a new connection. 195 196 Returns the newly deployed connection ID as a `str`. 197 198 Args: 199 source (Source | str): The source to deploy. You can pass either an already deployed 200 source ID `str` or a PyAirbyte `Source` object. If you pass a `Source` object, 201 it will be deployed automatically. 202 cache (CacheBase, optional): The cache to deploy as a new destination. You can provide 203 `cache` or `destination`, but not both. 204 destination (str, optional): The destination ID to use. You can provide 205 `cache` or `destination`, but not both. 206 """ 207 # Resolve source ID 208 source_id: str 209 if isinstance(source, Source): 210 selected_streams = selected_streams or source.get_selected_streams() 211 if source._deployed_source_id: # noqa: SLF001 212 source_id = source._deployed_source_id # noqa: SLF001 213 else: 214 source_id = self._deploy_source(source) 215 else: 216 source_id = source 217 if not selected_streams: 218 raise exc.PyAirbyteInputError( 219 guidance="You must provide `selected_streams` when deploying a source ID." 220 ) 221 222 # Resolve destination ID 223 destination_id: str 224 if destination: 225 destination_id = destination 226 elif cache: 227 table_prefix = table_prefix if table_prefix is not None else (cache.table_prefix or "") 228 if not cache._deployed_destination_id: # noqa: SLF001 229 destination_id = self._deploy_cache_as_destination(cache) 230 else: 231 destination_id = cache._deployed_destination_id # noqa: SLF001 232 else: 233 raise exc.PyAirbyteInputError( 234 guidance="You must provide either a destination ID or a cache object." 235 ) 236 237 assert source_id is not None 238 assert destination_id is not None 239 240 deployed_connection = create_connection( 241 name="Connection (Deployed by PyAirbyte)", 242 source_id=source_id, 243 destination_id=destination_id, 244 api_root=self.api_root, 245 api_key=self.api_key, 246 workspace_id=self.workspace_id, 247 selected_stream_names=selected_streams, 248 prefix=table_prefix or "", 249 ) 250 251 if isinstance(source, Source): 252 source._deployed_api_root = self.api_root # noqa: SLF001 253 source._deployed_workspace_id = self.workspace_id # noqa: SLF001 254 source._deployed_source_id = source_id # noqa: SLF001 255 if cache: 256 cache._deployed_api_root = self.api_root # noqa: SLF001 257 cache._deployed_workspace_id = self.workspace_id # noqa: SLF001 258 cache._deployed_destination_id = deployed_connection.destination_id # noqa: SLF001 259 260 return CloudConnection( 261 workspace=self, 262 connection_id=deployed_connection.connection_id, 263 source=deployed_connection.source_id, 264 destination=deployed_connection.destination_id, 265 ) 266 267 def get_connection( 268 self, 269 connection_id: str, 270 ) -> CloudConnection: 271 """Get a connection by ID. 272 273 This method does not fetch data from the API. It returns a `CloudConnection` object, 274 which will be loaded lazily as needed. 275 """ 276 return CloudConnection( 277 workspace=self, 278 connection_id=connection_id, 279 ) 280 281 def _permanently_delete_connection( 282 self, 283 connection: str | CloudConnection, 284 *, 285 delete_source: bool = False, 286 delete_destination: bool = False, 287 ) -> None: 288 """Delete a deployed connection from the workspace.""" 289 if connection is None: 290 raise ValueError("No connection ID provided.") # noqa: TRY003 291 292 if isinstance(connection, str): 293 connection = CloudConnection( 294 workspace=self, 295 connection_id=connection, 296 ) 297 298 delete_connection( 299 connection_id=connection.connection_id, 300 api_root=self.api_root, 301 api_key=self.api_key, 302 workspace_id=self.workspace_id, 303 ) 304 if delete_source: 305 self._permanently_delete_source(source=connection.source_id) 306 307 if delete_destination: 308 self._permanently_delete_destination(destination=connection.destination_id) 309 310 # Run syncs 311 312 def run_sync( 313 self, 314 connection_id: str, 315 *, 316 wait: bool = True, 317 wait_timeout: int = 300, 318 ) -> SyncResult: 319 """Run a sync on a deployed connection.""" 320 connection = CloudConnection( 321 workspace=self, 322 connection_id=connection_id, 323 ) 324 return connection.run_sync(wait=wait, wait_timeout=wait_timeout) 325 326 # Get sync results and previous sync logs 327 328 def get_sync_result( 329 self, 330 connection_id: str, 331 job_id: str | None = None, 332 ) -> SyncResult | None: 333 """Get the sync result for a connection job. 334 335 If `job_id` is not provided, the most recent sync job will be used. 336 337 Returns `None` if job_id is omitted and no previous jobs are found. 338 """ 339 connection = CloudConnection( 340 workspace=self, 341 connection_id=connection_id, 342 ) 343 if job_id is None: 344 results = self.get_previous_sync_logs( 345 connection_id=connection_id, 346 limit=1, 347 ) 348 if results: 349 return results[0] 350 351 return None 352 connection = CloudConnection( 353 workspace=self, 354 connection_id=connection_id, 355 ) 356 return SyncResult( 357 workspace=self, 358 connection=connection, 359 job_id=job_id, 360 ) 361 362 def get_previous_sync_logs( 363 self, 364 connection_id: str, 365 *, 366 limit: int = 10, 367 ) -> list[SyncResult]: 368 """Get the previous sync logs for a connection.""" 369 connection = CloudConnection( 370 workspace=self, 371 connection_id=connection_id, 372 ) 373 return connection.get_previous_sync_logs( 374 limit=limit, 375 )
A remote workspace on the Airbyte Cloud.
By overriding api_root
, you can use this class to interact with self-managed Airbyte
instances, both OSS and Enterprise.
54 def connect(self) -> None: 55 """Check that the workspace is reachable and raise an exception otherwise. 56 57 Note: It is not necessary to call this method before calling other operations. It 58 serves primarily as a simple check to ensure that the workspace is reachable 59 and credentials are correct. 60 """ 61 _ = get_workspace( 62 api_root=self.api_root, 63 api_key=self.api_key, 64 workspace_id=self.workspace_id, 65 ) 66 print(f"Successfully connected to workspace: {self.workspace_url}")
Check that the workspace is reachable and raise an exception otherwise.
Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
267 def get_connection( 268 self, 269 connection_id: str, 270 ) -> CloudConnection: 271 """Get a connection by ID. 272 273 This method does not fetch data from the API. It returns a `CloudConnection` object, 274 which will be loaded lazily as needed. 275 """ 276 return CloudConnection( 277 workspace=self, 278 connection_id=connection_id, 279 )
Get a connection by ID.
This method does not fetch data from the API. It returns a CloudConnection
object,
which will be loaded lazily as needed.
312 def run_sync( 313 self, 314 connection_id: str, 315 *, 316 wait: bool = True, 317 wait_timeout: int = 300, 318 ) -> SyncResult: 319 """Run a sync on a deployed connection.""" 320 connection = CloudConnection( 321 workspace=self, 322 connection_id=connection_id, 323 ) 324 return connection.run_sync(wait=wait, wait_timeout=wait_timeout)
Run a sync on a deployed connection.
328 def get_sync_result( 329 self, 330 connection_id: str, 331 job_id: str | None = None, 332 ) -> SyncResult | None: 333 """Get the sync result for a connection job. 334 335 If `job_id` is not provided, the most recent sync job will be used. 336 337 Returns `None` if job_id is omitted and no previous jobs are found. 338 """ 339 connection = CloudConnection( 340 workspace=self, 341 connection_id=connection_id, 342 ) 343 if job_id is None: 344 results = self.get_previous_sync_logs( 345 connection_id=connection_id, 346 limit=1, 347 ) 348 if results: 349 return results[0] 350 351 return None 352 connection = CloudConnection( 353 workspace=self, 354 connection_id=connection_id, 355 ) 356 return SyncResult( 357 workspace=self, 358 connection=connection, 359 job_id=job_id, 360 )
Get the sync result for a connection job.
If job_id
is not provided, the most recent sync job will be used.
Returns None
if job_id is omitted and no previous jobs are found.
362 def get_previous_sync_logs( 363 self, 364 connection_id: str, 365 *, 366 limit: int = 10, 367 ) -> list[SyncResult]: 368 """Get the previous sync logs for a connection.""" 369 connection = CloudConnection( 370 workspace=self, 371 connection_id=connection_id, 372 ) 373 return connection.get_previous_sync_logs( 374 limit=limit, 375 )
Get the previous sync logs for a connection.