airbyte.cloud.connections

Cloud Connections.

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""Cloud Connections."""
  3
  4from __future__ import annotations
  5
  6from typing import TYPE_CHECKING, cast
  7
  8from airbyte._util import api_util
  9from airbyte.cloud.connectors import CloudDestination, CloudSource
 10from airbyte.cloud.sync_results import SyncResult
 11
 12
 13if TYPE_CHECKING:
 14    from airbyte_api.models import ConnectionResponse, JobResponse
 15
 16    from airbyte.cloud.workspaces import CloudWorkspace
 17
 18
 19class CloudConnection:
 20    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
 21
 22    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
 23    """
 24
 25    def __init__(
 26        self,
 27        workspace: CloudWorkspace,
 28        connection_id: str,
 29        source: str | None = None,
 30        destination: str | None = None,
 31    ) -> None:
 32        """It is not recommended to create a `CloudConnection` object directly.
 33
 34        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
 35        """
 36        self.connection_id = connection_id
 37        """The ID of the connection."""
 38
 39        self.workspace = workspace
 40        """The workspace that the connection belongs to."""
 41
 42        self._source_id = source
 43        """The ID of the source."""
 44
 45        self._destination_id = destination
 46        """The ID of the destination."""
 47
 48        self._connection_info: ConnectionResponse | None = None
 49        """The connection info object. (Cached.)"""
 50
 51        self._cloud_source_object: CloudSource | None = None
 52        """The source object. (Cached.)"""
 53
 54        self._cloud_destination_object: CloudDestination | None = None
 55        """The destination object. (Cached.)"""
 56
 57    def _fetch_connection_info(self) -> ConnectionResponse:
 58        """Populate the connection with data from the API."""
 59        return api_util.get_connection(
 60            workspace_id=self.workspace.workspace_id,
 61            connection_id=self.connection_id,
 62            api_root=self.workspace.api_root,
 63            client_id=self.workspace.client_id,
 64            client_secret=self.workspace.client_secret,
 65        )
 66
 67    # Properties
 68
 69    @property
 70    def source_id(self) -> str:
 71        """The ID of the source."""
 72        if not self._source_id:
 73            if not self._connection_info:
 74                self._connection_info = self._fetch_connection_info()
 75
 76            self._source_id = self._connection_info.source_id
 77
 78        return cast("str", self._source_id)
 79
 80    @property
 81    def source(self) -> CloudSource:
 82        """Get the source object."""
 83        if self._cloud_source_object:
 84            return self._cloud_source_object
 85
 86        self._cloud_source_object = CloudSource(
 87            workspace=self.workspace,
 88            connector_id=self.source_id,
 89        )
 90        return self._cloud_source_object
 91
 92    @property
 93    def destination_id(self) -> str:
 94        """The ID of the destination."""
 95        if not self._destination_id:
 96            if not self._connection_info:
 97                self._connection_info = self._fetch_connection_info()
 98
 99            self._destination_id = self._connection_info.source_id
100
101        return cast("str", self._destination_id)
102
103    @property
104    def destination(self) -> CloudDestination:
105        """Get the destination object."""
106        if self._cloud_destination_object:
107            return self._cloud_destination_object
108
109        self._cloud_destination_object = CloudDestination(
110            workspace=self.workspace,
111            connector_id=self.destination_id,
112        )
113        return self._cloud_destination_object
114
115    @property
116    def stream_names(self) -> list[str]:
117        """The stream names."""
118        if not self._connection_info:
119            self._connection_info = self._fetch_connection_info()
120
121        return [stream.name for stream in self._connection_info.configurations.streams or []]
122
123    @property
124    def table_prefix(self) -> str:
125        """The table prefix."""
126        if not self._connection_info:
127            self._connection_info = self._fetch_connection_info()
128
129        return self._connection_info.prefix or ""
130
131    @property
132    def connection_url(self) -> str | None:
133        """The web URL to the connection."""
134        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
135
136    @property
137    def job_history_url(self) -> str | None:
138        """The URL to the job history for the connection."""
139        return f"{self.connection_url}/timeline"
140
141    # Run Sync
142
143    def run_sync(
144        self,
145        *,
146        wait: bool = True,
147        wait_timeout: int = 300,
148    ) -> SyncResult:
149        """Run a sync."""
150        connection_response = api_util.run_connection(
151            connection_id=self.connection_id,
152            api_root=self.workspace.api_root,
153            workspace_id=self.workspace.workspace_id,
154            client_id=self.workspace.client_id,
155            client_secret=self.workspace.client_secret,
156        )
157        sync_result = SyncResult(
158            workspace=self.workspace,
159            connection=self,
160            job_id=connection_response.job_id,
161        )
162
163        if wait:
164            sync_result.wait_for_completion(
165                wait_timeout=wait_timeout,
166                raise_failure=True,
167                raise_timeout=True,
168            )
169
170        return sync_result
171
172    def __repr__(self) -> str:
173        """String representation of the connection."""
174        return (
175            f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, "
176            f"destination_id={self.destination_id}, connection_url={self.connection_url})"
177        )
178
179    # Logs
180
181    def get_previous_sync_logs(
182        self,
183        *,
184        limit: int = 10,
185    ) -> list[SyncResult]:
186        """Get the previous sync logs for a connection."""
187        sync_logs: list[JobResponse] = api_util.get_job_logs(
188            connection_id=self.connection_id,
189            api_root=self.workspace.api_root,
190            workspace_id=self.workspace.workspace_id,
191            limit=limit,
192            client_id=self.workspace.client_id,
193            client_secret=self.workspace.client_secret,
194        )
195        return [
196            SyncResult(
197                workspace=self.workspace,
198                connection=self,
199                job_id=sync_log.job_id,
200                _latest_job_info=sync_log,
201            )
202            for sync_log in sync_logs
203        ]
204
205    def get_sync_result(
206        self,
207        job_id: int | None = None,
208    ) -> SyncResult | None:
209        """Get the sync result for the connection.
210
211        If `job_id` is not provided, the most recent sync job will be used.
212
213        Returns `None` if job_id is omitted and no previous jobs are found.
214        """
215        if job_id is None:
216            # Get the most recent sync job
217            results = self.get_previous_sync_logs(
218                limit=1,
219            )
220            if results:
221                return results[0]
222
223            return None
224
225        # Get the sync job by ID (lazy loaded)
226        return SyncResult(
227            workspace=self.workspace,
228            connection=self,
229            job_id=job_id,
230        )
231
232    # Deletions
233
234    def permanently_delete(
235        self,
236        *,
237        cascade_delete_source: bool = False,
238        cascade_delete_destination: bool = False,
239    ) -> None:
240        """Delete the connection.
241
242        Args:
243            cascade_delete_source: Whether to also delete the source.
244            cascade_delete_destination: Whether to also delete the destination.
245        """
246        self.workspace.permanently_delete_connection(self)
247
248        if cascade_delete_source:
249            self.workspace.permanently_delete_source(self.source_id)
250
251        if cascade_delete_destination:
252            self.workspace.permanently_delete_destination(self.destination_id)
class CloudConnection:
 20class CloudConnection:
 21    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
 22
 23    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
 24    """
 25
 26    def __init__(
 27        self,
 28        workspace: CloudWorkspace,
 29        connection_id: str,
 30        source: str | None = None,
 31        destination: str | None = None,
 32    ) -> None:
 33        """It is not recommended to create a `CloudConnection` object directly.
 34
 35        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
 36        """
 37        self.connection_id = connection_id
 38        """The ID of the connection."""
 39
 40        self.workspace = workspace
 41        """The workspace that the connection belongs to."""
 42
 43        self._source_id = source
 44        """The ID of the source."""
 45
 46        self._destination_id = destination
 47        """The ID of the destination."""
 48
 49        self._connection_info: ConnectionResponse | None = None
 50        """The connection info object. (Cached.)"""
 51
 52        self._cloud_source_object: CloudSource | None = None
 53        """The source object. (Cached.)"""
 54
 55        self._cloud_destination_object: CloudDestination | None = None
 56        """The destination object. (Cached.)"""
 57
 58    def _fetch_connection_info(self) -> ConnectionResponse:
 59        """Populate the connection with data from the API."""
 60        return api_util.get_connection(
 61            workspace_id=self.workspace.workspace_id,
 62            connection_id=self.connection_id,
 63            api_root=self.workspace.api_root,
 64            client_id=self.workspace.client_id,
 65            client_secret=self.workspace.client_secret,
 66        )
 67
 68    # Properties
 69
 70    @property
 71    def source_id(self) -> str:
 72        """The ID of the source."""
 73        if not self._source_id:
 74            if not self._connection_info:
 75                self._connection_info = self._fetch_connection_info()
 76
 77            self._source_id = self._connection_info.source_id
 78
 79        return cast("str", self._source_id)
 80
 81    @property
 82    def source(self) -> CloudSource:
 83        """Get the source object."""
 84        if self._cloud_source_object:
 85            return self._cloud_source_object
 86
 87        self._cloud_source_object = CloudSource(
 88            workspace=self.workspace,
 89            connector_id=self.source_id,
 90        )
 91        return self._cloud_source_object
 92
 93    @property
 94    def destination_id(self) -> str:
 95        """The ID of the destination."""
 96        if not self._destination_id:
 97            if not self._connection_info:
 98                self._connection_info = self._fetch_connection_info()
 99
100            self._destination_id = self._connection_info.source_id
101
102        return cast("str", self._destination_id)
103
104    @property
105    def destination(self) -> CloudDestination:
106        """Get the destination object."""
107        if self._cloud_destination_object:
108            return self._cloud_destination_object
109
110        self._cloud_destination_object = CloudDestination(
111            workspace=self.workspace,
112            connector_id=self.destination_id,
113        )
114        return self._cloud_destination_object
115
116    @property
117    def stream_names(self) -> list[str]:
118        """The stream names."""
119        if not self._connection_info:
120            self._connection_info = self._fetch_connection_info()
121
122        return [stream.name for stream in self._connection_info.configurations.streams or []]
123
124    @property
125    def table_prefix(self) -> str:
126        """The table prefix."""
127        if not self._connection_info:
128            self._connection_info = self._fetch_connection_info()
129
130        return self._connection_info.prefix or ""
131
132    @property
133    def connection_url(self) -> str | None:
134        """The web URL to the connection."""
135        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
136
137    @property
138    def job_history_url(self) -> str | None:
139        """The URL to the job history for the connection."""
140        return f"{self.connection_url}/timeline"
141
142    # Run Sync
143
144    def run_sync(
145        self,
146        *,
147        wait: bool = True,
148        wait_timeout: int = 300,
149    ) -> SyncResult:
150        """Run a sync."""
151        connection_response = api_util.run_connection(
152            connection_id=self.connection_id,
153            api_root=self.workspace.api_root,
154            workspace_id=self.workspace.workspace_id,
155            client_id=self.workspace.client_id,
156            client_secret=self.workspace.client_secret,
157        )
158        sync_result = SyncResult(
159            workspace=self.workspace,
160            connection=self,
161            job_id=connection_response.job_id,
162        )
163
164        if wait:
165            sync_result.wait_for_completion(
166                wait_timeout=wait_timeout,
167                raise_failure=True,
168                raise_timeout=True,
169            )
170
171        return sync_result
172
173    def __repr__(self) -> str:
174        """String representation of the connection."""
175        return (
176            f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, "
177            f"destination_id={self.destination_id}, connection_url={self.connection_url})"
178        )
179
180    # Logs
181
182    def get_previous_sync_logs(
183        self,
184        *,
185        limit: int = 10,
186    ) -> list[SyncResult]:
187        """Get the previous sync logs for a connection."""
188        sync_logs: list[JobResponse] = api_util.get_job_logs(
189            connection_id=self.connection_id,
190            api_root=self.workspace.api_root,
191            workspace_id=self.workspace.workspace_id,
192            limit=limit,
193            client_id=self.workspace.client_id,
194            client_secret=self.workspace.client_secret,
195        )
196        return [
197            SyncResult(
198                workspace=self.workspace,
199                connection=self,
200                job_id=sync_log.job_id,
201                _latest_job_info=sync_log,
202            )
203            for sync_log in sync_logs
204        ]
205
206    def get_sync_result(
207        self,
208        job_id: int | None = None,
209    ) -> SyncResult | None:
210        """Get the sync result for the connection.
211
212        If `job_id` is not provided, the most recent sync job will be used.
213
214        Returns `None` if job_id is omitted and no previous jobs are found.
215        """
216        if job_id is None:
217            # Get the most recent sync job
218            results = self.get_previous_sync_logs(
219                limit=1,
220            )
221            if results:
222                return results[0]
223
224            return None
225
226        # Get the sync job by ID (lazy loaded)
227        return SyncResult(
228            workspace=self.workspace,
229            connection=self,
230            job_id=job_id,
231        )
232
233    # Deletions
234
235    def permanently_delete(
236        self,
237        *,
238        cascade_delete_source: bool = False,
239        cascade_delete_destination: bool = False,
240    ) -> None:
241        """Delete the connection.
242
243        Args:
244            cascade_delete_source: Whether to also delete the source.
245            cascade_delete_destination: Whether to also delete the destination.
246        """
247        self.workspace.permanently_delete_connection(self)
248
249        if cascade_delete_source:
250            self.workspace.permanently_delete_source(self.source_id)
251
252        if cascade_delete_destination:
253            self.workspace.permanently_delete_destination(self.destination_id)

A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

You can use a connection object to run sync jobs, retrieve logs, and manage the connection.

CloudConnection(workspace: airbyte.cloud.CloudWorkspace, connection_id: str, source: str | None = None, destination: str | None = None)
def __init__(
    self,
    workspace: CloudWorkspace,
    connection_id: str,
    source: str | None = None,
    destination: str | None = None,
) -> None:
    """Record the connection's identifiers and start with empty caches.

    Prefer `CloudWorkspace.get_connection()` over constructing a `CloudConnection` by hand.
    """
    self.workspace = workspace
    """The workspace that owns this connection."""

    self.connection_id = connection_id
    """The connection's ID."""

    self._source_id = source
    """Source ID, when supplied up front."""

    self._destination_id = destination
    """Destination ID, when supplied up front."""

    # Lazily populated caches; all start out empty.
    self._connection_info: ConnectionResponse | None = None
    """Cached connection info."""

    self._cloud_source_object: CloudSource | None = None
    """Cached source object."""

    self._cloud_destination_object: CloudDestination | None = None
    """Cached destination object."""

It is not recommended to create a CloudConnection object directly.

Instead, use CloudWorkspace.get_connection() to create a connection object.

connection_id

The ID of the connection.

workspace

The workspace that the connection belongs to.

source_id: str
@property
def source_id(self) -> str:
    """Return the source ID, falling back to the (cached) connection info when it is not set."""
    if self._source_id:
        return cast("str", self._source_id)

    info = self._connection_info
    if not info:
        info = self._fetch_connection_info()
        self._connection_info = info

    self._source_id = info.source_id
    return cast("str", self._source_id)

The ID of the source.

source: airbyte.cloud.connectors.CloudSource
@property
def source(self) -> CloudSource:
    """Return this connection's `CloudSource`, building it once and reusing it afterwards."""
    cached = self._cloud_source_object
    if not cached:
        cached = CloudSource(workspace=self.workspace, connector_id=self.source_id)
        self._cloud_source_object = cached
    return cached

Get the source object.

destination_id: str
 93    @property
 94    def destination_id(self) -> str:
 95        """The ID of the destination."""
 96        if not self._destination_id:
 97            if not self._connection_info:
 98                self._connection_info = self._fetch_connection_info()
 99
100            self._destination_id = self._connection_info.source_id
101
102        return cast("str", self._destination_id)

The ID of the destination.

destination: airbyte.cloud.connectors.CloudDestination
@property
def destination(self) -> CloudDestination:
    """Return this connection's `CloudDestination`, constructing it on first access only."""
    existing = self._cloud_destination_object
    if existing:
        return existing

    created = CloudDestination(workspace=self.workspace, connector_id=self.destination_id)
    self._cloud_destination_object = created
    return created

Get the destination object.

stream_names: list[str]
@property
def stream_names(self) -> list[str]:
    """Names of the streams in the connection's configuration (empty list if there are none)."""
    info = self._connection_info
    if not info:
        info = self._connection_info = self._fetch_connection_info()

    configured = info.configurations.streams or []
    return [entry.name for entry in configured]

The stream names.

table_prefix: str
@property
def table_prefix(self) -> str:
    """The table prefix configured on the connection, or an empty string when unset."""
    if self._connection_info:
        info = self._connection_info
    else:
        info = self._connection_info = self._fetch_connection_info()

    prefix = info.prefix
    return prefix if prefix else ""

The table prefix.

connection_url: str | None
@property
def connection_url(self) -> str | None:
    """Link to this connection, built from the workspace URL and the connection ID."""
    base_url = self.workspace.workspace_url
    return f"{base_url}/connections/{self.connection_id}"

The web URL to the connection.

job_history_url: str | None
@property
def job_history_url(self) -> str | None:
    """Link to the connection's job timeline (the connection URL plus `/timeline`)."""
    return "{}/timeline".format(self.connection_url)

The URL to the job history for the connection.

def run_sync(self, *, wait: bool = True, wait_timeout: int = 300) -> airbyte.cloud.SyncResult:
def run_sync(
    self,
    *,
    wait: bool = True,
    wait_timeout: int = 300,
) -> SyncResult:
    """Trigger a sync job for this connection.

    Args:
        wait: When True, block on `wait_for_completion` with failure and timeout
            raising enabled.
        wait_timeout: Timeout handed to `wait_for_completion` when waiting.

    Returns:
        A `SyncResult` for the job that was started.
    """
    ws = self.workspace
    job = api_util.run_connection(
        connection_id=self.connection_id,
        api_root=ws.api_root,
        workspace_id=ws.workspace_id,
        client_id=ws.client_id,
        client_secret=ws.client_secret,
    )
    result = SyncResult(workspace=ws, connection=self, job_id=job.job_id)

    if not wait:
        return result

    result.wait_for_completion(
        wait_timeout=wait_timeout,
        raise_failure=True,
        raise_timeout=True,
    )
    return result

Run a sync.

def get_previous_sync_logs(self, *, limit: int = 10) -> list[airbyte.cloud.SyncResult]:
def get_previous_sync_logs(
    self,
    *,
    limit: int = 10,
) -> list[SyncResult]:
    """Return up to `limit` recent jobs for this connection as `SyncResult` objects.

    Each result is seeded with the job info returned by the API.
    """
    ws = self.workspace
    jobs: list[JobResponse] = api_util.get_job_logs(
        connection_id=self.connection_id,
        api_root=ws.api_root,
        workspace_id=ws.workspace_id,
        limit=limit,
        client_id=ws.client_id,
        client_secret=ws.client_secret,
    )

    results: list[SyncResult] = []
    for job in jobs:
        results.append(
            SyncResult(
                workspace=ws,
                connection=self,
                job_id=job.job_id,
                _latest_job_info=job,
            )
        )
    return results

Get the previous sync logs for a connection.

def get_sync_result(self, job_id: int | None = None) -> airbyte.cloud.SyncResult | None:
def get_sync_result(
    self,
    job_id: int | None = None,
) -> SyncResult | None:
    """Return the sync result for a specific job, or for the latest job.

    When `job_id` is given, a lazily loaded `SyncResult` for that job is returned.
    Otherwise the most recent job is looked up, and `None` is returned if the
    connection has no jobs.
    """
    if job_id is not None:
        # A specific job was requested; its details are loaded lazily.
        return SyncResult(workspace=self.workspace, connection=self, job_id=job_id)

    latest = self.get_previous_sync_logs(limit=1)
    return latest[0] if latest else None

Get the sync result for the connection.

If job_id is not provided, the most recent sync job will be used.

Returns None if job_id is omitted and no previous jobs are found.

def permanently_delete(self, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False) -> None:
def permanently_delete(
    self,
    *,
    cascade_delete_source: bool = False,
    cascade_delete_destination: bool = False,
) -> None:
    """Delete the connection.

    Args:
        cascade_delete_source: Whether to also delete the source.
        cascade_delete_destination: Whether to also delete the destination.
    """
    # Resolve the IDs needed for cascading *before* deleting the connection.
    # They may be looked up lazily from the connection info, which can no
    # longer be fetched once the connection itself has been deleted.
    source_id = self.source_id if cascade_delete_source else ""
    destination_id = self.destination_id if cascade_delete_destination else ""

    self.workspace.permanently_delete_connection(self)

    if cascade_delete_source:
        self.workspace.permanently_delete_source(source_id)

    if cascade_delete_destination:
        self.workspace.permanently_delete_destination(destination_id)

Delete the connection.

Arguments:
  • cascade_delete_source: Whether to also delete the source.
  • cascade_delete_destination: Whether to also delete the destination.