airbyte.cloud.connections
Cloud Connections.
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Cloud Connections."""

from __future__ import annotations

from typing import TYPE_CHECKING, cast

from airbyte._util import api_util
from airbyte.cloud.connectors import CloudDestination, CloudSource
from airbyte.cloud.sync_results import SyncResult


if TYPE_CHECKING:
    from airbyte_api.models import ConnectionResponse, JobResponse

    from airbyte.cloud.workspaces import CloudWorkspace


class CloudConnection:
    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
    """

    def __init__(
        self,
        workspace: CloudWorkspace,
        connection_id: str,
        source: str | None = None,
        destination: str | None = None,
    ) -> None:
        """It is not recommended to create a `CloudConnection` object directly.

        Instead, use `CloudWorkspace.get_connection()` to create a connection object.

        Args:
            workspace: The workspace that the connection belongs to.
            connection_id: The ID of the connection.
            source: The source ID, if already known. When omitted, it is looked up
                from the connection info on first access of `source_id`.
            destination: The destination ID, if already known. When omitted, it is
                looked up from the connection info on first access of `destination_id`.
        """
        self.connection_id = connection_id
        """The ID of the connection."""

        self.workspace = workspace
        """The workspace that the connection belongs to."""

        self._source_id = source
        """The ID of the source."""

        self._destination_id = destination
        """The ID of the destination."""

        self._connection_info: ConnectionResponse | None = None
        """The connection info object. (Cached.)"""

        self._cloud_source_object: CloudSource | None = None
        """The source object. (Cached.)"""

        self._cloud_destination_object: CloudDestination | None = None
        """The destination object. (Cached.)"""

    def _fetch_connection_info(self) -> ConnectionResponse:
        """Fetch this connection's info from the Airbyte API.

        The response is not cached here; the properties that call this method
        store it in `self._connection_info`.
        """
        return api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )

    # Properties

    @property
    def source_id(self) -> str:
        """The ID of the source (looked up from the API if not provided at init)."""
        if not self._source_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._source_id = self._connection_info.source_id

        return cast("str", self._source_id)

    @property
    def source(self) -> CloudSource:
        """Get the source object (created on first access, then cached)."""
        if self._cloud_source_object:
            return self._cloud_source_object

        self._cloud_source_object = CloudSource(
            workspace=self.workspace,
            connector_id=self.source_id,
        )
        return self._cloud_source_object

    @property
    def destination_id(self) -> str:
        """The ID of the destination (looked up from the API if not provided at init)."""
        if not self._destination_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            # Must read `destination_id` here; reading `source_id` would silently
            # point every destination operation at the source connector.
            self._destination_id = self._connection_info.destination_id

        return cast("str", self._destination_id)

    @property
    def destination(self) -> CloudDestination:
        """Get the destination object (created on first access, then cached)."""
        if self._cloud_destination_object:
            return self._cloud_destination_object

        self._cloud_destination_object = CloudDestination(
            workspace=self.workspace,
            connector_id=self.destination_id,
        )
        return self._cloud_destination_object

    @property
    def stream_names(self) -> list[str]:
        """The names of the streams configured on the connection."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return [stream.name for stream in self._connection_info.configurations.streams or []]

    @property
    def table_prefix(self) -> str:
        """The table prefix, or an empty string when none is set."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.prefix or ""

    @property
    def connection_url(self) -> str:
        """The web URL to the connection."""
        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

    @property
    def job_history_url(self) -> str:
        """The URL to the job history for the connection."""
        return f"{self.connection_url}/timeline"

    # Run Sync

    def run_sync(
        self,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync job for this connection.

        Args:
            wait: Whether to block until the job completes.
            wait_timeout: Passed to `SyncResult.wait_for_completion()` as the
                maximum time to wait (units as defined there).

        Returns:
            A `SyncResult` for the triggered job.

        Raises:
            Whatever `SyncResult.wait_for_completion()` raises when the job fails
            or the wait times out (only when `wait` is True).
        """
        connection_response = api_util.run_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        sync_result = SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=connection_response.job_id,
        )

        if wait:
            sync_result.wait_for_completion(
                wait_timeout=wait_timeout,
                raise_failure=True,
                raise_timeout=True,
            )

        return sync_result

    def __repr__(self) -> str:
        """String representation of the connection."""
        return (
            f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, "
            f"destination_id={self.destination_id}, connection_url={self.connection_url})"
        )

    # Logs

    def get_previous_sync_logs(
        self,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get results for the connection's previous sync jobs.

        Args:
            limit: Maximum number of jobs to request from the API.

        Returns:
            One `SyncResult` per job returned, pre-populated with that job's info.
        """
        sync_logs: list[JobResponse] = api_util.get_job_logs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            limit=limit,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return [
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=sync_log.job_id,
                _latest_job_info=sync_log,
            )
            for sync_log in sync_logs
        ]

    def get_sync_result(
        self,
        job_id: int | None = None,
    ) -> SyncResult | None:
        """Get the sync result for the connection.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if `job_id` is omitted and no previous jobs are found.
        """
        if job_id is None:
            # Get the most recent sync job
            results = self.get_previous_sync_logs(
                limit=1,
            )
            if results:
                return results[0]

            return None

        # Get the sync job by ID (lazy loaded)
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

    # Deletions

    def permanently_delete(
        self,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
    ) -> None:
        """Delete the connection, optionally along with its source and destination.

        Args:
            cascade_delete_source: Whether to also delete the source.
            cascade_delete_destination: Whether to also delete the destination.
        """
        # Resolve connector IDs *before* deleting the connection: when they are not
        # cached, `source_id` / `destination_id` look them up from the connection's
        # API record, which may no longer be retrievable after deletion.
        source_id: str | None = None
        destination_id: str | None = None
        if cascade_delete_source:
            source_id = self.source_id
        if cascade_delete_destination:
            destination_id = self.destination_id

        self.workspace.permanently_delete_connection(self)

        if source_id is not None:
            self.workspace.permanently_delete_source(source_id)

        if destination_id is not None:
            self.workspace.permanently_delete_destination(destination_id)
class CloudConnection:
    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
    """

    def __init__(
        self,
        workspace: CloudWorkspace,
        connection_id: str,
        source: str | None = None,
        destination: str | None = None,
    ) -> None:
        """It is not recommended to create a `CloudConnection` object directly.

        Instead, use `CloudWorkspace.get_connection()` to create a connection object.

        Args:
            workspace: The workspace that the connection belongs to.
            connection_id: The ID of the connection.
            source: The source ID, if already known. When omitted, it is looked up
                from the connection info on first access of `source_id`.
            destination: The destination ID, if already known. When omitted, it is
                looked up from the connection info on first access of `destination_id`.
        """
        self.connection_id = connection_id
        """The ID of the connection."""

        self.workspace = workspace
        """The workspace that the connection belongs to."""

        self._source_id = source
        """The ID of the source."""

        self._destination_id = destination
        """The ID of the destination."""

        self._connection_info: ConnectionResponse | None = None
        """The connection info object. (Cached.)"""

        self._cloud_source_object: CloudSource | None = None
        """The source object. (Cached.)"""

        self._cloud_destination_object: CloudDestination | None = None
        """The destination object. (Cached.)"""

    def _fetch_connection_info(self) -> ConnectionResponse:
        """Fetch this connection's info from the Airbyte API.

        The response is not cached here; the properties that call this method
        store it in `self._connection_info`.
        """
        return api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )

    # Properties

    @property
    def source_id(self) -> str:
        """The ID of the source (looked up from the API if not provided at init)."""
        if not self._source_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._source_id = self._connection_info.source_id

        return cast("str", self._source_id)

    @property
    def source(self) -> CloudSource:
        """Get the source object (created on first access, then cached)."""
        if self._cloud_source_object:
            return self._cloud_source_object

        self._cloud_source_object = CloudSource(
            workspace=self.workspace,
            connector_id=self.source_id,
        )
        return self._cloud_source_object

    @property
    def destination_id(self) -> str:
        """The ID of the destination (looked up from the API if not provided at init)."""
        if not self._destination_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            # Must read `destination_id` here; reading `source_id` would silently
            # point every destination operation at the source connector.
            self._destination_id = self._connection_info.destination_id

        return cast("str", self._destination_id)

    @property
    def destination(self) -> CloudDestination:
        """Get the destination object (created on first access, then cached)."""
        if self._cloud_destination_object:
            return self._cloud_destination_object

        self._cloud_destination_object = CloudDestination(
            workspace=self.workspace,
            connector_id=self.destination_id,
        )
        return self._cloud_destination_object

    @property
    def stream_names(self) -> list[str]:
        """The names of the streams configured on the connection."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return [stream.name for stream in self._connection_info.configurations.streams or []]

    @property
    def table_prefix(self) -> str:
        """The table prefix, or an empty string when none is set."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.prefix or ""

    @property
    def connection_url(self) -> str:
        """The web URL to the connection."""
        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

    @property
    def job_history_url(self) -> str:
        """The URL to the job history for the connection."""
        return f"{self.connection_url}/timeline"

    # Run Sync

    def run_sync(
        self,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync job for this connection.

        Args:
            wait: Whether to block until the job completes.
            wait_timeout: Passed to `SyncResult.wait_for_completion()` as the
                maximum time to wait (units as defined there).

        Returns:
            A `SyncResult` for the triggered job.

        Raises:
            Whatever `SyncResult.wait_for_completion()` raises when the job fails
            or the wait times out (only when `wait` is True).
        """
        connection_response = api_util.run_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        sync_result = SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=connection_response.job_id,
        )

        if wait:
            sync_result.wait_for_completion(
                wait_timeout=wait_timeout,
                raise_failure=True,
                raise_timeout=True,
            )

        return sync_result

    def __repr__(self) -> str:
        """String representation of the connection."""
        return (
            f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, "
            f"destination_id={self.destination_id}, connection_url={self.connection_url})"
        )

    # Logs

    def get_previous_sync_logs(
        self,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get results for the connection's previous sync jobs.

        Args:
            limit: Maximum number of jobs to request from the API.

        Returns:
            One `SyncResult` per job returned, pre-populated with that job's info.
        """
        sync_logs: list[JobResponse] = api_util.get_job_logs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            limit=limit,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return [
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=sync_log.job_id,
                _latest_job_info=sync_log,
            )
            for sync_log in sync_logs
        ]

    def get_sync_result(
        self,
        job_id: int | None = None,
    ) -> SyncResult | None:
        """Get the sync result for the connection.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if `job_id` is omitted and no previous jobs are found.
        """
        if job_id is None:
            # Get the most recent sync job
            results = self.get_previous_sync_logs(
                limit=1,
            )
            if results:
                return results[0]

            return None

        # Get the sync job by ID (lazy loaded)
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

    # Deletions

    def permanently_delete(
        self,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
    ) -> None:
        """Delete the connection, optionally along with its source and destination.

        Args:
            cascade_delete_source: Whether to also delete the source.
            cascade_delete_destination: Whether to also delete the destination.
        """
        # Resolve connector IDs *before* deleting the connection: when they are not
        # cached, `source_id` / `destination_id` look them up from the connection's
        # API record, which may no longer be retrievable after deletion.
        source_id: str | None = None
        destination_id: str | None = None
        if cascade_delete_source:
            source_id = self.source_id
        if cascade_delete_destination:
            destination_id = self.destination_id

        self.workspace.permanently_delete_connection(self)

        if source_id is not None:
            self.workspace.permanently_delete_source(source_id)

        if destination_id is not None:
            self.workspace.permanently_delete_destination(destination_id)
A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
def __init__(
    self,
    workspace: CloudWorkspace,
    connection_id: str,
    source: str | None = None,
    destination: str | None = None,
) -> None:
    """Initialize a connection handle; prefer `CloudWorkspace.get_connection()` instead.

    Constructing a `CloudConnection` directly is discouraged.

    Args:
        workspace: The workspace that owns this connection.
        connection_id: The ID of the connection.
        source: The source ID, if already known (otherwise resolved lazily).
        destination: The destination ID, if already known (otherwise resolved lazily).
    """
    self.connection_id: str = connection_id
    """The ID of the connection."""

    self.workspace: CloudWorkspace = workspace
    """The workspace that the connection belongs to."""

    self._source_id: str | None = source
    """The ID of the source."""

    self._destination_id: str | None = destination
    """The ID of the destination."""

    self._connection_info: ConnectionResponse | None = None
    """Cached connection info, fetched on demand."""

    self._cloud_source_object: CloudSource | None = None
    """Cached source object, built on demand."""

    self._cloud_destination_object: CloudDestination | None = None
    """Cached destination object, built on demand."""
It is not recommended to create a `CloudConnection` object directly.
Instead, use `CloudWorkspace.get_connection()` to create a connection object.
@property
def source_id(self) -> str:
    """The source ID, loaded from the connection info on first use if not supplied."""
    if self._source_id:
        return cast("str", self._source_id)

    # Reuse cached connection info when present; otherwise fetch and cache it.
    if not self._connection_info:
        self._connection_info = self._fetch_connection_info()
    self._source_id = self._connection_info.source_id
    return cast("str", self._source_id)
The ID of the source.
@property
def source(self) -> CloudSource:
    """The `CloudSource` for this connection, built on first access and then reused."""
    cached = self._cloud_source_object
    if not cached:
        cached = CloudSource(
            workspace=self.workspace,
            connector_id=self.source_id,
        )
        self._cloud_source_object = cached
    return cached
Get the source object.
@property
def destination_id(self) -> str:
    """The ID of the destination.

    If no destination ID was supplied at construction, it is read from the
    connection info (fetched from the API and cached on first need).
    """
    if not self._destination_id:
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        # Must read `destination_id` here; reading `source_id` would silently
        # point every destination operation at the source connector.
        self._destination_id = self._connection_info.destination_id

    return cast("str", self._destination_id)
The ID of the destination.
@property
def destination(self) -> CloudDestination:
    """The `CloudDestination` for this connection, created lazily and cached."""
    if not self._cloud_destination_object:
        # First access: build the destination handle from the resolved ID.
        self._cloud_destination_object = CloudDestination(
            workspace=self.workspace,
            connector_id=self.destination_id,
        )
    return self._cloud_destination_object
Get the destination object.
@property
def stream_names(self) -> list[str]:
    """Names of the streams configured on this connection (empty if none)."""
    info = self._connection_info
    if not info:
        info = self._fetch_connection_info()
        self._connection_info = info

    configured = info.configurations.streams or []
    return [configured_stream.name for configured_stream in configured]
The stream names.
@property
def table_prefix(self) -> str:
    """The table prefix configured on the connection, or an empty string if unset."""
    if not self._connection_info:
        self._connection_info = self._fetch_connection_info()

    prefix = self._connection_info.prefix
    return prefix if prefix else ""
The table prefix.
@property
def connection_url(self) -> str:
    """The web URL to the connection.

    Built from the workspace URL and the connection ID. It is always a string,
    so the return type is `str` rather than the previous, misleading `str | None`.
    """
    return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
The web URL to the connection.
@property
def job_history_url(self) -> str:
    """The URL to the job history ("timeline") page for the connection.

    It is always a string, so the return type is `str` rather than the
    previous, misleading `str | None`.
    """
    return f"{self.connection_url}/timeline"
The URL to the job history for the connection.
def run_sync(
    self,
    *,
    wait: bool = True,
    wait_timeout: int = 300,
) -> SyncResult:
    """Trigger a sync job for this connection.

    Args:
        wait: Whether to block until the job finishes.
        wait_timeout: Forwarded to `SyncResult.wait_for_completion()` as the
            maximum time to wait (units as defined there).

    Returns:
        A `SyncResult` tracking the triggered job.

    Raises:
        Whatever `SyncResult.wait_for_completion()` raises on job failure or
        timeout (only when `wait` is True).
    """
    ws = self.workspace
    job = api_util.run_connection(
        connection_id=self.connection_id,
        api_root=ws.api_root,
        workspace_id=ws.workspace_id,
        client_id=ws.client_id,
        client_secret=ws.client_secret,
    )
    result = SyncResult(workspace=ws, connection=self, job_id=job.job_id)

    if not wait:
        return result

    result.wait_for_completion(
        wait_timeout=wait_timeout,
        raise_failure=True,
        raise_timeout=True,
    )
    return result
Run a sync.
def get_previous_sync_logs(
    self,
    *,
    limit: int = 10,
) -> list[SyncResult]:
    """Return `SyncResult` objects for the connection's previous sync jobs.

    Args:
        limit: Maximum number of jobs to request from the API.

    Returns:
        One `SyncResult` per job returned, pre-populated with that job's info.
    """
    ws = self.workspace
    job_responses: list[JobResponse] = api_util.get_job_logs(
        connection_id=self.connection_id,
        api_root=ws.api_root,
        workspace_id=ws.workspace_id,
        limit=limit,
        client_id=ws.client_id,
        client_secret=ws.client_secret,
    )
    return [
        SyncResult(
            workspace=ws,
            connection=self,
            job_id=job.job_id,
            _latest_job_info=job,
        )
        for job in job_responses
    ]
Get the previous sync logs for a connection.
def get_sync_result(
    self,
    job_id: int | None = None,
) -> SyncResult | None:
    """Return the sync result for a specific job, or for the latest job.

    When `job_id` is given, a lazily loaded `SyncResult` for that job is returned.
    When `job_id` is omitted, the most recent sync job is used; `None` is
    returned if the connection has no previous jobs.
    """
    if job_id is not None:
        # Job details are lazy loaded by SyncResult.
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

    latest = self.get_previous_sync_logs(limit=1)
    return latest[0] if latest else None
Get the sync result for the connection.
If `job_id` is not provided, the most recent sync job will be used.
Returns `None` if `job_id` is omitted and no previous jobs are found.
def permanently_delete(
    self,
    *,
    cascade_delete_source: bool = False,
    cascade_delete_destination: bool = False,
) -> None:
    """Delete the connection, optionally along with its source and destination.

    Args:
        cascade_delete_source: Whether to also delete the source.
        cascade_delete_destination: Whether to also delete the destination.
    """
    # Resolve connector IDs *before* deleting the connection: when they are not
    # cached, `source_id` / `destination_id` look them up from the connection's
    # API record, which may no longer be retrievable after deletion.
    source_id: str | None = None
    destination_id: str | None = None
    if cascade_delete_source:
        source_id = self.source_id
    if cascade_delete_destination:
        destination_id = self.destination_id

    self.workspace.permanently_delete_connection(self)

    if source_id is not None:
        self.workspace.permanently_delete_source(source_id)

    if destination_id is not None:
        self.workspace.permanently_delete_destination(destination_id)
Delete the connection.
Arguments:
- cascade_delete_source: Whether to also delete the source.
- cascade_delete_destination: Whether to also delete the destination.