airbyte.cloud.connections
Cloud Connections.
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Cloud Connections."""

from __future__ import annotations

from typing import TYPE_CHECKING, cast

from airbyte._util import api_util
from airbyte.cloud.sync_results import SyncResult


if TYPE_CHECKING:
    from airbyte_api.models import ConnectionResponse, JobResponse

    from airbyte.cloud.workspaces import CloudWorkspace


class CloudConnection:
    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
    """

    def __init__(
        self,
        workspace: CloudWorkspace,
        connection_id: str,
        source: str | None = None,
        destination: str | None = None,
    ) -> None:
        """It is not recommended to create a `CloudConnection` object directly.

        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
        """
        self.connection_id = connection_id
        """The ID of the connection."""

        self.workspace = workspace
        """The workspace that the connection belongs to."""

        self._source_id = source
        """The ID of the source."""

        self._destination_id = destination
        """The ID of the destination."""

        # Connection details from the API; fetched lazily by `_get_connection_info()`.
        self._connection_info: ConnectionResponse | None = None

    def _fetch_connection_info(self) -> ConnectionResponse:
        """Fetch this connection's details from the Airbyte API (no caching)."""
        return api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
        )

    def _get_connection_info(self) -> ConnectionResponse:
        """Return the connection details, fetching and caching them on first use."""
        if self._connection_info is None:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info

    # Properties

    @property
    def source_id(self) -> str:
        """The ID of the source.

        Uses the ID given to the constructor when available; otherwise reads it
        from the connection details (fetched from the API on first use).
        """
        if not self._source_id:
            self._source_id = self._get_connection_info().source_id

        return cast(str, self._source_id)

    @property
    def destination_id(self) -> str:
        """The ID of the destination.

        Uses the ID given to the constructor when available; otherwise reads it
        from the connection details (fetched from the API on first use).
        """
        if not self._destination_id:
            # Must read the *destination* ID here, not the source ID.
            self._destination_id = self._get_connection_info().destination_id

        return cast(str, self._destination_id)

    @property
    def stream_names(self) -> list[str]:
        """The names of the streams configured on this connection."""
        return [stream.name for stream in self._get_connection_info().configurations.streams]

    @property
    def table_prefix(self) -> str:
        """The table prefix (the `prefix` field of the connection details)."""
        return self._get_connection_info().prefix

    @property
    def connection_url(self) -> str:
        """The URL of this connection, built from the workspace URL and connection ID."""
        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

    @property
    def job_history_url(self) -> str:
        """The URL of this connection's job history page."""
        return f"{self.connection_url}/job-history"

    # Run Sync

    def run_sync(
        self,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Start a sync job for this connection.

        Args:
            wait: Whether to block until the job completes. When true,
                `SyncResult.wait_for_completion()` is called with
                `raise_failure=True` and `raise_timeout=True`.
            wait_timeout: Forwarded as `wait_timeout` to
                `SyncResult.wait_for_completion()` when `wait` is true.

        Returns:
            A `SyncResult` for the job that was started.
        """
        connection_response = api_util.run_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
            workspace_id=self.workspace.workspace_id,
        )
        sync_result = SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=connection_response.job_id,
        )

        if wait:
            sync_result.wait_for_completion(
                wait_timeout=wait_timeout,
                raise_failure=True,
                raise_timeout=True,
            )

        return sync_result

    # Logs

    def get_previous_sync_logs(
        self,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync jobs for this connection.

        Args:
            limit: Passed as `limit` to the job-logs API call.

        Returns:
            One `SyncResult` per job returned by the API, each pre-populated
            with that job's info.
        """
        sync_logs: list[JobResponse] = api_util.get_job_logs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
            workspace_id=self.workspace.workspace_id,
            limit=limit,
        )
        return [
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=sync_log.job_id,
                _latest_job_info=sync_log,
            )
            for sync_log in sync_logs
        ]

    def get_sync_result(
        self,
        job_id: str | None = None,
    ) -> SyncResult | None:
        """Get the sync result for the connection.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if `job_id` is omitted and no previous jobs are found.
        """
        if job_id is None:
            # Get the most recent sync job
            results = self.get_previous_sync_logs(
                limit=1,
            )
            if results:
                return results[0]

            return None

        # Get the sync job by ID (lazy loaded)
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

    # Deletions

    def _permanently_delete(
        self,
        *,
        delete_source: bool = False,
        delete_destination: bool = False,
    ) -> None:
        """Delete the connection, and optionally its source and/or destination.

        Args:
            delete_source: Whether to also delete the source.
            delete_destination: Whether to also delete the destination.
        """
        # Resolve connector IDs *before* deleting the connection. If they are not
        # cached yet, resolving them fetches the connection from the API, which may
        # no longer succeed once the connection has been deleted.
        source_id_to_delete = self.source_id if delete_source else None
        destination_id_to_delete = self.destination_id if delete_destination else None

        self.workspace._permanently_delete_connection(  # noqa: SLF001 # Non-public API (for now)
            connection=self
        )

        if delete_source:
            self.workspace._permanently_delete_source(  # noqa: SLF001 # Non-public API (for now)
                source=source_id_to_delete
            )

        if delete_destination:
            self.workspace._permanently_delete_destination(  # noqa: SLF001 # Non-public API
                destination=destination_id_to_delete,
            )
class CloudConnection:
class CloudConnection:
    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
    """

    def __init__(
        self,
        workspace: CloudWorkspace,
        connection_id: str,
        source: str | None = None,
        destination: str | None = None,
    ) -> None:
        """It is not recommended to create a `CloudConnection` object directly.

        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
        """
        self.connection_id = connection_id
        """The ID of the connection."""

        self.workspace = workspace
        """The workspace that the connection belongs to."""

        self._source_id = source
        """The ID of the source."""

        self._destination_id = destination
        """The ID of the destination."""

        # Connection details from the API; fetched lazily by `_get_connection_info()`.
        self._connection_info: ConnectionResponse | None = None

    def _fetch_connection_info(self) -> ConnectionResponse:
        """Fetch this connection's details from the Airbyte API (no caching)."""
        return api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
        )

    def _get_connection_info(self) -> ConnectionResponse:
        """Return the connection details, fetching and caching them on first use."""
        if self._connection_info is None:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info

    # Properties

    @property
    def source_id(self) -> str:
        """The ID of the source.

        Uses the ID given to the constructor when available; otherwise reads it
        from the connection details (fetched from the API on first use).
        """
        if not self._source_id:
            self._source_id = self._get_connection_info().source_id

        return cast(str, self._source_id)

    @property
    def destination_id(self) -> str:
        """The ID of the destination.

        Uses the ID given to the constructor when available; otherwise reads it
        from the connection details (fetched from the API on first use).
        """
        if not self._destination_id:
            # Must read the *destination* ID here, not the source ID.
            self._destination_id = self._get_connection_info().destination_id

        return cast(str, self._destination_id)

    @property
    def stream_names(self) -> list[str]:
        """The names of the streams configured on this connection."""
        return [stream.name for stream in self._get_connection_info().configurations.streams]

    @property
    def table_prefix(self) -> str:
        """The table prefix (the `prefix` field of the connection details)."""
        return self._get_connection_info().prefix

    @property
    def connection_url(self) -> str:
        """The URL of this connection, built from the workspace URL and connection ID."""
        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

    @property
    def job_history_url(self) -> str:
        """The URL of this connection's job history page."""
        return f"{self.connection_url}/job-history"

    # Run Sync

    def run_sync(
        self,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Start a sync job for this connection.

        Args:
            wait: Whether to block until the job completes. When true,
                `SyncResult.wait_for_completion()` is called with
                `raise_failure=True` and `raise_timeout=True`.
            wait_timeout: Forwarded as `wait_timeout` to
                `SyncResult.wait_for_completion()` when `wait` is true.

        Returns:
            A `SyncResult` for the job that was started.
        """
        connection_response = api_util.run_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
            workspace_id=self.workspace.workspace_id,
        )
        sync_result = SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=connection_response.job_id,
        )

        if wait:
            sync_result.wait_for_completion(
                wait_timeout=wait_timeout,
                raise_failure=True,
                raise_timeout=True,
            )

        return sync_result

    # Logs

    def get_previous_sync_logs(
        self,
        *,
        limit: int = 10,
    ) -> list[SyncResult]:
        """Get the previous sync jobs for this connection.

        Args:
            limit: Passed as `limit` to the job-logs API call.

        Returns:
            One `SyncResult` per job returned by the API, each pre-populated
            with that job's info.
        """
        sync_logs: list[JobResponse] = api_util.get_job_logs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            api_key=self.workspace.api_key,
            workspace_id=self.workspace.workspace_id,
            limit=limit,
        )
        return [
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=sync_log.job_id,
                _latest_job_info=sync_log,
            )
            for sync_log in sync_logs
        ]

    def get_sync_result(
        self,
        job_id: str | None = None,
    ) -> SyncResult | None:
        """Get the sync result for the connection.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if `job_id` is omitted and no previous jobs are found.
        """
        if job_id is None:
            # Get the most recent sync job
            results = self.get_previous_sync_logs(
                limit=1,
            )
            if results:
                return results[0]

            return None

        # Get the sync job by ID (lazy loaded)
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

    # Deletions

    def _permanently_delete(
        self,
        *,
        delete_source: bool = False,
        delete_destination: bool = False,
    ) -> None:
        """Delete the connection, and optionally its source and/or destination.

        Args:
            delete_source: Whether to also delete the source.
            delete_destination: Whether to also delete the destination.
        """
        # Resolve connector IDs *before* deleting the connection. If they are not
        # cached yet, resolving them fetches the connection from the API, which may
        # no longer succeed once the connection has been deleted.
        source_id_to_delete = self.source_id if delete_source else None
        destination_id_to_delete = self.destination_id if delete_destination else None

        self.workspace._permanently_delete_connection(  # noqa: SLF001 # Non-public API (for now)
            connection=self
        )

        if delete_source:
            self.workspace._permanently_delete_source(  # noqa: SLF001 # Non-public API (for now)
                source=source_id_to_delete
            )

        if delete_destination:
            self.workspace._permanently_delete_destination(  # noqa: SLF001 # Non-public API
                destination=destination_id_to_delete,
            )
A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
CloudConnection( workspace: airbyte.cloud.workspaces.CloudWorkspace, connection_id: str, source: str | None = None, destination: str | None = None)
def __init__(
    self,
    workspace: CloudWorkspace,
    connection_id: str,
    source: str | None = None,
    destination: str | None = None,
) -> None:
    """Record the workspace and IDs for this connection without calling the API.

    Direct construction is discouraged; call `CloudWorkspace.get_connection()`
    to obtain a `CloudConnection` instead.

    Args:
        workspace: The workspace that owns the connection.
        connection_id: The connection's ID.
        source: Optional source ID; resolved later from the API when omitted.
        destination: Optional destination ID; resolved later from the API when omitted.
    """
    self.workspace = workspace
    """The workspace that the connection belongs to."""

    self.connection_id = connection_id
    """The ID of the connection."""

    self._destination_id = destination
    """The ID of the destination."""

    self._source_id = source
    """The ID of the source."""

    # Connection details from the API; filled in on first use by the properties.
    self._connection_info: ConnectionResponse | None = None
It is not recommended to create a `CloudConnection` object directly.
Instead, use `CloudWorkspace.get_connection()` to create a connection object.
source_id: str
@property
def source_id(self) -> str:
    """ID of the connection's source.

    Returns the ID given at construction if it is set. Otherwise takes it from
    the connection details, fetching those from the API once and caching both.
    """
    if self._source_id:
        return cast(str, self._source_id)

    if not self._connection_info:
        self._connection_info = self._fetch_connection_info()
    self._source_id = self._connection_info.source_id
    return cast(str, self._source_id)
The ID of the source.
destination_id: str
@property
def destination_id(self) -> str:
    """The ID of the destination.

    Uses the ID given to the constructor when available. Otherwise reads it from
    the connection details, fetching those from the API on first use and caching
    both.
    """
    if not self._destination_id:
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        # Must read the *destination* ID here, not the source ID.
        self._destination_id = self._connection_info.destination_id

    return cast(str, self._destination_id)
The ID of the destination.
stream_names: list[str]
@property
def stream_names(self) -> list[str]:
    """Names of the streams in this connection's configuration (details fetched on first use)."""
    self._connection_info = self._connection_info or self._fetch_connection_info()
    configured = self._connection_info.configurations.streams
    return [entry.name for entry in configured]
The stream names.
table_prefix: str
@property
def table_prefix(self) -> str:
    """The connection's table prefix (the `prefix` field of the connection details)."""
    info = self._connection_info
    if not info:
        info = self._connection_info = self._fetch_connection_info()
    return info.prefix
The table prefix.
def run_sync(self, *, wait: bool = True, wait_timeout: int = 300) -> airbyte.cloud.sync_results.SyncResult:
def run_sync(
    self,
    *,
    wait: bool = True,
    wait_timeout: int = 300,
) -> SyncResult:
    """Trigger a sync job for this connection.

    Args:
        wait: When true, call `SyncResult.wait_for_completion()` with
            `raise_failure=True` and `raise_timeout=True` before returning.
        wait_timeout: Forwarded as `wait_timeout` to `wait_for_completion()`.

    Returns:
        The `SyncResult` for the job that was started.
    """
    ws = self.workspace
    started = api_util.run_connection(
        connection_id=self.connection_id,
        api_root=ws.api_root,
        api_key=ws.api_key,
        workspace_id=ws.workspace_id,
    )
    result = SyncResult(workspace=ws, connection=self, job_id=started.job_id)

    if not wait:
        return result

    result.wait_for_completion(
        wait_timeout=wait_timeout,
        raise_failure=True,
        raise_timeout=True,
    )
    return result
Run a sync.
def get_previous_sync_logs(self, *, limit: int = 10) -> list[airbyte.cloud.sync_results.SyncResult]:
def get_previous_sync_logs(
    self,
    *,
    limit: int = 10,
) -> list[SyncResult]:
    """Return a `SyncResult` for each of this connection's previous jobs.

    Args:
        limit: Passed as `limit` to the job-logs API call.

    Returns:
        One `SyncResult` per job returned by the API, each pre-populated with
        that job's info.
    """
    ws = self.workspace
    jobs: list[JobResponse] = api_util.get_job_logs(
        connection_id=self.connection_id,
        api_root=ws.api_root,
        api_key=ws.api_key,
        workspace_id=ws.workspace_id,
        limit=limit,
    )
    return [
        SyncResult(workspace=ws, connection=self, job_id=job.job_id, _latest_job_info=job)
        for job in jobs
    ]
Get the previous sync logs for a connection.
def get_sync_result(self, job_id: str | None = None) -> airbyte.cloud.sync_results.SyncResult | None:
def get_sync_result(
    self,
    job_id: str | None = None,
) -> SyncResult | None:
    """Return the sync result for a given job, or for the latest job.

    With an explicit `job_id`, returns a lazily-loaded `SyncResult` for that job.
    Without one, returns the most recent job's result, or `None` when the
    connection has no previous jobs.
    """
    if job_id is not None:
        # Explicit job: build the result without contacting the API yet.
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

    latest = self.get_previous_sync_logs(limit=1)
    return latest[0] if latest else None
Get the sync result for the connection.
If `job_id` is not provided, the most recent sync job will be used.
Returns `None` if `job_id` is omitted and no previous jobs are found.