airbyte.cloud.connections

Cloud Connections.

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""Cloud Connections."""
  3
  4from __future__ import annotations
  5
  6from typing import TYPE_CHECKING, cast
  7
  8from airbyte._util import api_util
  9from airbyte.cloud.connectors import CloudDestination, CloudSource
 10from airbyte.cloud.sync_results import SyncResult
 11
 12
 13if TYPE_CHECKING:
 14    from airbyte_api.models import ConnectionResponse, JobResponse
 15
 16    from airbyte.cloud.workspaces import CloudWorkspace
 17
 18
 19class CloudConnection:
 20    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
 21
 22    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
 23    """
 24
 25    def __init__(
 26        self,
 27        workspace: CloudWorkspace,
 28        connection_id: str,
 29        source: str | None = None,
 30        destination: str | None = None,
 31    ) -> None:
 32        """It is not recommended to create a `CloudConnection` object directly.
 33
 34        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
 35        """
 36        self.connection_id = connection_id
 37        """The ID of the connection."""
 38
 39        self.workspace = workspace
 40        """The workspace that the connection belongs to."""
 41
 42        self._source_id = source
 43        """The ID of the source."""
 44
 45        self._destination_id = destination
 46        """The ID of the destination."""
 47
 48        self._connection_info: ConnectionResponse | None = None
 49        """The connection info object. (Cached.)"""
 50
 51        self._cloud_source_object: CloudSource | None = None
 52        """The source object. (Cached.)"""
 53
 54        self._cloud_destination_object: CloudDestination | None = None
 55        """The destination object. (Cached.)"""
 56
 57    def _fetch_connection_info(self) -> ConnectionResponse:
 58        """Populate the connection with data from the API."""
 59        return api_util.get_connection(
 60            workspace_id=self.workspace.workspace_id,
 61            connection_id=self.connection_id,
 62            api_root=self.workspace.api_root,
 63            client_id=self.workspace.client_id,
 64            client_secret=self.workspace.client_secret,
 65        )
 66
 67    # Properties
 68
 69    @property
 70    def source_id(self) -> str:
 71        """The ID of the source."""
 72        if not self._source_id:
 73            if not self._connection_info:
 74                self._connection_info = self._fetch_connection_info()
 75
 76            self._source_id = self._connection_info.source_id
 77
 78        return cast("str", self._source_id)
 79
 80    @property
 81    def source(self) -> CloudSource:
 82        """Get the source object."""
 83        if self._cloud_source_object:
 84            return self._cloud_source_object
 85
 86        self._cloud_source_object = CloudSource(
 87            workspace=self.workspace,
 88            connector_id=self.source_id,
 89        )
 90        return self._cloud_source_object
 91
 92    @property
 93    def destination_id(self) -> str:
 94        """The ID of the destination."""
 95        if not self._destination_id:
 96            if not self._connection_info:
 97                self._connection_info = self._fetch_connection_info()
 98
 99            self._destination_id = self._connection_info.source_id
100
101        return cast("str", self._destination_id)
102
103    @property
104    def destination(self) -> CloudDestination:
105        """Get the destination object."""
106        if self._cloud_destination_object:
107            return self._cloud_destination_object
108
109        self._cloud_destination_object = CloudDestination(
110            workspace=self.workspace,
111            connector_id=self.destination_id,
112        )
113        return self._cloud_destination_object
114
115    @property
116    def stream_names(self) -> list[str]:
117        """The stream names."""
118        if not self._connection_info:
119            self._connection_info = self._fetch_connection_info()
120
121        return [stream.name for stream in self._connection_info.configurations.streams or []]
122
123    @property
124    def table_prefix(self) -> str:
125        """The table prefix."""
126        if not self._connection_info:
127            self._connection_info = self._fetch_connection_info()
128
129        return self._connection_info.prefix or ""
130
131    @property
132    def connection_url(self) -> str | None:
133        """The URL to the connection."""
134        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
135
136    @property
137    def job_history_url(self) -> str | None:
138        """The URL to the job history for the connection."""
139        return f"{self.connection_url}/job-history"
140
141    # Run Sync
142
143    def run_sync(
144        self,
145        *,
146        wait: bool = True,
147        wait_timeout: int = 300,
148    ) -> SyncResult:
149        """Run a sync."""
150        connection_response = api_util.run_connection(
151            connection_id=self.connection_id,
152            api_root=self.workspace.api_root,
153            workspace_id=self.workspace.workspace_id,
154            client_id=self.workspace.client_id,
155            client_secret=self.workspace.client_secret,
156        )
157        sync_result = SyncResult(
158            workspace=self.workspace,
159            connection=self,
160            job_id=connection_response.job_id,
161        )
162
163        if wait:
164            sync_result.wait_for_completion(
165                wait_timeout=wait_timeout,
166                raise_failure=True,
167                raise_timeout=True,
168            )
169
170        return sync_result
171
172    # Logs
173
174    def get_previous_sync_logs(
175        self,
176        *,
177        limit: int = 10,
178    ) -> list[SyncResult]:
179        """Get the previous sync logs for a connection."""
180        sync_logs: list[JobResponse] = api_util.get_job_logs(
181            connection_id=self.connection_id,
182            api_root=self.workspace.api_root,
183            workspace_id=self.workspace.workspace_id,
184            limit=limit,
185            client_id=self.workspace.client_id,
186            client_secret=self.workspace.client_secret,
187        )
188        return [
189            SyncResult(
190                workspace=self.workspace,
191                connection=self,
192                job_id=sync_log.job_id,
193                _latest_job_info=sync_log,
194            )
195            for sync_log in sync_logs
196        ]
197
198    def get_sync_result(
199        self,
200        job_id: int | None = None,
201    ) -> SyncResult | None:
202        """Get the sync result for the connection.
203
204        If `job_id` is not provided, the most recent sync job will be used.
205
206        Returns `None` if job_id is omitted and no previous jobs are found.
207        """
208        if job_id is None:
209            # Get the most recent sync job
210            results = self.get_previous_sync_logs(
211                limit=1,
212            )
213            if results:
214                return results[0]
215
216            return None
217
218        # Get the sync job by ID (lazy loaded)
219        return SyncResult(
220            workspace=self.workspace,
221            connection=self,
222            job_id=job_id,
223        )
224
225    # Deletions
226
227    def permanently_delete(
228        self,
229        *,
230        cascade_delete_source: bool = False,
231        cascade_delete_destination: bool = False,
232    ) -> None:
233        """Delete the connection.
234
235        Args:
236            cascade_delete_source: Whether to also delete the source.
237            cascade_delete_destination: Whether to also delete the destination.
238        """
239        self.workspace.permanently_delete_connection(self)
240
241        if cascade_delete_source:
242            self.workspace.permanently_delete_source(self.source_id)
243
244        if cascade_delete_destination:
245            self.workspace.permanently_delete_destination(self.destination_id)
class CloudConnection:
 20class CloudConnection:
 21    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
 22
 23    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
 24    """
 25
 26    def __init__(
 27        self,
 28        workspace: CloudWorkspace,
 29        connection_id: str,
 30        source: str | None = None,
 31        destination: str | None = None,
 32    ) -> None:
 33        """It is not recommended to create a `CloudConnection` object directly.
 34
 35        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
 36        """
 37        self.connection_id = connection_id
 38        """The ID of the connection."""
 39
 40        self.workspace = workspace
 41        """The workspace that the connection belongs to."""
 42
 43        self._source_id = source
 44        """The ID of the source."""
 45
 46        self._destination_id = destination
 47        """The ID of the destination."""
 48
 49        self._connection_info: ConnectionResponse | None = None
 50        """The connection info object. (Cached.)"""
 51
 52        self._cloud_source_object: CloudSource | None = None
 53        """The source object. (Cached.)"""
 54
 55        self._cloud_destination_object: CloudDestination | None = None
 56        """The destination object. (Cached.)"""
 57
 58    def _fetch_connection_info(self) -> ConnectionResponse:
 59        """Populate the connection with data from the API."""
 60        return api_util.get_connection(
 61            workspace_id=self.workspace.workspace_id,
 62            connection_id=self.connection_id,
 63            api_root=self.workspace.api_root,
 64            client_id=self.workspace.client_id,
 65            client_secret=self.workspace.client_secret,
 66        )
 67
 68    # Properties
 69
 70    @property
 71    def source_id(self) -> str:
 72        """The ID of the source."""
 73        if not self._source_id:
 74            if not self._connection_info:
 75                self._connection_info = self._fetch_connection_info()
 76
 77            self._source_id = self._connection_info.source_id
 78
 79        return cast("str", self._source_id)
 80
 81    @property
 82    def source(self) -> CloudSource:
 83        """Get the source object."""
 84        if self._cloud_source_object:
 85            return self._cloud_source_object
 86
 87        self._cloud_source_object = CloudSource(
 88            workspace=self.workspace,
 89            connector_id=self.source_id,
 90        )
 91        return self._cloud_source_object
 92
 93    @property
 94    def destination_id(self) -> str:
 95        """The ID of the destination."""
 96        if not self._destination_id:
 97            if not self._connection_info:
 98                self._connection_info = self._fetch_connection_info()
 99
100            self._destination_id = self._connection_info.source_id
101
102        return cast("str", self._destination_id)
103
104    @property
105    def destination(self) -> CloudDestination:
106        """Get the destination object."""
107        if self._cloud_destination_object:
108            return self._cloud_destination_object
109
110        self._cloud_destination_object = CloudDestination(
111            workspace=self.workspace,
112            connector_id=self.destination_id,
113        )
114        return self._cloud_destination_object
115
116    @property
117    def stream_names(self) -> list[str]:
118        """The stream names."""
119        if not self._connection_info:
120            self._connection_info = self._fetch_connection_info()
121
122        return [stream.name for stream in self._connection_info.configurations.streams or []]
123
124    @property
125    def table_prefix(self) -> str:
126        """The table prefix."""
127        if not self._connection_info:
128            self._connection_info = self._fetch_connection_info()
129
130        return self._connection_info.prefix or ""
131
132    @property
133    def connection_url(self) -> str | None:
134        """The URL to the connection."""
135        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
136
137    @property
138    def job_history_url(self) -> str | None:
139        """The URL to the job history for the connection."""
140        return f"{self.connection_url}/job-history"
141
142    # Run Sync
143
144    def run_sync(
145        self,
146        *,
147        wait: bool = True,
148        wait_timeout: int = 300,
149    ) -> SyncResult:
150        """Run a sync."""
151        connection_response = api_util.run_connection(
152            connection_id=self.connection_id,
153            api_root=self.workspace.api_root,
154            workspace_id=self.workspace.workspace_id,
155            client_id=self.workspace.client_id,
156            client_secret=self.workspace.client_secret,
157        )
158        sync_result = SyncResult(
159            workspace=self.workspace,
160            connection=self,
161            job_id=connection_response.job_id,
162        )
163
164        if wait:
165            sync_result.wait_for_completion(
166                wait_timeout=wait_timeout,
167                raise_failure=True,
168                raise_timeout=True,
169            )
170
171        return sync_result
172
173    # Logs
174
175    def get_previous_sync_logs(
176        self,
177        *,
178        limit: int = 10,
179    ) -> list[SyncResult]:
180        """Get the previous sync logs for a connection."""
181        sync_logs: list[JobResponse] = api_util.get_job_logs(
182            connection_id=self.connection_id,
183            api_root=self.workspace.api_root,
184            workspace_id=self.workspace.workspace_id,
185            limit=limit,
186            client_id=self.workspace.client_id,
187            client_secret=self.workspace.client_secret,
188        )
189        return [
190            SyncResult(
191                workspace=self.workspace,
192                connection=self,
193                job_id=sync_log.job_id,
194                _latest_job_info=sync_log,
195            )
196            for sync_log in sync_logs
197        ]
198
199    def get_sync_result(
200        self,
201        job_id: int | None = None,
202    ) -> SyncResult | None:
203        """Get the sync result for the connection.
204
205        If `job_id` is not provided, the most recent sync job will be used.
206
207        Returns `None` if job_id is omitted and no previous jobs are found.
208        """
209        if job_id is None:
210            # Get the most recent sync job
211            results = self.get_previous_sync_logs(
212                limit=1,
213            )
214            if results:
215                return results[0]
216
217            return None
218
219        # Get the sync job by ID (lazy loaded)
220        return SyncResult(
221            workspace=self.workspace,
222            connection=self,
223            job_id=job_id,
224        )
225
226    # Deletions
227
228    def permanently_delete(
229        self,
230        *,
231        cascade_delete_source: bool = False,
232        cascade_delete_destination: bool = False,
233    ) -> None:
234        """Delete the connection.
235
236        Args:
237            cascade_delete_source: Whether to also delete the source.
238            cascade_delete_destination: Whether to also delete the destination.
239        """
240        self.workspace.permanently_delete_connection(self)
241
242        if cascade_delete_source:
243            self.workspace.permanently_delete_source(self.source_id)
244
245        if cascade_delete_destination:
246            self.workspace.permanently_delete_destination(self.destination_id)

A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

You can use a connection object to run sync jobs, retrieve logs, and manage the connection.

CloudConnection( workspace: airbyte.cloud.CloudWorkspace, connection_id: str, source: str | None = None, destination: str | None = None)
26    def __init__(
27        self,
28        workspace: CloudWorkspace,
29        connection_id: str,
30        source: str | None = None,
31        destination: str | None = None,
32    ) -> None:
33        """It is not recommended to create a `CloudConnection` object directly.
34
35        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
36        """
37        self.connection_id = connection_id
38        """The ID of the connection."""
39
40        self.workspace = workspace
41        """The workspace that the connection belongs to."""
42
43        self._source_id = source
44        """The ID of the source."""
45
46        self._destination_id = destination
47        """The ID of the destination."""
48
49        self._connection_info: ConnectionResponse | None = None
50        """The connection info object. (Cached.)"""
51
52        self._cloud_source_object: CloudSource | None = None
53        """The source object. (Cached.)"""
54
55        self._cloud_destination_object: CloudDestination | None = None
56        """The destination object. (Cached.)"""

It is not recommended to create a CloudConnection object directly.

Instead, use CloudWorkspace.get_connection() to create a connection object.

connection_id

The ID of the connection.

workspace

The workspace that the connection belongs to.

source_id: str
70    @property
71    def source_id(self) -> str:
72        """The ID of the source."""
73        if not self._source_id:
74            if not self._connection_info:
75                self._connection_info = self._fetch_connection_info()
76
77            self._source_id = self._connection_info.source_id
78
79        return cast("str", self._source_id)

The ID of the source.

source: airbyte.cloud.connectors.CloudSource
81    @property
82    def source(self) -> CloudSource:
83        """Get the source object."""
84        if self._cloud_source_object:
85            return self._cloud_source_object
86
87        self._cloud_source_object = CloudSource(
88            workspace=self.workspace,
89            connector_id=self.source_id,
90        )
91        return self._cloud_source_object

Get the source object.

destination_id: str
 93    @property
 94    def destination_id(self) -> str:
 95        """The ID of the destination."""
 96        if not self._destination_id:
 97            if not self._connection_info:
 98                self._connection_info = self._fetch_connection_info()
 99
100            self._destination_id = self._connection_info.source_id
101
102        return cast("str", self._destination_id)

The ID of the destination.

destination: airbyte.cloud.connectors.CloudDestination
104    @property
105    def destination(self) -> CloudDestination:
106        """Get the destination object."""
107        if self._cloud_destination_object:
108            return self._cloud_destination_object
109
110        self._cloud_destination_object = CloudDestination(
111            workspace=self.workspace,
112            connector_id=self.destination_id,
113        )
114        return self._cloud_destination_object

Get the destination object.

stream_names: list[str]
116    @property
117    def stream_names(self) -> list[str]:
118        """The stream names."""
119        if not self._connection_info:
120            self._connection_info = self._fetch_connection_info()
121
122        return [stream.name for stream in self._connection_info.configurations.streams or []]

The names of the streams in the connection's configuration.

table_prefix: str
124    @property
125    def table_prefix(self) -> str:
126        """The table prefix."""
127        if not self._connection_info:
128            self._connection_info = self._fetch_connection_info()
129
130        return self._connection_info.prefix or ""

The table prefix set on the connection, or an empty string if none is set.

connection_url: str | None
132    @property
133    def connection_url(self) -> str | None:
134        """The URL to the connection."""
135        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

The URL to the connection.

job_history_url: str | None
137    @property
138    def job_history_url(self) -> str | None:
139        """The URL to the job history for the connection."""
140        return f"{self.connection_url}/job-history"

The URL to the job history for the connection.

def run_sync( self, *, wait: bool = True, wait_timeout: int = 300) -> airbyte.cloud.SyncResult:
144    def run_sync(
145        self,
146        *,
147        wait: bool = True,
148        wait_timeout: int = 300,
149    ) -> SyncResult:
150        """Run a sync."""
151        connection_response = api_util.run_connection(
152            connection_id=self.connection_id,
153            api_root=self.workspace.api_root,
154            workspace_id=self.workspace.workspace_id,
155            client_id=self.workspace.client_id,
156            client_secret=self.workspace.client_secret,
157        )
158        sync_result = SyncResult(
159            workspace=self.workspace,
160            connection=self,
161            job_id=connection_response.job_id,
162        )
163
164        if wait:
165            sync_result.wait_for_completion(
166                wait_timeout=wait_timeout,
167                raise_failure=True,
168                raise_timeout=True,
169            )
170
171        return sync_result

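Run a sync job for this connection. By default this waits for the job to complete (up to `wait_timeout`) and raises on failure or timeout; pass `wait=False` to return immediately.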

def get_previous_sync_logs(self, *, limit: int = 10) -> list[airbyte.cloud.SyncResult]:
175    def get_previous_sync_logs(
176        self,
177        *,
178        limit: int = 10,
179    ) -> list[SyncResult]:
180        """Get the previous sync logs for a connection."""
181        sync_logs: list[JobResponse] = api_util.get_job_logs(
182            connection_id=self.connection_id,
183            api_root=self.workspace.api_root,
184            workspace_id=self.workspace.workspace_id,
185            limit=limit,
186            client_id=self.workspace.client_id,
187            client_secret=self.workspace.client_secret,
188        )
189        return [
190            SyncResult(
191                workspace=self.workspace,
192                connection=self,
193                job_id=sync_log.job_id,
194                _latest_job_info=sync_log,
195            )
196            for sync_log in sync_logs
197        ]

Get the previous sync logs for a connection.

def get_sync_result( self, job_id: int | None = None) -> airbyte.cloud.SyncResult | None:
199    def get_sync_result(
200        self,
201        job_id: int | None = None,
202    ) -> SyncResult | None:
203        """Get the sync result for the connection.
204
205        If `job_id` is not provided, the most recent sync job will be used.
206
207        Returns `None` if job_id is omitted and no previous jobs are found.
208        """
209        if job_id is None:
210            # Get the most recent sync job
211            results = self.get_previous_sync_logs(
212                limit=1,
213            )
214            if results:
215                return results[0]
216
217            return None
218
219        # Get the sync job by ID (lazy loaded)
220        return SyncResult(
221            workspace=self.workspace,
222            connection=self,
223            job_id=job_id,
224        )

Get the sync result for the connection.

If job_id is not provided, the most recent sync job will be used.

Returns None if job_id is omitted and no previous jobs are found.

def permanently_delete( self, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False) -> None:
228    def permanently_delete(
229        self,
230        *,
231        cascade_delete_source: bool = False,
232        cascade_delete_destination: bool = False,
233    ) -> None:
234        """Delete the connection.
235
236        Args:
237            cascade_delete_source: Whether to also delete the source.
238            cascade_delete_destination: Whether to also delete the destination.
239        """
240        self.workspace.permanently_delete_connection(self)
241
242        if cascade_delete_source:
243            self.workspace.permanently_delete_source(self.source_id)
244
245        if cascade_delete_destination:
246            self.workspace.permanently_delete_destination(self.destination_id)

Delete the connection.

Arguments:
  • cascade_delete_source: Whether to also delete the source.
  • cascade_delete_destination: Whether to also delete the destination.