airbyte.cloud.connections
Cloud Connections.
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Cloud Connections."""

from __future__ import annotations

from typing import TYPE_CHECKING, Any

from airbyte._util import api_util
from airbyte.cloud.connectors import CloudDestination, CloudSource
from airbyte.cloud.sync_results import SyncResult
from airbyte.exceptions import AirbyteWorkspaceMismatchError


if TYPE_CHECKING:
    from airbyte_api.models import ConnectionResponse, JobResponse

    from airbyte.cloud.workspaces import CloudWorkspace


class CloudConnection:  # noqa: PLR0904  # Too many public methods
    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
    """

    def __init__(
        self,
        workspace: CloudWorkspace,
        connection_id: str,
        source: str | None = None,
        destination: str | None = None,
    ) -> None:
        """It is not recommended to create a `CloudConnection` object directly.

        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
        """
        self.connection_id = connection_id
        """The ID of the connection."""

        self.workspace = workspace
        """The workspace that the connection belongs to."""

        self._source_id = source
        """The ID of the source."""

        self._destination_id = destination
        """The ID of the destination."""

        self._connection_info: ConnectionResponse | None = None
        """The connection info object. (Cached.)"""

        self._cloud_source_object: CloudSource | None = None
        """The source object. (Cached.)"""

        self._cloud_destination_object: CloudDestination | None = None
        """The destination object. (Cached.)"""

    def _fetch_connection_info(
        self,
        *,
        force_refresh: bool = False,
        verify: bool = True,
    ) -> ConnectionResponse:
        """Fetch and cache connection info from the API.

        By default, this method will only fetch from the API if connection info is not
        already cached. It also verifies that the connection belongs to the expected
        workspace unless verification is explicitly disabled.

        Args:
            force_refresh: If True, always fetch from the API even if cached.
                If False (default), only fetch if not already cached.
            verify: If True (default), verify that the connection is valid (e.g., that
                the workspace_id matches this object's workspace). Raises an error if
                validation fails.

        Returns:
            The ConnectionResponse from the API.

        Raises:
            AirbyteWorkspaceMismatchError: If verify is True and the connection's
                workspace_id doesn't match the expected workspace.
            AirbyteMissingResourceError: If the connection doesn't exist.
        """
        if not force_refresh and self._connection_info is not None:
            # Use cached info, but still verify if requested
            if verify:
                self._verify_workspace_match(self._connection_info)
            return self._connection_info

        # Fetch from API
        connection_info = api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )

        # Cache the result first (before verification may raise)
        self._connection_info = connection_info

        # Verify if requested
        if verify:
            self._verify_workspace_match(connection_info)

        return connection_info

    def _verify_workspace_match(self, connection_info: ConnectionResponse) -> None:
        """Verify that the connection belongs to the expected workspace.

        Raises:
            AirbyteWorkspaceMismatchError: If the workspace IDs don't match.
        """
        if connection_info.workspace_id != self.workspace.workspace_id:
            raise AirbyteWorkspaceMismatchError(
                resource_type="connection",
                resource_id=self.connection_id,
                workspace=self.workspace,
                expected_workspace_id=self.workspace.workspace_id,
                actual_workspace_id=connection_info.workspace_id,
                message=(
                    f"Connection '{self.connection_id}' belongs to workspace "
                    f"'{connection_info.workspace_id}', not '{self.workspace.workspace_id}'."
                ),
            )

    def check_is_valid(self) -> bool:
        """Check if this connection exists and belongs to the expected workspace.

        This method fetches connection info from the API (if not already cached) and
        verifies that the connection's workspace_id matches the workspace associated
        with this CloudConnection object.

        Returns:
            True if the connection exists and belongs to the expected workspace.

        Raises:
            AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
            AirbyteMissingResourceError: If the connection doesn't exist.
        """
        self._fetch_connection_info(force_refresh=False, verify=True)
        return True

    @classmethod
    def _from_connection_response(
        cls,
        workspace: CloudWorkspace,
        connection_response: ConnectionResponse,
    ) -> CloudConnection:
        """Create a CloudConnection from a ConnectionResponse."""
        result = cls(
            workspace=workspace,
            connection_id=connection_response.connection_id,
            source=connection_response.source_id,
            destination=connection_response.destination_id,
        )
        result._connection_info = connection_response  # noqa: SLF001  # Accessing Non-Public API
        return result

    # Properties

    @property
    def name(self) -> str | None:
        """Get the display name of the connection, if available.

        E.g. "My Postgres to Snowflake", not the connection ID.
        """
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.name

    @property
    def source_id(self) -> str:
        """The ID of the source."""
        if not self._source_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._source_id = self._connection_info.source_id

        return self._source_id

    @property
    def source(self) -> CloudSource:
        """Get the source object."""
        if self._cloud_source_object:
            return self._cloud_source_object

        self._cloud_source_object = CloudSource(
            workspace=self.workspace,
            connector_id=self.source_id,
        )
        return self._cloud_source_object

    @property
    def destination_id(self) -> str:
        """The ID of the destination."""
        if not self._destination_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._destination_id = self._connection_info.destination_id

        return self._destination_id

    @property
    def destination(self) -> CloudDestination:
        """Get the destination object."""
        if self._cloud_destination_object:
            return self._cloud_destination_object

        self._cloud_destination_object = CloudDestination(
            workspace=self.workspace,
            connector_id=self.destination_id,
        )
        return self._cloud_destination_object

    @property
    def stream_names(self) -> list[str]:
        """The stream names."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return [stream.name for stream in self._connection_info.configurations.streams or []]

    @property
    def table_prefix(self) -> str:
        """The table prefix."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.prefix or ""

    @property
    def connection_url(self) -> str | None:
        """The web URL to the connection."""
        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

    @property
    def job_history_url(self) -> str | None:
        """The URL to the job history for the connection."""
        return f"{self.connection_url}/timeline"

    # Run Sync

    def run_sync(
        self,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync."""
        connection_response = api_util.run_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        sync_result = SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=connection_response.job_id,
        )

        if wait:
            sync_result.wait_for_completion(
                wait_timeout=wait_timeout,
                raise_failure=True,
                raise_timeout=True,
            )

        return sync_result

    def __repr__(self) -> str:
        """String representation of the connection."""
        return (
            f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, "
            f"destination_id={self.destination_id}, connection_url={self.connection_url})"
        )

    # Logs

    def get_previous_sync_logs(
        self,
        *,
        limit: int = 20,
        offset: int | None = None,
        from_tail: bool = True,
    ) -> list[SyncResult]:
        """Get previous sync jobs for a connection with pagination support.

        Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
        rows_synced, start_time). Full log text can be fetched lazily via
        `SyncResult.get_full_log_text()`.

        Args:
            limit: Maximum number of jobs to return. Defaults to 20.
            offset: Number of jobs to skip from the beginning. Defaults to None (0).
            from_tail: If True, returns jobs ordered newest-first (createdAt DESC).
                If False, returns jobs ordered oldest-first (createdAt ASC).
                Defaults to True.

        Returns:
            A list of SyncResult objects representing the sync jobs.
        """
        order_by = (
            api_util.JOB_ORDER_BY_CREATED_AT_DESC
            if from_tail
            else api_util.JOB_ORDER_BY_CREATED_AT_ASC
        )
        sync_logs: list[JobResponse] = api_util.get_job_logs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            limit=limit,
            offset=offset,
            order_by=order_by,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        return [
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=sync_log.job_id,
                _latest_job_info=sync_log,
            )
            for sync_log in sync_logs
        ]

    def get_sync_result(
        self,
        job_id: int | None = None,
    ) -> SyncResult | None:
        """Get the sync result for the connection.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if job_id is omitted and no previous jobs are found.
        """
        if job_id is None:
            # Get the most recent sync job
            results = self.get_previous_sync_logs(
                limit=1,
            )
            if results:
                return results[0]

            return None

        # Get the sync job by ID (lazy loaded)
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

    # Artifacts

    def get_state_artifacts(self) -> list[dict[str, Any]] | None:
        """Get the connection state artifacts.

        Returns the persisted state for this connection, which can be used
        when debugging incremental syncs.

        Uses the Config API endpoint: POST /v1/state/get

        Returns:
            List of state objects for each stream, or None if no state is set.
        """
        state_response = api_util.get_connection_state(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        if state_response.get("stateType") == "not_set":
            return None
        return state_response.get("streamState", [])

    def get_catalog_artifact(self) -> dict[str, Any] | None:
        """Get the configured catalog for this connection.

        Returns the full configured catalog (syncCatalog) for this connection,
        including stream schemas, sync modes, cursor fields, and primary keys.

        Uses the Config API endpoint: POST /v1/web_backend/connections/get

        Returns:
            Dictionary containing the configured catalog, or `None` if not found.
        """
        connection_response = api_util.get_connection_catalog(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        return connection_response.get("syncCatalog")

    def rename(self, name: str) -> CloudConnection:
        """Rename the connection.

        Args:
            name: New name for the connection

        Returns:
            Updated CloudConnection object with refreshed info
        """
        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            name=name,
        )
        self._connection_info = updated_response
        return self

    def set_table_prefix(self, prefix: str) -> CloudConnection:
        """Set the table prefix for the connection.

        Args:
            prefix: New table prefix to use when syncing to the destination

        Returns:
            Updated CloudConnection object with refreshed info
        """
        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            prefix=prefix,
        )
        self._connection_info = updated_response
        return self

    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
        """Set the selected streams for the connection.

        This is a destructive operation that can break existing connections if the
        stream selection is changed incorrectly. Use with caution.

        Args:
            stream_names: List of stream names to sync

        Returns:
            Updated CloudConnection object with refreshed info
        """
        configurations = api_util.build_stream_configurations(stream_names)

        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            configurations=configurations,
        )
        self._connection_info = updated_response
        return self

    # Enable/Disable

    @property
    def enabled(self) -> bool:
        """Get the current enabled status of the connection.

        This property always fetches fresh data from the API to ensure accuracy,
        as another process or user may have toggled the setting.

        Returns:
            True if the connection status is 'active', False otherwise.
        """
        connection_info = self._fetch_connection_info(force_refresh=True)
        return connection_info.status == api_util.models.ConnectionStatusEnum.ACTIVE

    @enabled.setter
    def enabled(self, value: bool) -> None:
        """Set the enabled status of the connection.

        Args:
            value: True to enable (set status to 'active'), False to disable
                (set status to 'inactive').
        """
        self.set_enabled(enabled=value)

    def set_enabled(
        self,
        *,
        enabled: bool,
        ignore_noop: bool = True,
    ) -> None:
        """Set the enabled status of the connection.

        Args:
            enabled: True to enable (set status to 'active'), False to disable
                (set status to 'inactive').
            ignore_noop: If True (default), silently return if the connection is already
                in the requested state. If False, raise ValueError when the requested
                state matches the current state.

        Raises:
            ValueError: If ignore_noop is False and the connection is already in the
                requested state.
        """
        # Always fetch fresh data to check current status
        connection_info = self._fetch_connection_info(force_refresh=True)
        current_status = connection_info.status
        desired_status = (
            api_util.models.ConnectionStatusEnum.ACTIVE
            if enabled
            else api_util.models.ConnectionStatusEnum.INACTIVE
        )

        if current_status == desired_status:
            if ignore_noop:
                return
            raise ValueError(
                f"Connection is already {'enabled' if enabled else 'disabled'}. "
                f"Current status: {current_status}"
            )

        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            status=desired_status,
        )
        self._connection_info = updated_response

    # Scheduling

    def set_schedule(
        self,
        cron_expression: str,
    ) -> None:
        """Set a cron schedule for the connection.

        Args:
            cron_expression: A cron expression defining when syncs should run.

        Examples:
            - "0 0 * * *" - Daily at midnight UTC
            - "0 */6 * * *" - Every 6 hours
            - "0 0 * * 0" - Weekly on Sunday at midnight UTC
        """
        schedule = api_util.models.AirbyteAPIConnectionSchedule(
            schedule_type=api_util.models.ScheduleTypeEnum.CRON,
            cron_expression=cron_expression,
        )
        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            schedule=schedule,
        )
        self._connection_info = updated_response

    def set_manual_schedule(self) -> None:
        """Set the connection to manual scheduling.

        Disables automatic syncs. Syncs will only run when manually triggered.
        """
        schedule = api_util.models.AirbyteAPIConnectionSchedule(
            schedule_type=api_util.models.ScheduleTypeEnum.MANUAL,
        )
        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            schedule=schedule,
        )
        self._connection_info = updated_response

    # Deletions

    def permanently_delete(
        self,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
    ) -> None:
        """Delete the connection.

        Args:
            cascade_delete_source: Whether to also delete the source.
            cascade_delete_destination: Whether to also delete the destination.
        """
        self.workspace.permanently_delete_connection(self)

        if cascade_delete_source:
            self.workspace.permanently_delete_source(self.source_id)

        if cascade_delete_destination:
            self.workspace.permanently_delete_destination(self.destination_id)
class CloudConnection
A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
def __init__(self, workspace: CloudWorkspace, connection_id: str, source: str | None = None, destination: str | None = None) -> None
It is not recommended to create a CloudConnection object directly.
Instead, use CloudWorkspace.get_connection() to create a connection object.
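For illustration, a minimal sketch of the recommended path. It assumes you already have an authenticated CloudWorkspace; the CloudWorkspace constructor arguments and the connection_id keyword shown here are assumptions, not documented on this page.

from airbyte.cloud.workspaces import CloudWorkspace

# Assumed workspace setup; adjust the credentials and argument names to your environment.
workspace = CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)

# Recommended: let the workspace build the CloudConnection object.
connection = workspace.get_connection(connection_id="...")
print(connection.name, connection.connection_url)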
def check_is_valid(self) -> bool
Check if this connection exists and belongs to the expected workspace.
This method fetches connection info from the API (if not already cached) and verifies that the connection's workspace_id matches the workspace associated with this CloudConnection object.
Returns:
True if the connection exists and belongs to the expected workspace.
Raises:
- AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
- AirbyteMissingResourceError: If the connection doesn't exist.
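A hedged sketch of defensive validation, reusing the connection object from the sketch above. It assumes AirbyteMissingResourceError is importable from airbyte.exceptions alongside AirbyteWorkspaceMismatchError (only the latter import appears in this module's source).

from airbyte.exceptions import AirbyteMissingResourceError, AirbyteWorkspaceMismatchError

try:
    connection.check_is_valid()  # Returns True, or raises one of the errors below.
except AirbyteWorkspaceMismatchError:
    print("Connection exists but belongs to a different workspace.")
except AirbyteMissingResourceError:
    print("Connection does not exist.")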
@property
def name(self) -> str | None
Get the display name of the connection, if available.
E.g. "My Postgres to Snowflake", not the connection ID.
@property
def source_id(self) -> str
The ID of the source.
@property
def source(self) -> CloudSource
Get the source object.
@property
def destination_id(self) -> str
The ID of the destination.
@property
def destination(self) -> CloudDestination
Get the destination object.
@property
def stream_names(self) -> list[str]
The stream names.
@property
def table_prefix(self) -> str
The table prefix.
@property
def connection_url(self) -> str | None
The web URL to the connection.
@property
def job_history_url(self) -> str | None
The URL to the job history for the connection.
def run_sync(self, *, wait: bool = True, wait_timeout: int = 300) -> SyncResult
Run a sync.
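For example, an illustrative sketch of the two calling styles (the timeout value is arbitrary):

# Block until the sync finishes; raises on failure or timeout.
result = connection.run_sync(wait=True, wait_timeout=1800)

# Or trigger without blocking, then wait explicitly when convenient.
result = connection.run_sync(wait=False)
print(f"Started job {result.job_id}")  # job_id is part of the SyncResult metadata.
result.wait_for_completion(wait_timeout=1800, raise_failure=False, raise_timeout=False)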
def get_previous_sync_logs(self, *, limit: int = 20, offset: int | None = None, from_tail: bool = True) -> list[SyncResult]
Get previous sync jobs for a connection with pagination support.
Returns SyncResult objects containing job metadata (job_id, status, bytes_synced, rows_synced, start_time). Full log text can be fetched lazily via SyncResult.get_full_log_text().
Arguments:
- limit: Maximum number of jobs to return. Defaults to 20.
- offset: Number of jobs to skip from the beginning. Defaults to None (0).
- from_tail: If True, returns jobs ordered newest-first (createdAt DESC). If False, returns jobs ordered oldest-first (createdAt ASC). Defaults to True.
Returns:
A list of SyncResult objects representing the sync jobs.
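A sketch of paging through sync history with these parameters; only job_id is read from each SyncResult, since the other SyncResult fields are not documented on this page.

page = 0
while True:
    # Newest first, in pages of 10.
    jobs = connection.get_previous_sync_logs(limit=10, offset=page * 10, from_tail=True)
    if not jobs:
        break
    for job in jobs:
        print(job.job_id)
    page += 1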
def get_sync_result(self, job_id: int | None = None) -> SyncResult | None
Get the sync result for the connection.
If job_id is not provided, the most recent sync job will be used.
Returns None if job_id is omitted and no previous jobs are found.
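For example, a small sketch that inspects the most recent job, if any, and lazily fetches its log text via the get_full_log_text() method mentioned above:

latest = connection.get_sync_result()
if latest is None:
    print("No sync jobs found for this connection.")
else:
    print(f"Most recent job: {latest.job_id}")
    log_text = latest.get_full_log_text()  # Fetched lazily, per the note above.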
def get_state_artifacts(self) -> list[dict[str, Any]] | None
Get the connection state artifacts.
Returns the persisted state for this connection, which can be used when debugging incremental syncs.
Uses the Config API endpoint: POST /v1/state/get
Returns:
List of state objects for each stream, or None if no state is set.
def get_catalog_artifact(self) -> dict[str, Any] | None
Get the configured catalog for this connection.
Returns the full configured catalog (syncCatalog) for this connection, including stream schemas, sync modes, cursor fields, and primary keys.
Uses the Config API endpoint: POST /v1/web_backend/connections/get
Returns:
Dictionary containing the configured catalog, or None if not found.
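An illustrative debugging sketch that dumps both artifacts; the key names inside the returned structures are not documented here, so the example only pretty-prints them.

import json

state = connection.get_state_artifacts()
if state is None:
    print("No state has been persisted for this connection yet.")
else:
    print(json.dumps(state, indent=2, default=str))

catalog = connection.get_catalog_artifact()
if catalog is not None:
    print(json.dumps(catalog, indent=2, default=str))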
def rename(self, name: str) -> CloudConnection
Rename the connection.
Arguments:
- name: New name for the connection
Returns:
Updated CloudConnection object with refreshed info
def set_table_prefix(self, prefix: str) -> CloudConnection
Set the table prefix for the connection.
Arguments:
- prefix: New table prefix to use when syncing to the destination
Returns:
Updated CloudConnection object with refreshed info
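Because rename() and set_table_prefix() both return the updated CloudConnection, the calls can be chained; a quick sketch (the name and prefix are placeholders):

connection = connection.rename("Postgres -> Snowflake (prod)").set_table_prefix("raw_")
print(connection.table_prefix)  # Should now reflect the new prefix.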
def set_selected_streams(self, stream_names: list[str]) -> CloudConnection
Set the selected streams for the connection.
This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.
Arguments:
- stream_names: List of stream names to sync
Returns:
Updated CloudConnection object with refreshed info
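Given the warning above, a cautious sketch; the stream names are placeholders:

# Inspect the current selection before replacing it (destructive).
print(f"Currently selected: {connection.stream_names}")

connection.set_selected_streams(["users", "orders"])  # Placeholder stream names.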
@property
def enabled(self) -> bool
Get the current enabled status of the connection.
This property always fetches fresh data from the API to ensure accuracy, as another process or user may have toggled the setting.
Returns:
True if the connection status is 'active', False otherwise.
def set_enabled(self, *, enabled: bool, ignore_noop: bool = True) -> None
Set the enabled status of the connection.
Arguments:
- enabled: True to enable (set status to 'active'), False to disable (set status to 'inactive').
- ignore_noop: If True (default), silently return if the connection is already in the requested state. If False, raise ValueError when the requested state matches the current state.
Raises:
- ValueError: If ignore_noop is False and the connection is already in the requested state.
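For example (illustrative only; the property setter shown in the module source above delegates to set_enabled()):

if not connection.enabled:         # Always re-fetched from the API.
    connection.enabled = True      # Equivalent to connection.set_enabled(enabled=True).

# Strict mode: raise instead of silently doing nothing.
try:
    connection.set_enabled(enabled=False, ignore_noop=False)
except ValueError as exc:
    print(f"No change applied: {exc}")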
def set_schedule(self, cron_expression: str) -> None
Set a cron schedule for the connection.
Arguments:
- cron_expression: A cron expression defining when syncs should run.
Examples:
- "0 0 * * *" - Daily at midnight UTC
- "0 */6 * * *" - Every 6 hours
- "0 0 * * 0" - Weekly on Sunday at midnight UTC
def set_manual_schedule(self) -> None
Set the connection to manual scheduling.
Disables automatic syncs. Syncs will only run when manually triggered.
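A short sketch combining the two scheduling modes, using one of the cron examples documented above:

# Run every 6 hours.
connection.set_schedule("0 */6 * * *")

# Later, switch back to manual-only triggering.
connection.set_manual_schedule()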
def permanently_delete(self, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False) -> None
Delete the connection.
Arguments:
- cascade_delete_source: Whether to also delete the source.
- cascade_delete_destination: Whether to also delete the destination.
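Finally, an illustrative (and irreversible) cleanup sketch:

# Permanently delete the connection, and optionally its source and/or destination.
connection.permanently_delete(
    cascade_delete_source=True,
    cascade_delete_destination=False,
)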