airbyte.cloud.connections      
                        Cloud Connections.
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2"""Cloud Connections.""" 3 4from __future__ import annotations 5 6from typing import TYPE_CHECKING 7 8from airbyte._util import api_util 9from airbyte.cloud.connectors import CloudDestination, CloudSource 10from airbyte.cloud.sync_results import SyncResult 11 12 13if TYPE_CHECKING: 14 from airbyte_api.models import ConnectionResponse, JobResponse 15 16 from airbyte.cloud.workspaces import CloudWorkspace 17 18 19class CloudConnection: 20 """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud. 21 22 You can use a connection object to run sync jobs, retrieve logs, and manage the connection. 23 """ 24 25 def __init__( 26 self, 27 workspace: CloudWorkspace, 28 connection_id: str, 29 source: str | None = None, 30 destination: str | None = None, 31 ) -> None: 32 """It is not recommended to create a `CloudConnection` object directly. 33 34 Instead, use `CloudWorkspace.get_connection()` to create a connection object. 35 """ 36 self.connection_id = connection_id 37 """The ID of the connection.""" 38 39 self.workspace = workspace 40 """The workspace that the connection belongs to.""" 41 42 self._source_id = source 43 """The ID of the source.""" 44 45 self._destination_id = destination 46 """The ID of the destination.""" 47 48 self._connection_info: ConnectionResponse | None = None 49 """The connection info object. (Cached.)""" 50 51 self._cloud_source_object: CloudSource | None = None 52 """The source object. (Cached.)""" 53 54 self._cloud_destination_object: CloudDestination | None = None 55 """The destination object. (Cached.)""" 56 57 def _fetch_connection_info(self) -> ConnectionResponse: 58 """Populate the connection with data from the API.""" 59 return api_util.get_connection( 60 workspace_id=self.workspace.workspace_id, 61 connection_id=self.connection_id, 62 api_root=self.workspace.api_root, 63 client_id=self.workspace.client_id, 64 client_secret=self.workspace.client_secret, 65 ) 66 67 @classmethod 68 def _from_connection_response( 69 cls, 70 workspace: CloudWorkspace, 71 connection_response: ConnectionResponse, 72 ) -> CloudConnection: 73 """Create a CloudConnection from a ConnectionResponse.""" 74 result = cls( 75 workspace=workspace, 76 connection_id=connection_response.connection_id, 77 source=connection_response.source_id, 78 destination=connection_response.destination_id, 79 ) 80 result._connection_info = connection_response # noqa: SLF001 # Accessing Non-Public API 81 return result 82 83 # Properties 84 85 @property 86 def name(self) -> str | None: 87 """Get the display name of the connection, if available. 88 89 E.g. "My Postgres to Snowflake", not the connection ID. 90 """ 91 if not self._connection_info: 92 self._connection_info = self._fetch_connection_info() 93 94 return self._connection_info.name 95 96 @property 97 def source_id(self) -> str: 98 """The ID of the source.""" 99 if not self._source_id: 100 if not self._connection_info: 101 self._connection_info = self._fetch_connection_info() 102 103 self._source_id = self._connection_info.source_id 104 105 return self._source_id 106 107 @property 108 def source(self) -> CloudSource: 109 """Get the source object.""" 110 if self._cloud_source_object: 111 return self._cloud_source_object 112 113 self._cloud_source_object = CloudSource( 114 workspace=self.workspace, 115 connector_id=self.source_id, 116 ) 117 return self._cloud_source_object 118 119 @property 120 def destination_id(self) -> str: 121 """The ID of the destination.""" 122 if not self._destination_id: 123 if not self._connection_info: 124 self._connection_info = self._fetch_connection_info() 125 126 self._destination_id = self._connection_info.source_id 127 128 return self._destination_id 129 130 @property 131 def destination(self) -> CloudDestination: 132 """Get the destination object.""" 133 if self._cloud_destination_object: 134 return self._cloud_destination_object 135 136 self._cloud_destination_object = CloudDestination( 137 workspace=self.workspace, 138 connector_id=self.destination_id, 139 ) 140 return self._cloud_destination_object 141 142 @property 143 def stream_names(self) -> list[str]: 144 """The stream names.""" 145 if not self._connection_info: 146 self._connection_info = self._fetch_connection_info() 147 148 return [stream.name for stream in self._connection_info.configurations.streams or []] 149 150 @property 151 def table_prefix(self) -> str: 152 """The table prefix.""" 153 if not self._connection_info: 154 self._connection_info = self._fetch_connection_info() 155 156 return self._connection_info.prefix or "" 157 158 @property 159 def connection_url(self) -> str | None: 160 """The web URL to the connection.""" 161 return f"{self.workspace.workspace_url}/connections/{self.connection_id}" 162 163 @property 164 def job_history_url(self) -> str | None: 165 """The URL to the job history for the connection.""" 166 return f"{self.connection_url}/timeline" 167 168 # Run Sync 169 170 def run_sync( 171 self, 172 *, 173 wait: bool = True, 174 wait_timeout: int = 300, 175 ) -> SyncResult: 176 """Run a sync.""" 177 connection_response = api_util.run_connection( 178 connection_id=self.connection_id, 179 api_root=self.workspace.api_root, 180 workspace_id=self.workspace.workspace_id, 181 client_id=self.workspace.client_id, 182 client_secret=self.workspace.client_secret, 183 ) 184 sync_result = SyncResult( 185 workspace=self.workspace, 186 connection=self, 187 job_id=connection_response.job_id, 188 ) 189 190 if wait: 191 sync_result.wait_for_completion( 192 wait_timeout=wait_timeout, 193 raise_failure=True, 194 raise_timeout=True, 195 ) 196 197 return sync_result 198 199 def __repr__(self) -> str: 200 """String representation of the connection.""" 201 return ( 202 f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, " 203 f"destination_id={self.destination_id}, connection_url={self.connection_url})" 204 ) 205 206 # Logs 207 208 def get_previous_sync_logs( 209 self, 210 *, 211 limit: int = 10, 212 ) -> list[SyncResult]: 213 """Get the previous sync logs for a connection.""" 214 sync_logs: list[JobResponse] = api_util.get_job_logs( 215 connection_id=self.connection_id, 216 api_root=self.workspace.api_root, 217 workspace_id=self.workspace.workspace_id, 218 limit=limit, 219 client_id=self.workspace.client_id, 220 client_secret=self.workspace.client_secret, 221 ) 222 return [ 223 SyncResult( 224 workspace=self.workspace, 225 connection=self, 226 job_id=sync_log.job_id, 227 _latest_job_info=sync_log, 228 ) 229 for sync_log in sync_logs 230 ] 231 232 def get_sync_result( 233 self, 234 job_id: int | None = None, 235 ) -> SyncResult | None: 236 """Get the sync result for the connection. 237 238 If `job_id` is not provided, the most recent sync job will be used. 239 240 Returns `None` if job_id is omitted and no previous jobs are found. 241 """ 242 if job_id is None: 243 # Get the most recent sync job 244 results = self.get_previous_sync_logs( 245 limit=1, 246 ) 247 if results: 248 return results[0] 249 250 return None 251 252 # Get the sync job by ID (lazy loaded) 253 return SyncResult( 254 workspace=self.workspace, 255 connection=self, 256 job_id=job_id, 257 ) 258 259 def rename(self, name: str) -> CloudConnection: 260 """Rename the connection. 261 262 Args: 263 name: New name for the connection 264 265 Returns: 266 Updated CloudConnection object with refreshed info 267 """ 268 updated_response = api_util.patch_connection( 269 connection_id=self.connection_id, 270 api_root=self.workspace.api_root, 271 client_id=self.workspace.client_id, 272 client_secret=self.workspace.client_secret, 273 name=name, 274 ) 275 self._connection_info = updated_response 276 return self 277 278 def set_table_prefix(self, prefix: str) -> CloudConnection: 279 """Set the table prefix for the connection. 280 281 Args: 282 prefix: New table prefix to use when syncing to the destination 283 284 Returns: 285 Updated CloudConnection object with refreshed info 286 """ 287 updated_response = api_util.patch_connection( 288 connection_id=self.connection_id, 289 api_root=self.workspace.api_root, 290 client_id=self.workspace.client_id, 291 client_secret=self.workspace.client_secret, 292 prefix=prefix, 293 ) 294 self._connection_info = updated_response 295 return self 296 297 def set_selected_streams(self, stream_names: list[str]) -> CloudConnection: 298 """Set the selected streams for the connection. 299 300 This is a destructive operation that can break existing connections if the 301 stream selection is changed incorrectly. Use with caution. 302 303 Args: 304 stream_names: List of stream names to sync 305 306 Returns: 307 Updated CloudConnection object with refreshed info 308 """ 309 configurations = api_util.build_stream_configurations(stream_names) 310 311 updated_response = api_util.patch_connection( 312 connection_id=self.connection_id, 313 api_root=self.workspace.api_root, 314 client_id=self.workspace.client_id, 315 client_secret=self.workspace.client_secret, 316 configurations=configurations, 317 ) 318 self._connection_info = updated_response 319 return self 320 321 # Deletions 322 323 def permanently_delete( 324 self, 325 *, 326 cascade_delete_source: bool = False, 327 cascade_delete_destination: bool = False, 328 ) -> None: 329 """Delete the connection. 330 331 Args: 332 cascade_delete_source: Whether to also delete the source. 333 cascade_delete_destination: Whether to also delete the destination. 334 """ 335 self.workspace.permanently_delete_connection(self) 336 337 if cascade_delete_source: 338 self.workspace.permanently_delete_source(self.source_id) 339 340 if cascade_delete_destination: 341 self.workspace.permanently_delete_destination(self.destination_id)
20class CloudConnection: 21 """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud. 22 23 You can use a connection object to run sync jobs, retrieve logs, and manage the connection. 24 """ 25 26 def __init__( 27 self, 28 workspace: CloudWorkspace, 29 connection_id: str, 30 source: str | None = None, 31 destination: str | None = None, 32 ) -> None: 33 """It is not recommended to create a `CloudConnection` object directly. 34 35 Instead, use `CloudWorkspace.get_connection()` to create a connection object. 36 """ 37 self.connection_id = connection_id 38 """The ID of the connection.""" 39 40 self.workspace = workspace 41 """The workspace that the connection belongs to.""" 42 43 self._source_id = source 44 """The ID of the source.""" 45 46 self._destination_id = destination 47 """The ID of the destination.""" 48 49 self._connection_info: ConnectionResponse | None = None 50 """The connection info object. (Cached.)""" 51 52 self._cloud_source_object: CloudSource | None = None 53 """The source object. (Cached.)""" 54 55 self._cloud_destination_object: CloudDestination | None = None 56 """The destination object. (Cached.)""" 57 58 def _fetch_connection_info(self) -> ConnectionResponse: 59 """Populate the connection with data from the API.""" 60 return api_util.get_connection( 61 workspace_id=self.workspace.workspace_id, 62 connection_id=self.connection_id, 63 api_root=self.workspace.api_root, 64 client_id=self.workspace.client_id, 65 client_secret=self.workspace.client_secret, 66 ) 67 68 @classmethod 69 def _from_connection_response( 70 cls, 71 workspace: CloudWorkspace, 72 connection_response: ConnectionResponse, 73 ) -> CloudConnection: 74 """Create a CloudConnection from a ConnectionResponse.""" 75 result = cls( 76 workspace=workspace, 77 connection_id=connection_response.connection_id, 78 source=connection_response.source_id, 79 destination=connection_response.destination_id, 80 ) 81 result._connection_info = connection_response # noqa: SLF001 # Accessing Non-Public API 82 return result 83 84 # Properties 85 86 @property 87 def name(self) -> str | None: 88 """Get the display name of the connection, if available. 89 90 E.g. "My Postgres to Snowflake", not the connection ID. 91 """ 92 if not self._connection_info: 93 self._connection_info = self._fetch_connection_info() 94 95 return self._connection_info.name 96 97 @property 98 def source_id(self) -> str: 99 """The ID of the source.""" 100 if not self._source_id: 101 if not self._connection_info: 102 self._connection_info = self._fetch_connection_info() 103 104 self._source_id = self._connection_info.source_id 105 106 return self._source_id 107 108 @property 109 def source(self) -> CloudSource: 110 """Get the source object.""" 111 if self._cloud_source_object: 112 return self._cloud_source_object 113 114 self._cloud_source_object = CloudSource( 115 workspace=self.workspace, 116 connector_id=self.source_id, 117 ) 118 return self._cloud_source_object 119 120 @property 121 def destination_id(self) -> str: 122 """The ID of the destination.""" 123 if not self._destination_id: 124 if not self._connection_info: 125 self._connection_info = self._fetch_connection_info() 126 127 self._destination_id = self._connection_info.source_id 128 129 return self._destination_id 130 131 @property 132 def destination(self) -> CloudDestination: 133 """Get the destination object.""" 134 if self._cloud_destination_object: 135 return self._cloud_destination_object 136 137 self._cloud_destination_object = CloudDestination( 138 workspace=self.workspace, 139 connector_id=self.destination_id, 140 ) 141 return self._cloud_destination_object 142 143 @property 144 def stream_names(self) -> list[str]: 145 """The stream names.""" 146 if not self._connection_info: 147 self._connection_info = self._fetch_connection_info() 148 149 return [stream.name for stream in self._connection_info.configurations.streams or []] 150 151 @property 152 def table_prefix(self) -> str: 153 """The table prefix.""" 154 if not self._connection_info: 155 self._connection_info = self._fetch_connection_info() 156 157 return self._connection_info.prefix or "" 158 159 @property 160 def connection_url(self) -> str | None: 161 """The web URL to the connection.""" 162 return f"{self.workspace.workspace_url}/connections/{self.connection_id}" 163 164 @property 165 def job_history_url(self) -> str | None: 166 """The URL to the job history for the connection.""" 167 return f"{self.connection_url}/timeline" 168 169 # Run Sync 170 171 def run_sync( 172 self, 173 *, 174 wait: bool = True, 175 wait_timeout: int = 300, 176 ) -> SyncResult: 177 """Run a sync.""" 178 connection_response = api_util.run_connection( 179 connection_id=self.connection_id, 180 api_root=self.workspace.api_root, 181 workspace_id=self.workspace.workspace_id, 182 client_id=self.workspace.client_id, 183 client_secret=self.workspace.client_secret, 184 ) 185 sync_result = SyncResult( 186 workspace=self.workspace, 187 connection=self, 188 job_id=connection_response.job_id, 189 ) 190 191 if wait: 192 sync_result.wait_for_completion( 193 wait_timeout=wait_timeout, 194 raise_failure=True, 195 raise_timeout=True, 196 ) 197 198 return sync_result 199 200 def __repr__(self) -> str: 201 """String representation of the connection.""" 202 return ( 203 f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, " 204 f"destination_id={self.destination_id}, connection_url={self.connection_url})" 205 ) 206 207 # Logs 208 209 def get_previous_sync_logs( 210 self, 211 *, 212 limit: int = 10, 213 ) -> list[SyncResult]: 214 """Get the previous sync logs for a connection.""" 215 sync_logs: list[JobResponse] = api_util.get_job_logs( 216 connection_id=self.connection_id, 217 api_root=self.workspace.api_root, 218 workspace_id=self.workspace.workspace_id, 219 limit=limit, 220 client_id=self.workspace.client_id, 221 client_secret=self.workspace.client_secret, 222 ) 223 return [ 224 SyncResult( 225 workspace=self.workspace, 226 connection=self, 227 job_id=sync_log.job_id, 228 _latest_job_info=sync_log, 229 ) 230 for sync_log in sync_logs 231 ] 232 233 def get_sync_result( 234 self, 235 job_id: int | None = None, 236 ) -> SyncResult | None: 237 """Get the sync result for the connection. 238 239 If `job_id` is not provided, the most recent sync job will be used. 240 241 Returns `None` if job_id is omitted and no previous jobs are found. 242 """ 243 if job_id is None: 244 # Get the most recent sync job 245 results = self.get_previous_sync_logs( 246 limit=1, 247 ) 248 if results: 249 return results[0] 250 251 return None 252 253 # Get the sync job by ID (lazy loaded) 254 return SyncResult( 255 workspace=self.workspace, 256 connection=self, 257 job_id=job_id, 258 ) 259 260 def rename(self, name: str) -> CloudConnection: 261 """Rename the connection. 262 263 Args: 264 name: New name for the connection 265 266 Returns: 267 Updated CloudConnection object with refreshed info 268 """ 269 updated_response = api_util.patch_connection( 270 connection_id=self.connection_id, 271 api_root=self.workspace.api_root, 272 client_id=self.workspace.client_id, 273 client_secret=self.workspace.client_secret, 274 name=name, 275 ) 276 self._connection_info = updated_response 277 return self 278 279 def set_table_prefix(self, prefix: str) -> CloudConnection: 280 """Set the table prefix for the connection. 281 282 Args: 283 prefix: New table prefix to use when syncing to the destination 284 285 Returns: 286 Updated CloudConnection object with refreshed info 287 """ 288 updated_response = api_util.patch_connection( 289 connection_id=self.connection_id, 290 api_root=self.workspace.api_root, 291 client_id=self.workspace.client_id, 292 client_secret=self.workspace.client_secret, 293 prefix=prefix, 294 ) 295 self._connection_info = updated_response 296 return self 297 298 def set_selected_streams(self, stream_names: list[str]) -> CloudConnection: 299 """Set the selected streams for the connection. 300 301 This is a destructive operation that can break existing connections if the 302 stream selection is changed incorrectly. Use with caution. 303 304 Args: 305 stream_names: List of stream names to sync 306 307 Returns: 308 Updated CloudConnection object with refreshed info 309 """ 310 configurations = api_util.build_stream_configurations(stream_names) 311 312 updated_response = api_util.patch_connection( 313 connection_id=self.connection_id, 314 api_root=self.workspace.api_root, 315 client_id=self.workspace.client_id, 316 client_secret=self.workspace.client_secret, 317 configurations=configurations, 318 ) 319 self._connection_info = updated_response 320 return self 321 322 # Deletions 323 324 def permanently_delete( 325 self, 326 *, 327 cascade_delete_source: bool = False, 328 cascade_delete_destination: bool = False, 329 ) -> None: 330 """Delete the connection. 331 332 Args: 333 cascade_delete_source: Whether to also delete the source. 334 cascade_delete_destination: Whether to also delete the destination. 335 """ 336 self.workspace.permanently_delete_connection(self) 337 338 if cascade_delete_source: 339 self.workspace.permanently_delete_source(self.source_id) 340 341 if cascade_delete_destination: 342 self.workspace.permanently_delete_destination(self.destination_id)
A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
26 def __init__( 27 self, 28 workspace: CloudWorkspace, 29 connection_id: str, 30 source: str | None = None, 31 destination: str | None = None, 32 ) -> None: 33 """It is not recommended to create a `CloudConnection` object directly. 34 35 Instead, use `CloudWorkspace.get_connection()` to create a connection object. 36 """ 37 self.connection_id = connection_id 38 """The ID of the connection.""" 39 40 self.workspace = workspace 41 """The workspace that the connection belongs to.""" 42 43 self._source_id = source 44 """The ID of the source.""" 45 46 self._destination_id = destination 47 """The ID of the destination.""" 48 49 self._connection_info: ConnectionResponse | None = None 50 """The connection info object. (Cached.)""" 51 52 self._cloud_source_object: CloudSource | None = None 53 """The source object. (Cached.)""" 54 55 self._cloud_destination_object: CloudDestination | None = None 56 """The destination object. (Cached.)"""
It is not recommended to create a CloudConnection object directly.
Instead, use CloudWorkspace.get_connection() to create a connection object.
86 @property 87 def name(self) -> str | None: 88 """Get the display name of the connection, if available. 89 90 E.g. "My Postgres to Snowflake", not the connection ID. 91 """ 92 if not self._connection_info: 93 self._connection_info = self._fetch_connection_info() 94 95 return self._connection_info.name
Get the display name of the connection, if available.
E.g. "My Postgres to Snowflake", not the connection ID.
97 @property 98 def source_id(self) -> str: 99 """The ID of the source.""" 100 if not self._source_id: 101 if not self._connection_info: 102 self._connection_info = self._fetch_connection_info() 103 104 self._source_id = self._connection_info.source_id 105 106 return self._source_id
The ID of the source.
108 @property 109 def source(self) -> CloudSource: 110 """Get the source object.""" 111 if self._cloud_source_object: 112 return self._cloud_source_object 113 114 self._cloud_source_object = CloudSource( 115 workspace=self.workspace, 116 connector_id=self.source_id, 117 ) 118 return self._cloud_source_object
Get the source object.
120 @property 121 def destination_id(self) -> str: 122 """The ID of the destination.""" 123 if not self._destination_id: 124 if not self._connection_info: 125 self._connection_info = self._fetch_connection_info() 126 127 self._destination_id = self._connection_info.source_id 128 129 return self._destination_id
The ID of the destination.
131 @property 132 def destination(self) -> CloudDestination: 133 """Get the destination object.""" 134 if self._cloud_destination_object: 135 return self._cloud_destination_object 136 137 self._cloud_destination_object = CloudDestination( 138 workspace=self.workspace, 139 connector_id=self.destination_id, 140 ) 141 return self._cloud_destination_object
Get the destination object.
143 @property 144 def stream_names(self) -> list[str]: 145 """The stream names.""" 146 if not self._connection_info: 147 self._connection_info = self._fetch_connection_info() 148 149 return [stream.name for stream in self._connection_info.configurations.streams or []]
The stream names.
151 @property 152 def table_prefix(self) -> str: 153 """The table prefix.""" 154 if not self._connection_info: 155 self._connection_info = self._fetch_connection_info() 156 157 return self._connection_info.prefix or ""
The table prefix.
159 @property 160 def connection_url(self) -> str | None: 161 """The web URL to the connection.""" 162 return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
The web URL to the connection.
164 @property 165 def job_history_url(self) -> str | None: 166 """The URL to the job history for the connection.""" 167 return f"{self.connection_url}/timeline"
The URL to the job history for the connection.
171 def run_sync( 172 self, 173 *, 174 wait: bool = True, 175 wait_timeout: int = 300, 176 ) -> SyncResult: 177 """Run a sync.""" 178 connection_response = api_util.run_connection( 179 connection_id=self.connection_id, 180 api_root=self.workspace.api_root, 181 workspace_id=self.workspace.workspace_id, 182 client_id=self.workspace.client_id, 183 client_secret=self.workspace.client_secret, 184 ) 185 sync_result = SyncResult( 186 workspace=self.workspace, 187 connection=self, 188 job_id=connection_response.job_id, 189 ) 190 191 if wait: 192 sync_result.wait_for_completion( 193 wait_timeout=wait_timeout, 194 raise_failure=True, 195 raise_timeout=True, 196 ) 197 198 return sync_result
Run a sync.
209 def get_previous_sync_logs( 210 self, 211 *, 212 limit: int = 10, 213 ) -> list[SyncResult]: 214 """Get the previous sync logs for a connection.""" 215 sync_logs: list[JobResponse] = api_util.get_job_logs( 216 connection_id=self.connection_id, 217 api_root=self.workspace.api_root, 218 workspace_id=self.workspace.workspace_id, 219 limit=limit, 220 client_id=self.workspace.client_id, 221 client_secret=self.workspace.client_secret, 222 ) 223 return [ 224 SyncResult( 225 workspace=self.workspace, 226 connection=self, 227 job_id=sync_log.job_id, 228 _latest_job_info=sync_log, 229 ) 230 for sync_log in sync_logs 231 ]
Get the previous sync logs for a connection.
233 def get_sync_result( 234 self, 235 job_id: int | None = None, 236 ) -> SyncResult | None: 237 """Get the sync result for the connection. 238 239 If `job_id` is not provided, the most recent sync job will be used. 240 241 Returns `None` if job_id is omitted and no previous jobs are found. 242 """ 243 if job_id is None: 244 # Get the most recent sync job 245 results = self.get_previous_sync_logs( 246 limit=1, 247 ) 248 if results: 249 return results[0] 250 251 return None 252 253 # Get the sync job by ID (lazy loaded) 254 return SyncResult( 255 workspace=self.workspace, 256 connection=self, 257 job_id=job_id, 258 )
Get the sync result for the connection.
If job_id is not provided, the most recent sync job will be used.
Returns None if job_id is omitted and no previous jobs are found.
260 def rename(self, name: str) -> CloudConnection: 261 """Rename the connection. 262 263 Args: 264 name: New name for the connection 265 266 Returns: 267 Updated CloudConnection object with refreshed info 268 """ 269 updated_response = api_util.patch_connection( 270 connection_id=self.connection_id, 271 api_root=self.workspace.api_root, 272 client_id=self.workspace.client_id, 273 client_secret=self.workspace.client_secret, 274 name=name, 275 ) 276 self._connection_info = updated_response 277 return self
Rename the connection.
Arguments:
- name: New name for the connection
 
Returns:
Updated CloudConnection object with refreshed info
279 def set_table_prefix(self, prefix: str) -> CloudConnection: 280 """Set the table prefix for the connection. 281 282 Args: 283 prefix: New table prefix to use when syncing to the destination 284 285 Returns: 286 Updated CloudConnection object with refreshed info 287 """ 288 updated_response = api_util.patch_connection( 289 connection_id=self.connection_id, 290 api_root=self.workspace.api_root, 291 client_id=self.workspace.client_id, 292 client_secret=self.workspace.client_secret, 293 prefix=prefix, 294 ) 295 self._connection_info = updated_response 296 return self
Set the table prefix for the connection.
Arguments:
- prefix: New table prefix to use when syncing to the destination
 
Returns:
Updated CloudConnection object with refreshed info
298 def set_selected_streams(self, stream_names: list[str]) -> CloudConnection: 299 """Set the selected streams for the connection. 300 301 This is a destructive operation that can break existing connections if the 302 stream selection is changed incorrectly. Use with caution. 303 304 Args: 305 stream_names: List of stream names to sync 306 307 Returns: 308 Updated CloudConnection object with refreshed info 309 """ 310 configurations = api_util.build_stream_configurations(stream_names) 311 312 updated_response = api_util.patch_connection( 313 connection_id=self.connection_id, 314 api_root=self.workspace.api_root, 315 client_id=self.workspace.client_id, 316 client_secret=self.workspace.client_secret, 317 configurations=configurations, 318 ) 319 self._connection_info = updated_response 320 return self
Set the selected streams for the connection.
This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.
Arguments:
- stream_names: List of stream names to sync
 
Returns:
Updated CloudConnection object with refreshed info
324 def permanently_delete( 325 self, 326 *, 327 cascade_delete_source: bool = False, 328 cascade_delete_destination: bool = False, 329 ) -> None: 330 """Delete the connection. 331 332 Args: 333 cascade_delete_source: Whether to also delete the source. 334 cascade_delete_destination: Whether to also delete the destination. 335 """ 336 self.workspace.permanently_delete_connection(self) 337 338 if cascade_delete_source: 339 self.workspace.permanently_delete_source(self.source_id) 340 341 if cascade_delete_destination: 342 self.workspace.permanently_delete_destination(self.destination_id)
Delete the connection.
Arguments:
- cascade_delete_source: Whether to also delete the source.
 - cascade_delete_destination: Whether to also delete the destination.