airbyte.cloud.connections
Cloud Connections.
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Cloud Connections."""

from __future__ import annotations

from typing import TYPE_CHECKING, cast

from airbyte._util import api_util
from airbyte.cloud.connectors import CloudDestination, CloudSource
from airbyte.cloud.sync_results import SyncResult


if TYPE_CHECKING:
    from airbyte_api.models import ConnectionResponse, JobResponse

    from airbyte.cloud.workspaces import CloudWorkspace


class CloudConnection:
    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
    """

    def __init__(
        self,
        workspace: CloudWorkspace,
        connection_id: str,
        source: str | None = None,
        destination: str | None = None,
    ) -> None:
        """It is not recommended to create a `CloudConnection` object directly.

        Instead, use `CloudWorkspace.get_connection()` to create a connection object.

        Args:
            workspace: The workspace that owns the connection.
            connection_id: The ID of the connection.
            source: The source ID, if already known; otherwise `source_id` resolves it lazily.
            destination: The destination ID, if already known; otherwise `destination_id`
                resolves it lazily.
        """
        self.connection_id = connection_id
        """The ID of the connection."""

        self.workspace = workspace
        """The workspace that the connection belongs to."""

        self._source_id = source
        """The ID of the source."""

        self._destination_id = destination
        """The ID of the destination."""

        self._connection_info: ConnectionResponse | None = None
        """The connection info object. (Cached.)"""

        self._cloud_source_object: CloudSource | None = None
        """The source object. (Cached.)"""

        self._cloud_destination_object: CloudDestination | None = None
        """The destination object. (Cached.)"""

    def _fetch_connection_info(self) -> ConnectionResponse:
        """Populate the connection with data from the API."""
        return api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )

    # Properties

    @property
    def source_id(self) -> str:
        """The ID of the source."""
        if not self._source_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._source_id = self._connection_info.source_id

        return cast("str", self._source_id)

    @property
    def source(self) -> CloudSource:
        """Get the source object."""
        if self._cloud_source_object:
            return self._cloud_source_object

        self._cloud_source_object = CloudSource(
            workspace=self.workspace,
            connector_id=self.source_id,
        )
        return self._cloud_source_object

    @property
    def destination_id(self) -> str:
        """The ID of the destination."""
        if not self._destination_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            # Read the destination's ID (not the source's) from the connection record.
            self._destination_id = self._connection_info.destination_id

        return cast("str", self._destination_id)

    @property
    def destination(self) -> CloudDestination:
        """Get the destination object."""
        if self._cloud_destination_object:
            return self._cloud_destination_object

        self._cloud_destination_object = CloudDestination(
            workspace=self.workspace,
            connector_id=self.destination_id,
        )
        return self._cloud_destination_object

    @property
    def stream_names(self) -> list[str]:
        """The stream names."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return [stream.name for stream in self._connection_info.configurations.streams or []]

    @property
    def table_prefix(self) -> str:
        """The table prefix."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.prefix or ""

    @property
    def connection_url(self) -> str | None:
        """The URL to the connection."""
        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

    @property
    def job_history_url(self) -> str | None:
        """The URL to the job history for the connection."""
        return f"{self.connection_url}/job-history"

    # Run Sync

    def run_sync(
        self,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync.

        Args:
            wait: If True, block until the job finishes.
            wait_timeout: Timeout passed to `SyncResult.wait_for_completion()`.

        Returns:
            A `SyncResult` for the triggered job.

        Raises:
            Whatever `SyncResult.wait_for_completion()` raises when the job fails or
            times out (only when `wait` is True).
        """
        connection_response = api_util.run_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        sync_result = SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=connection_response.job_id,
        )

        if wait:
            sync_result.wait_for_completion(
                wait_timeout=wait_timeout,
                raise_failure=True,
                raise_timeout=True,
            )

        return sync_result

    # Logs

    def get_previous_sync_logs(
        self,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync logs for a connection.

        Args:
            limit: Passed through to the job-log API call to cap how many jobs are returned.
        """
        sync_logs: list[JobResponse] = api_util.get_job_logs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            limit=limit,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return [
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=sync_log.job_id,
                _latest_job_info=sync_log,
            )
            for sync_log in sync_logs
        ]

    def get_sync_result(
        self,
        job_id: int | None = None,
    ) -> SyncResult | None:
        """Get the sync result for the connection.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if job_id is omitted and no previous jobs are found.
        """
        if job_id is None:
            # Get the most recent sync job
            results = self.get_previous_sync_logs(
                limit=1,
            )
            if results:
                return results[0]

            return None

        # Get the sync job by ID (lazy loaded)
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

    # Deletions

    def permanently_delete(
        self,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
    ) -> None:
        """Delete the connection.

        Args:
            cascade_delete_source: Whether to also delete the source.
            cascade_delete_destination: Whether to also delete the destination.
        """
        # Resolve connector IDs *before* deleting the connection: when they were not
        # supplied at construction, `source_id` / `destination_id` look them up from the
        # connection's API record, which may no longer be retrievable afterwards.
        source_id = self.source_id if cascade_delete_source else None
        destination_id = self.destination_id if cascade_delete_destination else None

        self.workspace.permanently_delete_connection(self)

        if cascade_delete_source:
            self.workspace.permanently_delete_source(source_id)

        if cascade_delete_destination:
            self.workspace.permanently_delete_destination(destination_id)
class CloudConnection:
    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
    """

    def __init__(
        self,
        workspace: CloudWorkspace,
        connection_id: str,
        source: str | None = None,
        destination: str | None = None,
    ) -> None:
        """It is not recommended to create a `CloudConnection` object directly.

        Instead, use `CloudWorkspace.get_connection()` to create a connection object.

        Args:
            workspace: The workspace that owns the connection.
            connection_id: The ID of the connection.
            source: The source ID, if already known; otherwise `source_id` resolves it lazily.
            destination: The destination ID, if already known; otherwise `destination_id`
                resolves it lazily.
        """
        self.connection_id = connection_id
        """The ID of the connection."""

        self.workspace = workspace
        """The workspace that the connection belongs to."""

        self._source_id = source
        """The ID of the source."""

        self._destination_id = destination
        """The ID of the destination."""

        self._connection_info: ConnectionResponse | None = None
        """The connection info object. (Cached.)"""

        self._cloud_source_object: CloudSource | None = None
        """The source object. (Cached.)"""

        self._cloud_destination_object: CloudDestination | None = None
        """The destination object. (Cached.)"""

    def _fetch_connection_info(self) -> ConnectionResponse:
        """Populate the connection with data from the API."""
        return api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )

    # Properties

    @property
    def source_id(self) -> str:
        """The ID of the source."""
        if not self._source_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._source_id = self._connection_info.source_id

        return cast("str", self._source_id)

    @property
    def source(self) -> CloudSource:
        """Get the source object."""
        if self._cloud_source_object:
            return self._cloud_source_object

        self._cloud_source_object = CloudSource(
            workspace=self.workspace,
            connector_id=self.source_id,
        )
        return self._cloud_source_object

    @property
    def destination_id(self) -> str:
        """The ID of the destination."""
        if not self._destination_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            # Read the destination's ID (not the source's) from the connection record.
            self._destination_id = self._connection_info.destination_id

        return cast("str", self._destination_id)

    @property
    def destination(self) -> CloudDestination:
        """Get the destination object."""
        if self._cloud_destination_object:
            return self._cloud_destination_object

        self._cloud_destination_object = CloudDestination(
            workspace=self.workspace,
            connector_id=self.destination_id,
        )
        return self._cloud_destination_object

    @property
    def stream_names(self) -> list[str]:
        """The stream names."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return [stream.name for stream in self._connection_info.configurations.streams or []]

    @property
    def table_prefix(self) -> str:
        """The table prefix."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.prefix or ""

    @property
    def connection_url(self) -> str | None:
        """The URL to the connection."""
        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

    @property
    def job_history_url(self) -> str | None:
        """The URL to the job history for the connection."""
        return f"{self.connection_url}/job-history"

    # Run Sync

    def run_sync(
        self,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync.

        Args:
            wait: If True, block until the job finishes.
            wait_timeout: Timeout passed to `SyncResult.wait_for_completion()`.

        Returns:
            A `SyncResult` for the triggered job.

        Raises:
            Whatever `SyncResult.wait_for_completion()` raises when the job fails or
            times out (only when `wait` is True).
        """
        connection_response = api_util.run_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        sync_result = SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=connection_response.job_id,
        )

        if wait:
            sync_result.wait_for_completion(
                wait_timeout=wait_timeout,
                raise_failure=True,
                raise_timeout=True,
            )

        return sync_result

    # Logs

    def get_previous_sync_logs(
        self,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync logs for a connection.

        Args:
            limit: Passed through to the job-log API call to cap how many jobs are returned.
        """
        sync_logs: list[JobResponse] = api_util.get_job_logs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            limit=limit,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
        )
        return [
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=sync_log.job_id,
                _latest_job_info=sync_log,
            )
            for sync_log in sync_logs
        ]

    def get_sync_result(
        self,
        job_id: int | None = None,
    ) -> SyncResult | None:
        """Get the sync result for the connection.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if job_id is omitted and no previous jobs are found.
        """
        if job_id is None:
            # Get the most recent sync job
            results = self.get_previous_sync_logs(
                limit=1,
            )
            if results:
                return results[0]

            return None

        # Get the sync job by ID (lazy loaded)
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

    # Deletions

    def permanently_delete(
        self,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
    ) -> None:
        """Delete the connection.

        Args:
            cascade_delete_source: Whether to also delete the source.
            cascade_delete_destination: Whether to also delete the destination.
        """
        # Resolve connector IDs *before* deleting the connection: when they were not
        # supplied at construction, `source_id` / `destination_id` look them up from the
        # connection's API record, which may no longer be retrievable afterwards.
        source_id = self.source_id if cascade_delete_source else None
        destination_id = self.destination_id if cascade_delete_destination else None

        self.workspace.permanently_delete_connection(self)

        if cascade_delete_source:
            self.workspace.permanently_delete_source(source_id)

        if cascade_delete_destination:
            self.workspace.permanently_delete_destination(destination_id)
A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
def __init__(
    self,
    workspace: CloudWorkspace,
    connection_id: str,
    source: str | None = None,
    destination: str | None = None,
) -> None:
    """Bind a connection ID to its workspace; prefer `CloudWorkspace.get_connection()`.

    It is not recommended to create a `CloudConnection` object directly.
    The source and destination IDs are optional and are looked up on demand
    when omitted.
    """
    self.workspace = workspace
    """The workspace that the connection belongs to."""

    self.connection_id = connection_id
    """The ID of the connection."""

    # Connector IDs may be unknown here; the `source_id` / `destination_id`
    # properties fill them in from the connection record when empty.
    self._source_id = source
    self._destination_id = destination

    # Lazily-populated caches (all start empty).
    self._connection_info: ConnectionResponse | None = None
    self._cloud_source_object: CloudSource | None = None
    self._cloud_destination_object: CloudDestination | None = None
It is not recommended to create a `CloudConnection` object directly.
Instead, use `CloudWorkspace.get_connection()` to create a connection object.
@property
def source_id(self) -> str:
    """The ID of the source, looked up from the connection record on first need."""
    if self._source_id:
        return cast("str", self._source_id)

    if not self._connection_info:
        self._connection_info = self._fetch_connection_info()
    self._source_id = self._connection_info.source_id
    return cast("str", self._source_id)
The ID of the source.
@property
def source(self) -> CloudSource:
    """The `CloudSource` for this connection's source (built once, then cached)."""
    cached = self._cloud_source_object
    if not cached:
        cached = CloudSource(
            workspace=self.workspace,
            connector_id=self.source_id,
        )
        self._cloud_source_object = cached
    return cached
Get the source object.
@property
def destination_id(self) -> str:
    """The ID of the destination.

    If no destination ID was supplied at construction, it is read from the
    connection record (fetched once and cached) and then cached as well.
    """
    if not self._destination_id:
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        # Read the destination's ID (not the source's) from the connection record.
        self._destination_id = self._connection_info.destination_id

    return cast("str", self._destination_id)
The ID of the destination.
@property
def destination(self) -> CloudDestination:
    """The `CloudDestination` for this connection's destination (built once, then cached)."""
    cached = self._cloud_destination_object
    if not cached:
        cached = CloudDestination(
            workspace=self.workspace,
            connector_id=self.destination_id,
        )
        self._cloud_destination_object = cached
    return cached
Get the destination object.
@property
def stream_names(self) -> list[str]:
    """Names of the streams configured on the connection (record fetched and cached on first use)."""
    info = self._connection_info
    if not info:
        info = self._fetch_connection_info()
        self._connection_info = info
    configured = info.configurations.streams or []
    return [configured_stream.name for configured_stream in configured]
The stream names.
@property
def table_prefix(self) -> str:
    """The table prefix configured on the connection, or an empty string if unset."""
    info = self._connection_info
    if not info:
        info = self._fetch_connection_info()
        self._connection_info = info
    prefix = info.prefix
    return prefix if prefix else ""
The table prefix.
@property
def connection_url(self) -> str | None:
    """Web URL of this connection, built from the workspace URL and connection ID."""
    base_url = self.workspace.workspace_url
    return f"{base_url}/connections/{self.connection_id}"
The URL to the connection.
@property
def job_history_url(self) -> str | None:
    """Web URL of the connection's job-history page."""
    base_url = self.connection_url
    return f"{base_url}/job-history"
The URL to the job history for the connection.
def run_sync(
    self,
    *,
    wait: bool = True,
    wait_timeout: int = 300,
) -> SyncResult:
    """Trigger a sync job and optionally block until it finishes.

    Args:
        wait: When True, wait for completion; failures and timeouts raise.
        wait_timeout: Forwarded to `SyncResult.wait_for_completion()`.

    Returns:
        The `SyncResult` tracking the triggered job.
    """
    ws = self.workspace
    started_job = api_util.run_connection(
        connection_id=self.connection_id,
        api_root=ws.api_root,
        workspace_id=ws.workspace_id,
        client_id=ws.client_id,
        client_secret=ws.client_secret,
    )
    result = SyncResult(workspace=self.workspace, connection=self, job_id=started_job.job_id)

    if not wait:
        return result

    result.wait_for_completion(
        wait_timeout=wait_timeout,
        raise_failure=True,
        raise_timeout=True,
    )
    return result
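Run a sync job for this connection. If `wait` is true (the default), block until the job completes, raising an error if it fails or does not finish within `wait_timeout`.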
def get_previous_sync_logs(
    self,
    *,
    limit: int = 10,
) -> list[SyncResult]:
    """Return `SyncResult` objects for this connection's previous sync jobs.

    Args:
        limit: Passed through to the job-log API call to cap how many jobs are returned.
    """
    ws = self.workspace
    jobs: list[JobResponse] = api_util.get_job_logs(
        connection_id=self.connection_id,
        api_root=ws.api_root,
        workspace_id=ws.workspace_id,
        limit=limit,
        client_id=ws.client_id,
        client_secret=ws.client_secret,
    )
    # Each result is seeded with the job record already in hand.
    return [
        SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job.job_id,
            _latest_job_info=job,
        )
        for job in jobs
    ]
Get the previous sync logs for a connection.
def get_sync_result(
    self,
    job_id: int | None = None,
) -> SyncResult | None:
    """Return the `SyncResult` for `job_id`, or for the latest job if omitted.

    With an explicit `job_id`, a lazily-loaded `SyncResult` is returned without
    any API call. Without one, the most recent job is looked up; `None` is
    returned if the connection has no previous jobs.
    """
    if job_id is not None:
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

    latest = self.get_previous_sync_logs(limit=1)
    return latest[0] if latest else None
Get the sync result for the connection.
If `job_id` is not provided, the most recent sync job will be used.
Returns `None` if `job_id` is omitted and no previous jobs are found.
def permanently_delete(
    self,
    *,
    cascade_delete_source: bool = False,
    cascade_delete_destination: bool = False,
) -> None:
    """Delete the connection.

    Args:
        cascade_delete_source: Whether to also delete the source.
        cascade_delete_destination: Whether to also delete the destination.
    """
    # Resolve connector IDs *before* deleting the connection: when they were not
    # supplied at construction, `source_id` / `destination_id` look them up from the
    # connection's API record, which may no longer be retrievable afterwards.
    source_id = self.source_id if cascade_delete_source else None
    destination_id = self.destination_id if cascade_delete_destination else None

    self.workspace.permanently_delete_connection(self)

    if cascade_delete_source:
        self.workspace.permanently_delete_source(source_id)

    if cascade_delete_destination:
        self.workspace.permanently_delete_destination(destination_id)
Delete the connection.
Arguments:
- cascade_delete_source: Whether to also delete the source.
- cascade_delete_destination: Whether to also delete the destination.