airbyte.cloud.connections

Cloud Connections.

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""Cloud Connections."""
  3
  4from __future__ import annotations
  5
  6from typing import TYPE_CHECKING, cast
  7
  8from airbyte._util import api_util
  9from airbyte.cloud.sync_results import SyncResult
 10
 11
 12if TYPE_CHECKING:
 13    from airbyte_api.models import ConnectionResponse, JobResponse
 14
 15    from airbyte.cloud.workspaces import CloudWorkspace
 16
 17
 18class CloudConnection:
 19    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
 20
 21    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
 22    """
 23
 24    def __init__(
 25        self,
 26        workspace: CloudWorkspace,
 27        connection_id: str,
 28        source: str | None = None,
 29        destination: str | None = None,
 30    ) -> None:
 31        """It is not recommended to create a `CloudConnection` object directly.
 32
 33        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
 34        """
 35        self.connection_id = connection_id
 36        """The ID of the connection."""
 37
 38        self.workspace = workspace
 39        """The workspace that the connection belongs to."""
 40
 41        self._source_id = source
 42        """The ID of the source."""
 43
 44        self._destination_id = destination
 45        """The ID of the destination."""
 46
 47        self._connection_info: ConnectionResponse | None = None
 48
 49    def _fetch_connection_info(self) -> ConnectionResponse:
 50        """Populate the connection with data from the API."""
 51        return api_util.get_connection(
 52            workspace_id=self.workspace.workspace_id,
 53            connection_id=self.connection_id,
 54            api_root=self.workspace.api_root,
 55            api_key=self.workspace.api_key,
 56        )
 57
 58    # Properties
 59
 60    @property
 61    def source_id(self) -> str:
 62        """The ID of the source."""
 63        if not self._source_id:
 64            if not self._connection_info:
 65                self._connection_info = self._fetch_connection_info()
 66
 67            self._source_id = self._connection_info.source_id
 68
 69        return cast(str, self._source_id)
 70
 71    @property
 72    def destination_id(self) -> str:
 73        """The ID of the destination."""
 74        if not self._destination_id:
 75            if not self._connection_info:
 76                self._connection_info = self._fetch_connection_info()
 77
 78            self._destination_id = self._connection_info.source_id
 79
 80        return cast(str, self._destination_id)
 81
 82    @property
 83    def stream_names(self) -> list[str]:
 84        """The stream names."""
 85        if not self._connection_info:
 86            self._connection_info = self._fetch_connection_info()
 87
 88        return [stream.name for stream in self._connection_info.configurations.streams]
 89
 90    @property
 91    def table_prefix(self) -> str:
 92        """The table prefix."""
 93        if not self._connection_info:
 94            self._connection_info = self._fetch_connection_info()
 95
 96        return self._connection_info.prefix
 97
 98    @property
 99    def connection_url(self) -> str | None:
100        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
101
102    @property
103    def job_history_url(self) -> str | None:
104        return f"{self.connection_url}/job-history"
105
106    # Run Sync
107
108    def run_sync(
109        self,
110        *,
111        wait: bool = True,
112        wait_timeout: int = 300,
113    ) -> SyncResult:
114        """Run a sync."""
115        connection_response = api_util.run_connection(
116            connection_id=self.connection_id,
117            api_root=self.workspace.api_root,
118            api_key=self.workspace.api_key,
119            workspace_id=self.workspace.workspace_id,
120        )
121        sync_result = SyncResult(
122            workspace=self.workspace,
123            connection=self,
124            job_id=connection_response.job_id,
125        )
126
127        if wait:
128            sync_result.wait_for_completion(
129                wait_timeout=wait_timeout,
130                raise_failure=True,
131                raise_timeout=True,
132            )
133
134        return sync_result
135
136    # Logs
137
138    def get_previous_sync_logs(
139        self,
140        *,
141        limit: int = 10,
142    ) -> list[SyncResult]:
143        """Get the previous sync logs for a connection."""
144        sync_logs: list[JobResponse] = api_util.get_job_logs(
145            connection_id=self.connection_id,
146            api_root=self.workspace.api_root,
147            api_key=self.workspace.api_key,
148            workspace_id=self.workspace.workspace_id,
149            limit=limit,
150        )
151        return [
152            SyncResult(
153                workspace=self.workspace,
154                connection=self,
155                job_id=sync_log.job_id,
156                _latest_job_info=sync_log,
157            )
158            for sync_log in sync_logs
159        ]
160
161    def get_sync_result(
162        self,
163        job_id: str | None = None,
164    ) -> SyncResult | None:
165        """Get the sync result for the connection.
166
167        If `job_id` is not provided, the most recent sync job will be used.
168
169        Returns `None` if job_id is omitted and no previous jobs are found.
170        """
171        if job_id is None:
172            # Get the most recent sync job
173            results = self.get_previous_sync_logs(
174                limit=1,
175            )
176            if results:
177                return results[0]
178
179            return None
180
181        # Get the sync job by ID (lazy loaded)
182        return SyncResult(
183            workspace=self.workspace,
184            connection=self,
185            job_id=job_id,
186        )
187
188    # Deletions
189
190    def _permanently_delete(
191        self,
192        *,
193        delete_source: bool = False,
194        delete_destination: bool = False,
195    ) -> None:
196        """Delete the connection.
197
198        Args:
199            delete_source: Whether to also delete the source.
200            delete_destination: Whether to also delete the destination.
201        """
202        self.workspace._permanently_delete_connection(  # noqa: SLF001  # Non-public API (for now)
203            connection=self
204        )
205
206        if delete_source:
207            self.workspace._permanently_delete_source(  # noqa: SLF001  # Non-public API (for now)
208                source=self.source_id
209            )
210
211        if delete_destination:
212            self.workspace._permanently_delete_destination(  # noqa: SLF001  # Non-public API
213                destination=self.destination_id,
214            )
class CloudConnection:
 19class CloudConnection:
 20    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
 21
 22    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
 23    """
 24
 25    def __init__(
 26        self,
 27        workspace: CloudWorkspace,
 28        connection_id: str,
 29        source: str | None = None,
 30        destination: str | None = None,
 31    ) -> None:
 32        """It is not recommended to create a `CloudConnection` object directly.
 33
 34        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
 35        """
 36        self.connection_id = connection_id
 37        """The ID of the connection."""
 38
 39        self.workspace = workspace
 40        """The workspace that the connection belongs to."""
 41
 42        self._source_id = source
 43        """The ID of the source."""
 44
 45        self._destination_id = destination
 46        """The ID of the destination."""
 47
 48        self._connection_info: ConnectionResponse | None = None
 49
 50    def _fetch_connection_info(self) -> ConnectionResponse:
 51        """Populate the connection with data from the API."""
 52        return api_util.get_connection(
 53            workspace_id=self.workspace.workspace_id,
 54            connection_id=self.connection_id,
 55            api_root=self.workspace.api_root,
 56            api_key=self.workspace.api_key,
 57        )
 58
 59    # Properties
 60
 61    @property
 62    def source_id(self) -> str:
 63        """The ID of the source."""
 64        if not self._source_id:
 65            if not self._connection_info:
 66                self._connection_info = self._fetch_connection_info()
 67
 68            self._source_id = self._connection_info.source_id
 69
 70        return cast(str, self._source_id)
 71
 72    @property
 73    def destination_id(self) -> str:
 74        """The ID of the destination."""
 75        if not self._destination_id:
 76            if not self._connection_info:
 77                self._connection_info = self._fetch_connection_info()
 78
 79            self._destination_id = self._connection_info.source_id
 80
 81        return cast(str, self._destination_id)
 82
 83    @property
 84    def stream_names(self) -> list[str]:
 85        """The stream names."""
 86        if not self._connection_info:
 87            self._connection_info = self._fetch_connection_info()
 88
 89        return [stream.name for stream in self._connection_info.configurations.streams]
 90
 91    @property
 92    def table_prefix(self) -> str:
 93        """The table prefix."""
 94        if not self._connection_info:
 95            self._connection_info = self._fetch_connection_info()
 96
 97        return self._connection_info.prefix
 98
 99    @property
100    def connection_url(self) -> str | None:
101        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
102
103    @property
104    def job_history_url(self) -> str | None:
105        return f"{self.connection_url}/job-history"
106
107    # Run Sync
108
109    def run_sync(
110        self,
111        *,
112        wait: bool = True,
113        wait_timeout: int = 300,
114    ) -> SyncResult:
115        """Run a sync."""
116        connection_response = api_util.run_connection(
117            connection_id=self.connection_id,
118            api_root=self.workspace.api_root,
119            api_key=self.workspace.api_key,
120            workspace_id=self.workspace.workspace_id,
121        )
122        sync_result = SyncResult(
123            workspace=self.workspace,
124            connection=self,
125            job_id=connection_response.job_id,
126        )
127
128        if wait:
129            sync_result.wait_for_completion(
130                wait_timeout=wait_timeout,
131                raise_failure=True,
132                raise_timeout=True,
133            )
134
135        return sync_result
136
137    # Logs
138
139    def get_previous_sync_logs(
140        self,
141        *,
142        limit: int = 10,
143    ) -> list[SyncResult]:
144        """Get the previous sync logs for a connection."""
145        sync_logs: list[JobResponse] = api_util.get_job_logs(
146            connection_id=self.connection_id,
147            api_root=self.workspace.api_root,
148            api_key=self.workspace.api_key,
149            workspace_id=self.workspace.workspace_id,
150            limit=limit,
151        )
152        return [
153            SyncResult(
154                workspace=self.workspace,
155                connection=self,
156                job_id=sync_log.job_id,
157                _latest_job_info=sync_log,
158            )
159            for sync_log in sync_logs
160        ]
161
162    def get_sync_result(
163        self,
164        job_id: str | None = None,
165    ) -> SyncResult | None:
166        """Get the sync result for the connection.
167
168        If `job_id` is not provided, the most recent sync job will be used.
169
170        Returns `None` if job_id is omitted and no previous jobs are found.
171        """
172        if job_id is None:
173            # Get the most recent sync job
174            results = self.get_previous_sync_logs(
175                limit=1,
176            )
177            if results:
178                return results[0]
179
180            return None
181
182        # Get the sync job by ID (lazy loaded)
183        return SyncResult(
184            workspace=self.workspace,
185            connection=self,
186            job_id=job_id,
187        )
188
189    # Deletions
190
191    def _permanently_delete(
192        self,
193        *,
194        delete_source: bool = False,
195        delete_destination: bool = False,
196    ) -> None:
197        """Delete the connection.
198
199        Args:
200            delete_source: Whether to also delete the source.
201            delete_destination: Whether to also delete the destination.
202        """
203        self.workspace._permanently_delete_connection(  # noqa: SLF001  # Non-public API (for now)
204            connection=self
205        )
206
207        if delete_source:
208            self.workspace._permanently_delete_source(  # noqa: SLF001  # Non-public API (for now)
209                source=self.source_id
210            )
211
212        if delete_destination:
213            self.workspace._permanently_delete_destination(  # noqa: SLF001  # Non-public API
214                destination=self.destination_id,
215            )

A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

You can use a connection object to run sync jobs, retrieve logs, and manage the connection.

CloudConnection( workspace: airbyte.cloud.workspaces.CloudWorkspace, connection_id: str, source: str | None = None, destination: str | None = None)
25    def __init__(
26        self,
27        workspace: CloudWorkspace,
28        connection_id: str,
29        source: str | None = None,
30        destination: str | None = None,
31    ) -> None:
32        """It is not recommended to create a `CloudConnection` object directly.
33
34        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
35        """
36        self.connection_id = connection_id
37        """The ID of the connection."""
38
39        self.workspace = workspace
40        """The workspace that the connection belongs to."""
41
42        self._source_id = source
43        """The ID of the source."""
44
45        self._destination_id = destination
46        """The ID of the destination."""
47
48        self._connection_info: ConnectionResponse | None = None

It is not recommended to create a CloudConnection object directly.

Instead, use CloudWorkspace.get_connection() to create a connection object.

connection_id

The ID of the connection.

workspace

The workspace that the connection belongs to.

source_id: str
61    @property
62    def source_id(self) -> str:
63        """The ID of the source."""
64        if not self._source_id:
65            if not self._connection_info:
66                self._connection_info = self._fetch_connection_info()
67
68            self._source_id = self._connection_info.source_id
69
70        return cast(str, self._source_id)

The ID of the source.

destination_id: str
72    @property
73    def destination_id(self) -> str:
74        """The ID of the destination."""
75        if not self._destination_id:
76            if not self._connection_info:
77                self._connection_info = self._fetch_connection_info()
78
79            self._destination_id = self._connection_info.source_id
80
81        return cast(str, self._destination_id)

The ID of the destination.

stream_names: list[str]
83    @property
84    def stream_names(self) -> list[str]:
85        """The stream names."""
86        if not self._connection_info:
87            self._connection_info = self._fetch_connection_info()
88
89        return [stream.name for stream in self._connection_info.configurations.streams]

The stream names.

table_prefix: str
91    @property
92    def table_prefix(self) -> str:
93        """The table prefix."""
94        if not self._connection_info:
95            self._connection_info = self._fetch_connection_info()
96
97        return self._connection_info.prefix

The table prefix.

connection_url: str | None
 99    @property
100    def connection_url(self) -> str | None:
101        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
job_history_url: str | None
103    @property
104    def job_history_url(self) -> str | None:
105        return f"{self.connection_url}/job-history"
def run_sync( self, *, wait: bool = True, wait_timeout: int = 300) -> airbyte.cloud.sync_results.SyncResult:
109    def run_sync(
110        self,
111        *,
112        wait: bool = True,
113        wait_timeout: int = 300,
114    ) -> SyncResult:
115        """Run a sync."""
116        connection_response = api_util.run_connection(
117            connection_id=self.connection_id,
118            api_root=self.workspace.api_root,
119            api_key=self.workspace.api_key,
120            workspace_id=self.workspace.workspace_id,
121        )
122        sync_result = SyncResult(
123            workspace=self.workspace,
124            connection=self,
125            job_id=connection_response.job_id,
126        )
127
128        if wait:
129            sync_result.wait_for_completion(
130                wait_timeout=wait_timeout,
131                raise_failure=True,
132                raise_timeout=True,
133            )
134
135        return sync_result

Run a sync.

def get_previous_sync_logs(self, *, limit: int = 10) -> list[airbyte.cloud.sync_results.SyncResult]:
139    def get_previous_sync_logs(
140        self,
141        *,
142        limit: int = 10,
143    ) -> list[SyncResult]:
144        """Get the previous sync logs for a connection."""
145        sync_logs: list[JobResponse] = api_util.get_job_logs(
146            connection_id=self.connection_id,
147            api_root=self.workspace.api_root,
148            api_key=self.workspace.api_key,
149            workspace_id=self.workspace.workspace_id,
150            limit=limit,
151        )
152        return [
153            SyncResult(
154                workspace=self.workspace,
155                connection=self,
156                job_id=sync_log.job_id,
157                _latest_job_info=sync_log,
158            )
159            for sync_log in sync_logs
160        ]

Get the previous sync logs for a connection.

def get_sync_result( self, job_id: str | None = None) -> airbyte.cloud.sync_results.SyncResult | None:
162    def get_sync_result(
163        self,
164        job_id: str | None = None,
165    ) -> SyncResult | None:
166        """Get the sync result for the connection.
167
168        If `job_id` is not provided, the most recent sync job will be used.
169
170        Returns `None` if job_id is omitted and no previous jobs are found.
171        """
172        if job_id is None:
173            # Get the most recent sync job
174            results = self.get_previous_sync_logs(
175                limit=1,
176            )
177            if results:
178                return results[0]
179
180            return None
181
182        # Get the sync job by ID (lazy loaded)
183        return SyncResult(
184            workspace=self.workspace,
185            connection=self,
186            job_id=job_id,
187        )

Get the sync result for the connection.

If job_id is not provided, the most recent sync job will be used.

Returns None if job_id is omitted and no previous jobs are found.