airbyte.cloud.connections

Cloud Connections.

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""Cloud Connections."""
  3
  4from __future__ import annotations
  5
  6from typing import TYPE_CHECKING, cast
  7
  8from airbyte._util import api_util
  9from airbyte.cloud.sync_results import SyncResult
 10
 11
 12if TYPE_CHECKING:
 13    from airbyte_api.models import ConnectionResponse, JobResponse
 14
 15    from airbyte.cloud.workspaces import CloudWorkspace
 16
 17
 18class CloudConnection:
 19    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
 20
 21    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
 22    """
 23
 24    def __init__(
 25        self,
 26        workspace: CloudWorkspace,
 27        connection_id: str,
 28        source: str | None = None,
 29        destination: str | None = None,
 30    ) -> None:
 31        """It is not recommended to create a `CloudConnection` object directly.
 32
 33        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
 34        """
 35        self.connection_id = connection_id
 36        """The ID of the connection."""
 37
 38        self.workspace = workspace
 39        """The workspace that the connection belongs to."""
 40
 41        self._source_id = source
 42        """The ID of the source."""
 43
 44        self._destination_id = destination
 45        """The ID of the destination."""
 46
 47        self._connection_info: ConnectionResponse | None = None
 48
 49    def _fetch_connection_info(self) -> ConnectionResponse:
 50        """Populate the connection with data from the API."""
 51        return api_util.get_connection(
 52            workspace_id=self.workspace.workspace_id,
 53            connection_id=self.connection_id,
 54            api_root=self.workspace.api_root,
 55            api_key=self.workspace.api_key,
 56        )
 57
 58    # Properties
 59
 60    @property
 61    def source_id(self) -> str:
 62        """The ID of the source."""
 63        if not self._source_id:
 64            if not self._connection_info:
 65                self._connection_info = self._fetch_connection_info()
 66
 67            self._source_id = self._connection_info.source_id
 68
 69        return cast(str, self._source_id)
 70
 71    @property
 72    def destination_id(self) -> str:
 73        """The ID of the destination."""
 74        if not self._destination_id:
 75            if not self._connection_info:
 76                self._connection_info = self._fetch_connection_info()
 77
 78            self._destination_id = self._connection_info.source_id
 79
 80        return cast(str, self._destination_id)
 81
 82    @property
 83    def stream_names(self) -> list[str]:
 84        """The stream names."""
 85        if not self._connection_info:
 86            self._connection_info = self._fetch_connection_info()
 87
 88        return [stream.name for stream in self._connection_info.configurations.streams]
 89
 90    @property
 91    def table_prefix(self) -> str:
 92        """The table prefix."""
 93        if not self._connection_info:
 94            self._connection_info = self._fetch_connection_info()
 95
 96        return self._connection_info.prefix
 97
 98    @property
 99    def connection_url(self) -> str | None:
100        """The URL to the connection."""
101        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
102
103    @property
104    def job_history_url(self) -> str | None:
105        """The URL to the job history for the connection."""
106        return f"{self.connection_url}/job-history"
107
108    # Run Sync
109
110    def run_sync(
111        self,
112        *,
113        wait: bool = True,
114        wait_timeout: int = 300,
115    ) -> SyncResult:
116        """Run a sync."""
117        connection_response = api_util.run_connection(
118            connection_id=self.connection_id,
119            api_root=self.workspace.api_root,
120            api_key=self.workspace.api_key,
121            workspace_id=self.workspace.workspace_id,
122        )
123        sync_result = SyncResult(
124            workspace=self.workspace,
125            connection=self,
126            job_id=connection_response.job_id,
127        )
128
129        if wait:
130            sync_result.wait_for_completion(
131                wait_timeout=wait_timeout,
132                raise_failure=True,
133                raise_timeout=True,
134            )
135
136        return sync_result
137
138    # Logs
139
140    def get_previous_sync_logs(
141        self,
142        *,
143        limit: int = 10,
144    ) -> list[SyncResult]:
145        """Get the previous sync logs for a connection."""
146        sync_logs: list[JobResponse] = api_util.get_job_logs(
147            connection_id=self.connection_id,
148            api_root=self.workspace.api_root,
149            api_key=self.workspace.api_key,
150            workspace_id=self.workspace.workspace_id,
151            limit=limit,
152        )
153        return [
154            SyncResult(
155                workspace=self.workspace,
156                connection=self,
157                job_id=sync_log.job_id,
158                _latest_job_info=sync_log,
159            )
160            for sync_log in sync_logs
161        ]
162
163    def get_sync_result(
164        self,
165        job_id: str | None = None,
166    ) -> SyncResult | None:
167        """Get the sync result for the connection.
168
169        If `job_id` is not provided, the most recent sync job will be used.
170
171        Returns `None` if job_id is omitted and no previous jobs are found.
172        """
173        if job_id is None:
174            # Get the most recent sync job
175            results = self.get_previous_sync_logs(
176                limit=1,
177            )
178            if results:
179                return results[0]
180
181            return None
182
183        # Get the sync job by ID (lazy loaded)
184        return SyncResult(
185            workspace=self.workspace,
186            connection=self,
187            job_id=job_id,
188        )
189
190    # Deletions
191
192    def _permanently_delete(
193        self,
194        *,
195        delete_source: bool = False,
196        delete_destination: bool = False,
197    ) -> None:
198        """Delete the connection.
199
200        Args:
201            delete_source: Whether to also delete the source.
202            delete_destination: Whether to also delete the destination.
203        """
204        self.workspace._permanently_delete_connection(  # noqa: SLF001  # Non-public API (for now)
205            connection=self
206        )
207
208        if delete_source:
209            self.workspace._permanently_delete_source(  # noqa: SLF001  # Non-public API (for now)
210                source=self.source_id
211            )
212
213        if delete_destination:
214            self.workspace._permanently_delete_destination(  # noqa: SLF001  # Non-public API
215                destination=self.destination_id,
216            )
class CloudConnection:
 19class CloudConnection:
 20    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
 21
 22    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
 23    """
 24
 25    def __init__(
 26        self,
 27        workspace: CloudWorkspace,
 28        connection_id: str,
 29        source: str | None = None,
 30        destination: str | None = None,
 31    ) -> None:
 32        """It is not recommended to create a `CloudConnection` object directly.
 33
 34        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
 35        """
 36        self.connection_id = connection_id
 37        """The ID of the connection."""
 38
 39        self.workspace = workspace
 40        """The workspace that the connection belongs to."""
 41
 42        self._source_id = source
 43        """The ID of the source."""
 44
 45        self._destination_id = destination
 46        """The ID of the destination."""
 47
 48        self._connection_info: ConnectionResponse | None = None
 49
 50    def _fetch_connection_info(self) -> ConnectionResponse:
 51        """Populate the connection with data from the API."""
 52        return api_util.get_connection(
 53            workspace_id=self.workspace.workspace_id,
 54            connection_id=self.connection_id,
 55            api_root=self.workspace.api_root,
 56            api_key=self.workspace.api_key,
 57        )
 58
 59    # Properties
 60
 61    @property
 62    def source_id(self) -> str:
 63        """The ID of the source."""
 64        if not self._source_id:
 65            if not self._connection_info:
 66                self._connection_info = self._fetch_connection_info()
 67
 68            self._source_id = self._connection_info.source_id
 69
 70        return cast(str, self._source_id)
 71
 72    @property
 73    def destination_id(self) -> str:
 74        """The ID of the destination."""
 75        if not self._destination_id:
 76            if not self._connection_info:
 77                self._connection_info = self._fetch_connection_info()
 78
 79            self._destination_id = self._connection_info.source_id
 80
 81        return cast(str, self._destination_id)
 82
 83    @property
 84    def stream_names(self) -> list[str]:
 85        """The stream names."""
 86        if not self._connection_info:
 87            self._connection_info = self._fetch_connection_info()
 88
 89        return [stream.name for stream in self._connection_info.configurations.streams]
 90
 91    @property
 92    def table_prefix(self) -> str:
 93        """The table prefix."""
 94        if not self._connection_info:
 95            self._connection_info = self._fetch_connection_info()
 96
 97        return self._connection_info.prefix
 98
 99    @property
100    def connection_url(self) -> str | None:
101        """The URL to the connection."""
102        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
103
104    @property
105    def job_history_url(self) -> str | None:
106        """The URL to the job history for the connection."""
107        return f"{self.connection_url}/job-history"
108
109    # Run Sync
110
111    def run_sync(
112        self,
113        *,
114        wait: bool = True,
115        wait_timeout: int = 300,
116    ) -> SyncResult:
117        """Run a sync."""
118        connection_response = api_util.run_connection(
119            connection_id=self.connection_id,
120            api_root=self.workspace.api_root,
121            api_key=self.workspace.api_key,
122            workspace_id=self.workspace.workspace_id,
123        )
124        sync_result = SyncResult(
125            workspace=self.workspace,
126            connection=self,
127            job_id=connection_response.job_id,
128        )
129
130        if wait:
131            sync_result.wait_for_completion(
132                wait_timeout=wait_timeout,
133                raise_failure=True,
134                raise_timeout=True,
135            )
136
137        return sync_result
138
139    # Logs
140
141    def get_previous_sync_logs(
142        self,
143        *,
144        limit: int = 10,
145    ) -> list[SyncResult]:
146        """Get the previous sync logs for a connection."""
147        sync_logs: list[JobResponse] = api_util.get_job_logs(
148            connection_id=self.connection_id,
149            api_root=self.workspace.api_root,
150            api_key=self.workspace.api_key,
151            workspace_id=self.workspace.workspace_id,
152            limit=limit,
153        )
154        return [
155            SyncResult(
156                workspace=self.workspace,
157                connection=self,
158                job_id=sync_log.job_id,
159                _latest_job_info=sync_log,
160            )
161            for sync_log in sync_logs
162        ]
163
164    def get_sync_result(
165        self,
166        job_id: str | None = None,
167    ) -> SyncResult | None:
168        """Get the sync result for the connection.
169
170        If `job_id` is not provided, the most recent sync job will be used.
171
172        Returns `None` if job_id is omitted and no previous jobs are found.
173        """
174        if job_id is None:
175            # Get the most recent sync job
176            results = self.get_previous_sync_logs(
177                limit=1,
178            )
179            if results:
180                return results[0]
181
182            return None
183
184        # Get the sync job by ID (lazy loaded)
185        return SyncResult(
186            workspace=self.workspace,
187            connection=self,
188            job_id=job_id,
189        )
190
191    # Deletions
192
193    def _permanently_delete(
194        self,
195        *,
196        delete_source: bool = False,
197        delete_destination: bool = False,
198    ) -> None:
199        """Delete the connection.
200
201        Args:
202            delete_source: Whether to also delete the source.
203            delete_destination: Whether to also delete the destination.
204        """
205        self.workspace._permanently_delete_connection(  # noqa: SLF001  # Non-public API (for now)
206            connection=self
207        )
208
209        if delete_source:
210            self.workspace._permanently_delete_source(  # noqa: SLF001  # Non-public API (for now)
211                source=self.source_id
212            )
213
214        if delete_destination:
215            self.workspace._permanently_delete_destination(  # noqa: SLF001  # Non-public API
216                destination=self.destination_id,
217            )

A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

You can use a connection object to run sync jobs, retrieve logs, and manage the connection.

CloudConnection(workspace: airbyte.cloud.CloudWorkspace, connection_id: str, source: str | None = None, destination: str | None = None)
def __init__(
    self,
    workspace: CloudWorkspace,
    connection_id: str,
    source: str | None = None,
    destination: str | None = None,
) -> None:
    """It is not recommended to create a `CloudConnection` object directly.

    Instead, use `CloudWorkspace.get_connection()` to create a connection object.

    Args:
        workspace: The workspace that the connection belongs to.
        connection_id: The ID of the connection.
        source: The source ID, if already known; otherwise it is read from the
            connection info when first needed.
        destination: The destination ID, if already known; otherwise it is read
            from the connection info when first needed.
    """
    self.workspace = workspace
    """The workspace that the connection belongs to."""

    self.connection_id = connection_id
    """The ID of the connection."""

    # Optional pre-supplied endpoint IDs (None means "look up lazily").
    self._destination_id = destination
    """The ID of the destination."""

    self._source_id = source
    """The ID of the source."""

    # Connection details are fetched on demand and cached here.
    self._connection_info: ConnectionResponse | None = None

It is not recommended to create a CloudConnection object directly.

Instead, use CloudWorkspace.get_connection() to create a connection object.

connection_id

The ID of the connection.

workspace

The workspace that the connection belongs to.

source_id: str
@property
def source_id(self) -> str:
    """The ID of the source, resolved from the connection info when not preset."""
    if self._source_id:
        return cast(str, self._source_id)

    info = self._connection_info
    if not info:
        info = self._fetch_connection_info()
        self._connection_info = info

    self._source_id = info.source_id
    return cast(str, self._source_id)

The ID of the source.

destination_id: str
@property
def destination_id(self) -> str:
    """The ID of the destination.

    Uses the ID given to the constructor if any; otherwise reads it from the
    connection info (fetched from the API on first use) and caches it.
    """
    if not self._destination_id:
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        # Must read `destination_id` here; reading `source_id` would silently
        # report the source's ID as the destination's.
        self._destination_id = self._connection_info.destination_id

    return cast(str, self._destination_id)

The ID of the destination.

stream_names: list[str]
@property
def stream_names(self) -> list[str]:
    """Names of the streams listed in the connection's configuration."""
    info = self._connection_info
    if not info:
        info = self._fetch_connection_info()
        self._connection_info = info

    names: list[str] = []
    for configured_stream in info.configurations.streams:
        names.append(configured_stream.name)
    return names

The stream names.

table_prefix: str
@property
def table_prefix(self) -> str:
    """The table prefix (the `prefix` field of the cached connection info)."""
    info = self._connection_info or self._fetch_connection_info()
    self._connection_info = info
    return info.prefix

The table prefix.

connection_url: str | None
 99    @property
100    def connection_url(self) -> str | None:
101        """The URL to the connection."""
102        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

The URL to the connection.

job_history_url: str | None
@property
def job_history_url(self) -> str | None:
    """Link to the job-history page, derived from `connection_url`."""
    return "{}/job-history".format(self.connection_url)

The URL to the job history for the connection.

def run_sync(self, *, wait: bool = True, wait_timeout: int = 300) -> airbyte.cloud.SyncResult:
def run_sync(
    self,
    *,
    wait: bool = True,
    wait_timeout: int = 300,
) -> SyncResult:
    """Trigger a sync job for this connection.

    Args:
        wait: If True, call `wait_for_completion()` on the result with
            `raise_failure=True` and `raise_timeout=True` before returning.
        wait_timeout: Timeout forwarded to `wait_for_completion()`.

    Returns:
        A `SyncResult` for the job that was started.
    """
    ws = self.workspace
    started = api_util.run_connection(
        connection_id=self.connection_id,
        api_root=ws.api_root,
        api_key=ws.api_key,
        workspace_id=ws.workspace_id,
    )
    result = SyncResult(workspace=ws, connection=self, job_id=started.job_id)

    if not wait:
        return result

    result.wait_for_completion(
        wait_timeout=wait_timeout,
        raise_failure=True,
        raise_timeout=True,
    )
    return result

Run a sync.

def get_previous_sync_logs(self, *, limit: int = 10) -> list[airbyte.cloud.SyncResult]:
def get_previous_sync_logs(
    self,
    *,
    limit: int = 10,
) -> list[SyncResult]:
    """Return a `SyncResult` for each recent job of this connection.

    Args:
        limit: Passed through as `limit` to the job-log API lookup.

    Returns:
        One `SyncResult` per job, pre-populated with that job's info.
    """
    ws = self.workspace
    jobs: list[JobResponse] = api_util.get_job_logs(
        connection_id=self.connection_id,
        api_root=ws.api_root,
        api_key=ws.api_key,
        workspace_id=ws.workspace_id,
        limit=limit,
    )

    results: list[SyncResult] = []
    for job in jobs:
        results.append(
            SyncResult(
                workspace=ws,
                connection=self,
                job_id=job.job_id,
                _latest_job_info=job,
            )
        )
    return results

Get the previous sync logs for a connection.

def get_sync_result(self, job_id: str | None = None) -> airbyte.cloud.SyncResult | None:
def get_sync_result(
    self,
    job_id: str | None = None,
) -> SyncResult | None:
    """Return the sync result for a job on this connection.

    When `job_id` is given, a `SyncResult` for that job is returned (its details
    are lazy loaded). Otherwise the most recent sync job is used, and `None` is
    returned if there are no previous jobs.
    """
    if job_id is not None:
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

    latest = self.get_previous_sync_logs(limit=1)
    return latest[0] if latest else None

Get the sync result for the connection.

If job_id is not provided, the most recent sync job will be used.

Returns None if job_id is omitted and no previous jobs are found.