airbyte.cloud.connections
Cloud Connections.
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Cloud Connections."""

from __future__ import annotations

from typing import TYPE_CHECKING, cast

from airbyte._util import api_util
from airbyte.cloud.sync_results import SyncResult


if TYPE_CHECKING:
    from airbyte_api.models import ConnectionResponse, JobResponse

    from airbyte.cloud.workspaces import CloudWorkspace


class CloudConnection:
    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
    """

    def __init__(
        self,
        workspace: CloudWorkspace,
        connection_id: str,
        source: str | None = None,
        destination: str | None = None,
    ) -> None:
        """It is not recommended to create a `CloudConnection` object directly.

        Instead, use `CloudWorkspace.get_connection()` to create a connection object.

        Args:
            workspace: The workspace that the connection belongs to.
            connection_id: The ID of the connection.
            source: Optional ID of the source. When omitted, `source_id` looks it up
                from the connection info on first access.
            destination: Optional ID of the destination. When omitted, `destination_id`
                looks it up from the connection info on first access.
        """
        self.connection_id = connection_id
        """The ID of the connection."""

        self.workspace = workspace
        """The workspace that the connection belongs to."""

        self._source_id = source
        """The ID of the source."""

        self._destination_id = destination
        """The ID of the destination."""

        # Cached API response; filled in lazily by the properties below.
        self._connection_info: ConnectionResponse | None = None

    def _fetch_connection_info(self) -> ConnectionResponse:
        """Fetch and return this connection's info from the API.

        The caller is responsible for caching the result in `self._connection_info`.
        """
        return api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
        )

    # Properties

    @property
    def source_id(self) -> str:
        """The ID of the source.

        Uses the value given at construction when present; otherwise it is read from
        the (lazily fetched) connection info and cached.
        """
        if not self._source_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._source_id = self._connection_info.source_id

        return cast(str, self._source_id)

    @property
    def destination_id(self) -> str:
        """The ID of the destination.

        Uses the value given at construction when present; otherwise it is read from
        the (lazily fetched) connection info and cached.
        """
        if not self._destination_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            # Fixed: this previously read `source_id`, returning the wrong resource ID.
            self._destination_id = self._connection_info.destination_id

        return cast(str, self._destination_id)

    @property
    def stream_names(self) -> list[str]:
        """The stream names, read from the connection info's stream configurations."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return [stream.name for stream in self._connection_info.configurations.streams]

    @property
    def table_prefix(self) -> str:
        """The table prefix, read from the connection info."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.prefix

    @property
    def connection_url(self) -> str | None:
        """The URL to the connection, built from the workspace URL and connection ID."""
        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

    @property
    def job_history_url(self) -> str | None:
        """The URL to the job history for the connection."""
        return f"{self.connection_url}/job-history"

    # Run Sync

    def run_sync(
        self,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync.

        Args:
            wait: When true, block on `SyncResult.wait_for_completion()` before
                returning; failures and timeouts are raised.
            wait_timeout: Passed through to `wait_for_completion()`
                (presumably seconds -- confirm against `SyncResult`).

        Returns:
            A `SyncResult` bound to the job that was started.
        """
        connection_response = api_util.run_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
            workspace_id=self.workspace.workspace_id,
        )
        sync_result = SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=connection_response.job_id,
        )

        if wait:
            sync_result.wait_for_completion(
                wait_timeout=wait_timeout,
                raise_failure=True,
                raise_timeout=True,
            )

        return sync_result

    # Logs

    def get_previous_sync_logs(
        self,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync logs for a connection.

        Args:
            limit: Forwarded to `api_util.get_job_logs()` as the maximum number of jobs.

        Returns:
            One `SyncResult` per returned job, pre-populated with that job's info.
        """
        sync_logs: list[JobResponse] = api_util.get_job_logs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
            workspace_id=self.workspace.workspace_id,
            limit=limit,
        )
        return [
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=sync_log.job_id,
                _latest_job_info=sync_log,
            )
            for sync_log in sync_logs
        ]

    def get_sync_result(
        self,
        job_id: str | None = None,
    ) -> SyncResult | None:
        """Get the sync result for the connection.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if job_id is omitted and no previous jobs are found.
        """
        if job_id is None:
            # Get the most recent sync job
            results = self.get_previous_sync_logs(
                limit=1,
            )
            if results:
                return results[0]

            return None

        # Get the sync job by ID (lazy loaded)
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

    # Deletions

    def _permanently_delete(
        self,
        *,
        delete_source: bool = False,
        delete_destination: bool = False,
    ) -> None:
        """Delete the connection.

        Args:
            delete_source: Whether to also delete the source.
            delete_destination: Whether to also delete the destination.
        """
        # Resolve the related IDs *before* deleting the connection: the lookups may
        # need to fetch connection info, which cannot succeed once it is deleted.
        source_id = self.source_id if delete_source else None
        destination_id = self.destination_id if delete_destination else None

        self.workspace._permanently_delete_connection(  # noqa: SLF001  # Non-public API (for now)
            connection=self
        )

        if delete_source:
            self.workspace._permanently_delete_source(  # noqa: SLF001  # Non-public API (for now)
                source=source_id
            )

        if delete_destination:
            self.workspace._permanently_delete_destination(  # noqa: SLF001  # Non-public API
                destination=destination_id,
            )
class
CloudConnection:
class CloudConnection:
    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
    """

    def __init__(
        self,
        workspace: CloudWorkspace,
        connection_id: str,
        source: str | None = None,
        destination: str | None = None,
    ) -> None:
        """It is not recommended to create a `CloudConnection` object directly.

        Instead, use `CloudWorkspace.get_connection()` to create a connection object.

        Args:
            workspace: The workspace that the connection belongs to.
            connection_id: The ID of the connection.
            source: Optional ID of the source. When omitted, `source_id` looks it up
                from the connection info on first access.
            destination: Optional ID of the destination. When omitted, `destination_id`
                looks it up from the connection info on first access.
        """
        self.connection_id = connection_id
        """The ID of the connection."""

        self.workspace = workspace
        """The workspace that the connection belongs to."""

        self._source_id = source
        """The ID of the source."""

        self._destination_id = destination
        """The ID of the destination."""

        # Cached API response; filled in lazily by the properties below.
        self._connection_info: ConnectionResponse | None = None

    def _fetch_connection_info(self) -> ConnectionResponse:
        """Fetch and return this connection's info from the API.

        The caller is responsible for caching the result in `self._connection_info`.
        """
        return api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
        )

    # Properties

    @property
    def source_id(self) -> str:
        """The ID of the source.

        Uses the value given at construction when present; otherwise it is read from
        the (lazily fetched) connection info and cached.
        """
        if not self._source_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._source_id = self._connection_info.source_id

        return cast(str, self._source_id)

    @property
    def destination_id(self) -> str:
        """The ID of the destination.

        Uses the value given at construction when present; otherwise it is read from
        the (lazily fetched) connection info and cached.
        """
        if not self._destination_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            # Fixed: this previously read `source_id`, returning the wrong resource ID.
            self._destination_id = self._connection_info.destination_id

        return cast(str, self._destination_id)

    @property
    def stream_names(self) -> list[str]:
        """The stream names, read from the connection info's stream configurations."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return [stream.name for stream in self._connection_info.configurations.streams]

    @property
    def table_prefix(self) -> str:
        """The table prefix, read from the connection info."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.prefix

    @property
    def connection_url(self) -> str | None:
        """The URL to the connection, built from the workspace URL and connection ID."""
        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

    @property
    def job_history_url(self) -> str | None:
        """The URL to the job history for the connection."""
        return f"{self.connection_url}/job-history"

    # Run Sync

    def run_sync(
        self,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync.

        Args:
            wait: When true, block on `SyncResult.wait_for_completion()` before
                returning; failures and timeouts are raised.
            wait_timeout: Passed through to `wait_for_completion()`
                (presumably seconds -- confirm against `SyncResult`).

        Returns:
            A `SyncResult` bound to the job that was started.
        """
        connection_response = api_util.run_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
            workspace_id=self.workspace.workspace_id,
        )
        sync_result = SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=connection_response.job_id,
        )

        if wait:
            sync_result.wait_for_completion(
                wait_timeout=wait_timeout,
                raise_failure=True,
                raise_timeout=True,
            )

        return sync_result

    # Logs

    def get_previous_sync_logs(
        self,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync logs for a connection.

        Args:
            limit: Forwarded to `api_util.get_job_logs()` as the maximum number of jobs.

        Returns:
            One `SyncResult` per returned job, pre-populated with that job's info.
        """
        sync_logs: list[JobResponse] = api_util.get_job_logs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
            workspace_id=self.workspace.workspace_id,
            limit=limit,
        )
        return [
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=sync_log.job_id,
                _latest_job_info=sync_log,
            )
            for sync_log in sync_logs
        ]

    def get_sync_result(
        self,
        job_id: str | None = None,
    ) -> SyncResult | None:
        """Get the sync result for the connection.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if job_id is omitted and no previous jobs are found.
        """
        if job_id is None:
            # Get the most recent sync job
            results = self.get_previous_sync_logs(
                limit=1,
            )
            if results:
                return results[0]

            return None

        # Get the sync job by ID (lazy loaded)
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

    # Deletions

    def _permanently_delete(
        self,
        *,
        delete_source: bool = False,
        delete_destination: bool = False,
    ) -> None:
        """Delete the connection.

        Args:
            delete_source: Whether to also delete the source.
            delete_destination: Whether to also delete the destination.
        """
        # Resolve the related IDs *before* deleting the connection: the lookups may
        # need to fetch connection info, which cannot succeed once it is deleted.
        source_id = self.source_id if delete_source else None
        destination_id = self.destination_id if delete_destination else None

        self.workspace._permanently_delete_connection(  # noqa: SLF001  # Non-public API (for now)
            connection=self
        )

        if delete_source:
            self.workspace._permanently_delete_source(  # noqa: SLF001  # Non-public API (for now)
                source=source_id
            )

        if delete_destination:
            self.workspace._permanently_delete_destination(  # noqa: SLF001  # Non-public API
                destination=destination_id,
            )
A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
CloudConnection( workspace: airbyte.cloud.CloudWorkspace, connection_id: str, source: str | None = None, destination: str | None = None)
25 def __init__( 26 self, 27 workspace: CloudWorkspace, 28 connection_id: str, 29 source: str | None = None, 30 destination: str | None = None, 31 ) -> None: 32 """It is not recommended to create a `CloudConnection` object directly. 33 34 Instead, use `CloudWorkspace.get_connection()` to create a connection object. 35 """ 36 self.connection_id = connection_id 37 """The ID of the connection.""" 38 39 self.workspace = workspace 40 """The workspace that the connection belongs to.""" 41 42 self._source_id = source 43 """The ID of the source.""" 44 45 self._destination_id = destination 46 """The ID of the destination.""" 47 48 self._connection_info: ConnectionResponse | None = None
It is not recommended to create a `CloudConnection` object directly.
Instead, use `CloudWorkspace.get_connection()` to create a connection object.
source_id: str
61 @property 62 def source_id(self) -> str: 63 """The ID of the source.""" 64 if not self._source_id: 65 if not self._connection_info: 66 self._connection_info = self._fetch_connection_info() 67 68 self._source_id = self._connection_info.source_id 69 70 return cast(str, self._source_id)
The ID of the source.
destination_id: str
72 @property 73 def destination_id(self) -> str: 74 """The ID of the destination.""" 75 if not self._destination_id: 76 if not self._connection_info: 77 self._connection_info = self._fetch_connection_info() 78 79 self._destination_id = self._connection_info.source_id 80 81 return cast(str, self._destination_id)
The ID of the destination.
stream_names: list[str]
83 @property 84 def stream_names(self) -> list[str]: 85 """The stream names.""" 86 if not self._connection_info: 87 self._connection_info = self._fetch_connection_info() 88 89 return [stream.name for stream in self._connection_info.configurations.streams]
The stream names.
table_prefix: str
91 @property 92 def table_prefix(self) -> str: 93 """The table prefix.""" 94 if not self._connection_info: 95 self._connection_info = self._fetch_connection_info() 96 97 return self._connection_info.prefix
The table prefix.
connection_url: str | None
99 @property 100 def connection_url(self) -> str | None: 101 """The URL to the connection.""" 102 return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
The URL to the connection.
job_history_url: str | None
104 @property 105 def job_history_url(self) -> str | None: 106 """The URL to the job history for the connection.""" 107 return f"{self.connection_url}/job-history"
The URL to the job history for the connection.
def run_sync(
    self,
    *,
    wait: bool = True,
    wait_timeout: int = 300,
) -> SyncResult:
    """Start a sync job for this connection and return a handle to it.

    Args:
        wait: If true, call `wait_for_completion()` on the result before returning,
            with both failure and timeout raising enabled.
        wait_timeout: Forwarded to `wait_for_completion()`
            (presumably seconds -- confirm against `SyncResult`).

    Returns:
        A `SyncResult` tied to the job ID reported by the API.
    """
    workspace = self.workspace
    job_response = api_util.run_connection(
        connection_id=self.connection_id,
        api_root=workspace.api_root,
        api_key=workspace.api_key,
        workspace_id=workspace.workspace_id,
    )
    result = SyncResult(
        workspace=workspace,
        connection=self,
        job_id=job_response.job_id,
    )

    # Fire-and-forget callers get the handle back immediately.
    if not wait:
        return result

    result.wait_for_completion(
        wait_timeout=wait_timeout,
        raise_failure=True,
        raise_timeout=True,
    )
    return result
Run a sync.
def get_previous_sync_logs(
    self,
    *,
    limit: int = 10,
) -> list[SyncResult]:
    """Return `SyncResult` objects for this connection's earlier jobs.

    Args:
        limit: Forwarded to `api_util.get_job_logs()` as the maximum number of jobs.

    Returns:
        One `SyncResult` per job returned by the API, in the API's order, each
        pre-populated with that job's info.
    """
    workspace = self.workspace
    job_responses: list[JobResponse] = api_util.get_job_logs(
        connection_id=self.connection_id,
        api_root=workspace.api_root,
        api_key=workspace.api_key,
        workspace_id=workspace.workspace_id,
        limit=limit,
    )

    previous_results: list[SyncResult] = []
    for job in job_responses:
        previous_results.append(
            SyncResult(
                workspace=workspace,
                connection=self,
                job_id=job.job_id,
                _latest_job_info=job,
            )
        )
    return previous_results
Get the previous sync logs for a connection.
164 def get_sync_result( 165 self, 166 job_id: str | None = None, 167 ) -> SyncResult | None: 168 """Get the sync result for the connection. 169 170 If `job_id` is not provided, the most recent sync job will be used. 171 172 Returns `None` if job_id is omitted and no previous jobs are found. 173 """ 174 if job_id is None: 175 # Get the most recent sync job 176 results = self.get_previous_sync_logs( 177 limit=1, 178 ) 179 if results: 180 return results[0] 181 182 return None 183 184 # Get the sync job by ID (lazy loaded) 185 return SyncResult( 186 workspace=self.workspace, 187 connection=self, 188 job_id=job_id, 189 )
Get the sync result for the connection.
If `job_id` is not provided, the most recent sync job will be used.
Returns `None` if `job_id` is omitted and no previous jobs are found.