airbyte.cloud.client

PyAirbyte Cloud client.

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""PyAirbyte Cloud client."""
  3
  4from __future__ import annotations
  5
  6from dataclasses import dataclass
  7from typing import TYPE_CHECKING, overload
  8
  9from airbyte import exceptions as exc
 10from airbyte._util import api_util
 11from airbyte.cloud._credentials import _AirbyteCredentials
 12from airbyte.cloud.models import CloudWorkspaceInfo
 13from airbyte.cloud.organizations import CloudOrganization
 14from airbyte.cloud.workspaces import CloudWorkspace
 15from airbyte.exceptions import AirbyteMissingResourceError
 16
 17
 18if TYPE_CHECKING:
 19    from collections.abc import Callable
 20
 21    from airbyte.secrets.base import SecretString
 22
 23
 24@dataclass(init=False, kw_only=True)
 25class CloudClient:
 26    """Authenticated client for Airbyte Cloud and self-managed Airbyte APIs."""
 27
 28    _credentials: _AirbyteCredentials
 29
 30    def __init__(
 31        self,
 32        *,
 33        client_id: str | SecretString | None = None,
 34        client_secret: str | SecretString | None = None,
 35        bearer_token: str | SecretString | None = None,
 36        public_api_root: str | None = None,
 37        config_api_root: str | None = None,
 38        workspace_id: str | None = None,
 39        organization_id: str | None = None,
 40    ) -> None:
 41        """Initialize a `CloudClient` from explicit auth values."""
 42        self._credentials = _AirbyteCredentials.from_auth(
 43            client_id=client_id,
 44            client_secret=client_secret,
 45            bearer_token=bearer_token,
 46            public_api_root=public_api_root,
 47            config_api_root=config_api_root,
 48            workspace_id=workspace_id,
 49            organization_id=organization_id,
 50            env_vars=False,
 51        )
 52
 53    @property
 54    def client_id(self) -> SecretString | None:
 55        """OAuth client ID used for authentication."""
 56        return self._credentials.client_id
 57
 58    @property
 59    def client_secret(self) -> SecretString | None:
 60        """OAuth client secret used for authentication."""
 61        return self._credentials.client_secret
 62
 63    @property
 64    def bearer_token(self) -> SecretString | None:
 65        """Bearer token used for authentication."""
 66        return self._credentials.bearer_token
 67
 68    @property
 69    def public_api_root(self) -> str:
 70        """Airbyte Public API root."""
 71        return self._credentials.public_api_root
 72
 73    @property
 74    def config_api_root(self) -> str | None:
 75        """Airbyte Config API root."""
 76        return self._credentials.config_api_root
 77
 78    @property
 79    def organization_id(self) -> str | None:
 80        """Default organization ID for organization-scoped operations."""
 81        return self._credentials.organization_id
 82
 83    @classmethod
 84    def from_auth(
 85        cls,
 86        *,
 87        env_vars: bool = False,
 88        organization_id: str | None = None,
 89        client_id: str | SecretString | None = None,
 90        client_secret: str | SecretString | None = None,
 91        bearer_token: str | SecretString | None = None,
 92        public_api_root: str | None = None,
 93        config_api_root: str | None = None,
 94    ) -> CloudClient:
 95        """Create a client from explicit inputs and optionally environment variables.
 96
 97        When `env_vars` is True, environment variables are checked as a fallback
 98        after any explicitly provided values.
 99        """
100        credentials = _AirbyteCredentials.from_auth(
101            organization_id=organization_id,
102            client_id=client_id,
103            client_secret=client_secret,
104            bearer_token=bearer_token,
105            public_api_root=public_api_root,
106            config_api_root=config_api_root,
107            env_vars=env_vars,
108        )
109        return cls._from_credentials(credentials)
110
111    @classmethod
112    def _from_credentials(cls, credentials: _AirbyteCredentials) -> CloudClient:
113        """Create a client from resolved Cloud credentials."""
114        return cls(
115            client_id=credentials.client_id,
116            client_secret=credentials.client_secret,
117            bearer_token=credentials.bearer_token,
118            public_api_root=credentials.public_api_root,
119            config_api_root=credentials.config_api_root,
120            workspace_id=credentials.workspace_id,
121            organization_id=credentials.organization_id,
122        )
123
124    def get_workspace(self, workspace_id: str | None = None) -> CloudWorkspace:
125        """Create a `CloudWorkspace` using this client's credentials."""
126        resolved_workspace_id = workspace_id or self._credentials.workspace_id
127        if not resolved_workspace_id:
128            raise exc.PyAirbyteInputError(
129                message="Workspace ID is required.",
130                guidance="Provide a workspace ID.",
131            )
132
133        credentials = self._credentials.with_workspace_id(resolved_workspace_id)
134        return CloudWorkspace(
135            workspace_id=credentials.workspace_id,
136            client_id=credentials.client_id,
137            client_secret=credentials.client_secret,
138            bearer_token=credentials.bearer_token,
139            api_root=credentials.public_api_root,
140            config_api_root=credentials.config_api_root,
141        )
142
143    def create_workspace(
144        self,
145        *,
146        name: str,
147        organization_id: str | None = None,
148        region_id: str | None = None,
149    ) -> CloudWorkspaceInfo:
150        """Create an Airbyte workspace."""
151        resolved_organization_id = organization_id or self.organization_id
152        workspace = api_util.create_workspace(
153            name=name,
154            organization_id=resolved_organization_id,
155            region_id=region_id,
156            api_root=self.public_api_root,
157            client_id=self.client_id,
158            client_secret=self.client_secret,
159            bearer_token=self.bearer_token,
160        )
161        return CloudWorkspaceInfo.from_api_response(workspace)
162
163    def rename_workspace(
164        self,
165        workspace_id: str,
166        *,
167        name: str,
168    ) -> CloudWorkspaceInfo:
169        """Rename an Airbyte workspace."""
170        workspace = api_util.rename_workspace(
171            workspace_id=workspace_id,
172            name=name,
173            api_root=self.public_api_root,
174            client_id=self.client_id,
175            client_secret=self.client_secret,
176            bearer_token=self.bearer_token,
177        )
178        return CloudWorkspaceInfo.from_api_response(workspace)
179
180    def permanently_delete_workspace(
181        self,
182        workspace_id: str,
183        *,
184        workspace_name: str | None = None,
185        safe_mode: bool = True,
186    ) -> None:
187        """Permanently delete an Airbyte workspace if it has no connections.
188
189        When `safe_mode` is enabled, the workspace name must contain `delete-me`
190        or `deleteme`. This also checks for existing connections before deleting
191        and raises `AirbyteWorkspaceNotEmptyError` if the workspace is not empty.
192        """
193        api_util.permanently_delete_workspace(
194            workspace_id=workspace_id,
195            workspace_name=workspace_name,
196            api_root=self.public_api_root,
197            client_id=self.client_id,
198            client_secret=self.client_secret,
199            bearer_token=self.bearer_token,
200            safe_mode=safe_mode,
201        )
202
203    @overload
204    def list_workspaces(
205        self,
206        name: str | None = None,
207        *,
208        organization_id: None = None,
209        name_contains: str | None = None,
210        name_filter: Callable[[str], bool] | None = None,
211        limit: int | None = None,
212    ) -> list[CloudWorkspaceInfo]:
213        raise NotImplementedError
214
215    @overload
216    def list_workspaces(
217        self,
218        name: str | None = None,
219        *,
220        organization_id: str,
221        name_contains: str | None = None,
222        name_filter: Callable[[str], bool] | None = None,
223        limit: int | None = None,
224    ) -> list[CloudWorkspaceInfo]:
225        raise NotImplementedError
226
227    def list_workspaces(
228        self,
229        name: str | None = None,
230        *,
231        organization_id: str | None = None,
232        name_contains: str | None = None,
233        name_filter: Callable[[str], bool] | None = None,
234        limit: int | None = None,
235    ) -> list[CloudWorkspaceInfo]:
236        """List workspaces available to this client."""
237        if organization_id is not None or self.organization_id is not None:
238            resolved_organization_id = organization_id or self.organization_id
239            if not resolved_organization_id:
240                raise exc.PyAirbyteInputError(
241                    message="Organization ID is required.",
242                    guidance="Provide an organization ID.",
243                )
244            workspaces = api_util.list_workspaces_in_organization(
245                organization_id=resolved_organization_id,
246                api_root=self.public_api_root,
247                config_api_root=self.config_api_root,
248                client_id=self.client_id,
249                client_secret=self.client_secret,
250                bearer_token=self.bearer_token,
251                name_contains=name_contains or name,
252                limit=None if name_filter is not None else limit,
253            )
254            workspace_infos = [
255                CloudWorkspaceInfo.from_mapping(workspace) for workspace in workspaces
256            ]
257            if name_filter is not None:
258                workspace_infos = [
259                    workspace for workspace in workspace_infos if name_filter(workspace.name)
260                ]
261                if limit is not None:
262                    workspace_infos = workspace_infos[:limit]
263            return workspace_infos
264        if name_contains is not None:
265            if name_filter is not None:
266                raise exc.PyAirbyteInputError(
267                    message="You can provide name_contains or name_filter, but not both."
268                )
269            name_substring = name_contains
270
271            def name_filter(workspace_name: str) -> bool:
272                return name_substring in workspace_name
273
274        return [
275            CloudWorkspaceInfo.from_api_response(workspace)
276            for workspace in api_util.list_workspaces(
277                workspace_id="",
278                api_root=self.public_api_root,
279                name=name,
280                name_filter=name_filter,
281                client_id=self.client_id,
282                client_secret=self.client_secret,
283                bearer_token=self.bearer_token,
284                limit=limit,
285            )
286        ]
287
288    def get_organization(
289        self,
290        organization_id: str | None = None,
291        *,
292        organization_name: str | None = None,
293    ) -> CloudOrganization:
294        """Resolve an organization by ID or exact name."""
295        resolved_organization_id = organization_id or self.organization_id
296        if resolved_organization_id and organization_name:
297            raise exc.PyAirbyteInputError(
298                message="Provide either organization ID or organization name."
299            )
300        if not resolved_organization_id and not organization_name:
301            raise exc.PyAirbyteInputError(
302                message="Organization ID or organization name is required."
303            )
304
305        organizations = api_util.list_organizations_for_user(
306            api_root=self.public_api_root,
307            client_id=self.client_id,
308            client_secret=self.client_secret,
309            bearer_token=self.bearer_token,
310        )
311        if resolved_organization_id:
312            matching_organizations = [
313                organization
314                for organization in organizations
315                if organization.organization_id == resolved_organization_id
316            ]
317        else:
318            matching_organizations = [
319                organization
320                for organization in organizations
321                if organization.organization_name == organization_name
322            ]
323
324        if not matching_organizations:
325            raise AirbyteMissingResourceError(
326                resource_type="organization",
327                resource_name_or_id=resolved_organization_id or organization_name,
328            )
329        if len(matching_organizations) > 1:
330            raise exc.PyAirbyteInputError(
331                message="Organization name matches multiple organizations.",
332                context={"organization_name": organization_name},
333            )
334
335        organization = matching_organizations[0]
336
337        organization_credentials = self._credentials.with_organization_id(
338            organization.organization_id
339        )
340        return CloudOrganization(
341            organization_id=organization.organization_id,
342            organization_name=organization.organization_name,
343            email=organization.email,
344            client_id=organization_credentials.client_id,
345            client_secret=organization_credentials.client_secret,
346            bearer_token=organization_credentials.bearer_token,
347            public_api_root=organization_credentials.public_api_root,
348            config_api_root=organization_credentials.config_api_root,
349        )
@dataclass(init=False, kw_only=True)
class CloudClient:
 25@dataclass(init=False, kw_only=True)
 26class CloudClient:
 27    """Authenticated client for Airbyte Cloud and self-managed Airbyte APIs."""
 28
 29    _credentials: _AirbyteCredentials
 30
 31    def __init__(
 32        self,
 33        *,
 34        client_id: str | SecretString | None = None,
 35        client_secret: str | SecretString | None = None,
 36        bearer_token: str | SecretString | None = None,
 37        public_api_root: str | None = None,
 38        config_api_root: str | None = None,
 39        workspace_id: str | None = None,
 40        organization_id: str | None = None,
 41    ) -> None:
 42        """Initialize a `CloudClient` from explicit auth values."""
 43        self._credentials = _AirbyteCredentials.from_auth(
 44            client_id=client_id,
 45            client_secret=client_secret,
 46            bearer_token=bearer_token,
 47            public_api_root=public_api_root,
 48            config_api_root=config_api_root,
 49            workspace_id=workspace_id,
 50            organization_id=organization_id,
 51            env_vars=False,
 52        )
 53
 54    @property
 55    def client_id(self) -> SecretString | None:
 56        """OAuth client ID used for authentication."""
 57        return self._credentials.client_id
 58
 59    @property
 60    def client_secret(self) -> SecretString | None:
 61        """OAuth client secret used for authentication."""
 62        return self._credentials.client_secret
 63
 64    @property
 65    def bearer_token(self) -> SecretString | None:
 66        """Bearer token used for authentication."""
 67        return self._credentials.bearer_token
 68
 69    @property
 70    def public_api_root(self) -> str:
 71        """Airbyte Public API root."""
 72        return self._credentials.public_api_root
 73
 74    @property
 75    def config_api_root(self) -> str | None:
 76        """Airbyte Config API root."""
 77        return self._credentials.config_api_root
 78
 79    @property
 80    def organization_id(self) -> str | None:
 81        """Default organization ID for organization-scoped operations."""
 82        return self._credentials.organization_id
 83
 84    @classmethod
 85    def from_auth(
 86        cls,
 87        *,
 88        env_vars: bool = False,
 89        organization_id: str | None = None,
 90        client_id: str | SecretString | None = None,
 91        client_secret: str | SecretString | None = None,
 92        bearer_token: str | SecretString | None = None,
 93        public_api_root: str | None = None,
 94        config_api_root: str | None = None,
 95    ) -> CloudClient:
 96        """Create a client from explicit inputs and optionally environment variables.
 97
 98        When `env_vars` is True, environment variables are checked as a fallback
 99        after any explicitly provided values.
100        """
101        credentials = _AirbyteCredentials.from_auth(
102            organization_id=organization_id,
103            client_id=client_id,
104            client_secret=client_secret,
105            bearer_token=bearer_token,
106            public_api_root=public_api_root,
107            config_api_root=config_api_root,
108            env_vars=env_vars,
109        )
110        return cls._from_credentials(credentials)
111
112    @classmethod
113    def _from_credentials(cls, credentials: _AirbyteCredentials) -> CloudClient:
114        """Create a client from resolved Cloud credentials."""
115        return cls(
116            client_id=credentials.client_id,
117            client_secret=credentials.client_secret,
118            bearer_token=credentials.bearer_token,
119            public_api_root=credentials.public_api_root,
120            config_api_root=credentials.config_api_root,
121            workspace_id=credentials.workspace_id,
122            organization_id=credentials.organization_id,
123        )
124
125    def get_workspace(self, workspace_id: str | None = None) -> CloudWorkspace:
126        """Create a `CloudWorkspace` using this client's credentials."""
127        resolved_workspace_id = workspace_id or self._credentials.workspace_id
128        if not resolved_workspace_id:
129            raise exc.PyAirbyteInputError(
130                message="Workspace ID is required.",
131                guidance="Provide a workspace ID.",
132            )
133
134        credentials = self._credentials.with_workspace_id(resolved_workspace_id)
135        return CloudWorkspace(
136            workspace_id=credentials.workspace_id,
137            client_id=credentials.client_id,
138            client_secret=credentials.client_secret,
139            bearer_token=credentials.bearer_token,
140            api_root=credentials.public_api_root,
141            config_api_root=credentials.config_api_root,
142        )
143
144    def create_workspace(
145        self,
146        *,
147        name: str,
148        organization_id: str | None = None,
149        region_id: str | None = None,
150    ) -> CloudWorkspaceInfo:
151        """Create an Airbyte workspace."""
152        resolved_organization_id = organization_id or self.organization_id
153        workspace = api_util.create_workspace(
154            name=name,
155            organization_id=resolved_organization_id,
156            region_id=region_id,
157            api_root=self.public_api_root,
158            client_id=self.client_id,
159            client_secret=self.client_secret,
160            bearer_token=self.bearer_token,
161        )
162        return CloudWorkspaceInfo.from_api_response(workspace)
163
164    def rename_workspace(
165        self,
166        workspace_id: str,
167        *,
168        name: str,
169    ) -> CloudWorkspaceInfo:
170        """Rename an Airbyte workspace."""
171        workspace = api_util.rename_workspace(
172            workspace_id=workspace_id,
173            name=name,
174            api_root=self.public_api_root,
175            client_id=self.client_id,
176            client_secret=self.client_secret,
177            bearer_token=self.bearer_token,
178        )
179        return CloudWorkspaceInfo.from_api_response(workspace)
180
181    def permanently_delete_workspace(
182        self,
183        workspace_id: str,
184        *,
185        workspace_name: str | None = None,
186        safe_mode: bool = True,
187    ) -> None:
188        """Permanently delete an Airbyte workspace if it has no connections.
189
190        When `safe_mode` is enabled, the workspace name must contain `delete-me`
191        or `deleteme`. This also checks for existing connections before deleting
192        and raises `AirbyteWorkspaceNotEmptyError` if the workspace is not empty.
193        """
194        api_util.permanently_delete_workspace(
195            workspace_id=workspace_id,
196            workspace_name=workspace_name,
197            api_root=self.public_api_root,
198            client_id=self.client_id,
199            client_secret=self.client_secret,
200            bearer_token=self.bearer_token,
201            safe_mode=safe_mode,
202        )
203
204    @overload
205    def list_workspaces(
206        self,
207        name: str | None = None,
208        *,
209        organization_id: None = None,
210        name_contains: str | None = None,
211        name_filter: Callable[[str], bool] | None = None,
212        limit: int | None = None,
213    ) -> list[CloudWorkspaceInfo]:
214        raise NotImplementedError
215
216    @overload
217    def list_workspaces(
218        self,
219        name: str | None = None,
220        *,
221        organization_id: str,
222        name_contains: str | None = None,
223        name_filter: Callable[[str], bool] | None = None,
224        limit: int | None = None,
225    ) -> list[CloudWorkspaceInfo]:
226        raise NotImplementedError
227
228    def list_workspaces(
229        self,
230        name: str | None = None,
231        *,
232        organization_id: str | None = None,
233        name_contains: str | None = None,
234        name_filter: Callable[[str], bool] | None = None,
235        limit: int | None = None,
236    ) -> list[CloudWorkspaceInfo]:
237        """List workspaces available to this client."""
238        if organization_id is not None or self.organization_id is not None:
239            resolved_organization_id = organization_id or self.organization_id
240            if not resolved_organization_id:
241                raise exc.PyAirbyteInputError(
242                    message="Organization ID is required.",
243                    guidance="Provide an organization ID.",
244                )
245            workspaces = api_util.list_workspaces_in_organization(
246                organization_id=resolved_organization_id,
247                api_root=self.public_api_root,
248                config_api_root=self.config_api_root,
249                client_id=self.client_id,
250                client_secret=self.client_secret,
251                bearer_token=self.bearer_token,
252                name_contains=name_contains or name,
253                limit=None if name_filter is not None else limit,
254            )
255            workspace_infos = [
256                CloudWorkspaceInfo.from_mapping(workspace) for workspace in workspaces
257            ]
258            if name_filter is not None:
259                workspace_infos = [
260                    workspace for workspace in workspace_infos if name_filter(workspace.name)
261                ]
262                if limit is not None:
263                    workspace_infos = workspace_infos[:limit]
264            return workspace_infos
265        if name_contains is not None:
266            if name_filter is not None:
267                raise exc.PyAirbyteInputError(
268                    message="You can provide name_contains or name_filter, but not both."
269                )
270            name_substring = name_contains
271
272            def name_filter(workspace_name: str) -> bool:
273                return name_substring in workspace_name
274
275        return [
276            CloudWorkspaceInfo.from_api_response(workspace)
277            for workspace in api_util.list_workspaces(
278                workspace_id="",
279                api_root=self.public_api_root,
280                name=name,
281                name_filter=name_filter,
282                client_id=self.client_id,
283                client_secret=self.client_secret,
284                bearer_token=self.bearer_token,
285                limit=limit,
286            )
287        ]
288
289    def get_organization(
290        self,
291        organization_id: str | None = None,
292        *,
293        organization_name: str | None = None,
294    ) -> CloudOrganization:
295        """Resolve an organization by ID or exact name."""
296        resolved_organization_id = organization_id or self.organization_id
297        if resolved_organization_id and organization_name:
298            raise exc.PyAirbyteInputError(
299                message="Provide either organization ID or organization name."
300            )
301        if not resolved_organization_id and not organization_name:
302            raise exc.PyAirbyteInputError(
303                message="Organization ID or organization name is required."
304            )
305
306        organizations = api_util.list_organizations_for_user(
307            api_root=self.public_api_root,
308            client_id=self.client_id,
309            client_secret=self.client_secret,
310            bearer_token=self.bearer_token,
311        )
312        if resolved_organization_id:
313            matching_organizations = [
314                organization
315                for organization in organizations
316                if organization.organization_id == resolved_organization_id
317            ]
318        else:
319            matching_organizations = [
320                organization
321                for organization in organizations
322                if organization.organization_name == organization_name
323            ]
324
325        if not matching_organizations:
326            raise AirbyteMissingResourceError(
327                resource_type="organization",
328                resource_name_or_id=resolved_organization_id or organization_name,
329            )
330        if len(matching_organizations) > 1:
331            raise exc.PyAirbyteInputError(
332                message="Organization name matches multiple organizations.",
333                context={"organization_name": organization_name},
334            )
335
336        organization = matching_organizations[0]
337
338        organization_credentials = self._credentials.with_organization_id(
339            organization.organization_id
340        )
341        return CloudOrganization(
342            organization_id=organization.organization_id,
343            organization_name=organization.organization_name,
344            email=organization.email,
345            client_id=organization_credentials.client_id,
346            client_secret=organization_credentials.client_secret,
347            bearer_token=organization_credentials.bearer_token,
348            public_api_root=organization_credentials.public_api_root,
349            config_api_root=organization_credentials.config_api_root,
350        )

Authenticated client for Airbyte Cloud and self-managed Airbyte APIs.

CloudClient( *, client_id: str | airbyte.secrets.SecretString | None = None, client_secret: str | airbyte.secrets.SecretString | None = None, bearer_token: str | airbyte.secrets.SecretString | None = None, public_api_root: str | None = None, config_api_root: str | None = None, workspace_id: str | None = None, organization_id: str | None = None)
31    def __init__(
32        self,
33        *,
34        client_id: str | SecretString | None = None,
35        client_secret: str | SecretString | None = None,
36        bearer_token: str | SecretString | None = None,
37        public_api_root: str | None = None,
38        config_api_root: str | None = None,
39        workspace_id: str | None = None,
40        organization_id: str | None = None,
41    ) -> None:
42        """Initialize a `CloudClient` from explicit auth values."""
43        self._credentials = _AirbyteCredentials.from_auth(
44            client_id=client_id,
45            client_secret=client_secret,
46            bearer_token=bearer_token,
47            public_api_root=public_api_root,
48            config_api_root=config_api_root,
49            workspace_id=workspace_id,
50            organization_id=organization_id,
51            env_vars=False,
52        )

Initialize a CloudClient from explicit auth values.

client_id: airbyte.secrets.SecretString | None
54    @property
55    def client_id(self) -> SecretString | None:
56        """OAuth client ID used for authentication."""
57        return self._credentials.client_id

OAuth client ID used for authentication.

client_secret: airbyte.secrets.SecretString | None
59    @property
60    def client_secret(self) -> SecretString | None:
61        """OAuth client secret used for authentication."""
62        return self._credentials.client_secret

OAuth client secret used for authentication.

bearer_token: airbyte.secrets.SecretString | None
64    @property
65    def bearer_token(self) -> SecretString | None:
66        """Bearer token used for authentication."""
67        return self._credentials.bearer_token

Bearer token used for authentication.

public_api_root: str
69    @property
70    def public_api_root(self) -> str:
71        """Airbyte Public API root."""
72        return self._credentials.public_api_root

Airbyte Public API root.

config_api_root: str | None
74    @property
75    def config_api_root(self) -> str | None:
76        """Airbyte Config API root."""
77        return self._credentials.config_api_root

Airbyte Config API root.

organization_id: str | None
79    @property
80    def organization_id(self) -> str | None:
81        """Default organization ID for organization-scoped operations."""
82        return self._credentials.organization_id

Default organization ID for organization-scoped operations.

@classmethod
def from_auth( cls, *, env_vars: bool = False, organization_id: str | None = None, client_id: str | airbyte.secrets.SecretString | None = None, client_secret: str | airbyte.secrets.SecretString | None = None, bearer_token: str | airbyte.secrets.SecretString | None = None, public_api_root: str | None = None, config_api_root: str | None = None) -> CloudClient:
 84    @classmethod
 85    def from_auth(
 86        cls,
 87        *,
 88        env_vars: bool = False,
 89        organization_id: str | None = None,
 90        client_id: str | SecretString | None = None,
 91        client_secret: str | SecretString | None = None,
 92        bearer_token: str | SecretString | None = None,
 93        public_api_root: str | None = None,
 94        config_api_root: str | None = None,
 95    ) -> CloudClient:
 96        """Create a client from explicit inputs and optionally environment variables.
 97
 98        When `env_vars` is True, environment variables are checked as a fallback
 99        after any explicitly provided values.
100        """
101        credentials = _AirbyteCredentials.from_auth(
102            organization_id=organization_id,
103            client_id=client_id,
104            client_secret=client_secret,
105            bearer_token=bearer_token,
106            public_api_root=public_api_root,
107            config_api_root=config_api_root,
108            env_vars=env_vars,
109        )
110        return cls._from_credentials(credentials)

Create a client from explicit inputs and optionally environment variables.

When env_vars is True, environment variables are checked as a fallback after any explicitly provided values.

def get_workspace( self, workspace_id: str | None = None) -> airbyte.cloud.CloudWorkspace:
125    def get_workspace(self, workspace_id: str | None = None) -> CloudWorkspace:
126        """Create a `CloudWorkspace` using this client's credentials."""
127        resolved_workspace_id = workspace_id or self._credentials.workspace_id
128        if not resolved_workspace_id:
129            raise exc.PyAirbyteInputError(
130                message="Workspace ID is required.",
131                guidance="Provide a workspace ID.",
132            )
133
134        credentials = self._credentials.with_workspace_id(resolved_workspace_id)
135        return CloudWorkspace(
136            workspace_id=credentials.workspace_id,
137            client_id=credentials.client_id,
138            client_secret=credentials.client_secret,
139            bearer_token=credentials.bearer_token,
140            api_root=credentials.public_api_root,
141            config_api_root=credentials.config_api_root,
142        )

Create a CloudWorkspace using this client's credentials.

def create_workspace( self, *, name: str, organization_id: str | None = None, region_id: str | None = None) -> airbyte.cloud.CloudWorkspaceInfo:
144    def create_workspace(
145        self,
146        *,
147        name: str,
148        organization_id: str | None = None,
149        region_id: str | None = None,
150    ) -> CloudWorkspaceInfo:
151        """Create an Airbyte workspace."""
152        resolved_organization_id = organization_id or self.organization_id
153        workspace = api_util.create_workspace(
154            name=name,
155            organization_id=resolved_organization_id,
156            region_id=region_id,
157            api_root=self.public_api_root,
158            client_id=self.client_id,
159            client_secret=self.client_secret,
160            bearer_token=self.bearer_token,
161        )
162        return CloudWorkspaceInfo.from_api_response(workspace)

Create an Airbyte workspace.

def rename_workspace( self, workspace_id: str, *, name: str) -> airbyte.cloud.CloudWorkspaceInfo:
164    def rename_workspace(
165        self,
166        workspace_id: str,
167        *,
168        name: str,
169    ) -> CloudWorkspaceInfo:
170        """Rename an Airbyte workspace."""
171        workspace = api_util.rename_workspace(
172            workspace_id=workspace_id,
173            name=name,
174            api_root=self.public_api_root,
175            client_id=self.client_id,
176            client_secret=self.client_secret,
177            bearer_token=self.bearer_token,
178        )
179        return CloudWorkspaceInfo.from_api_response(workspace)

Rename an Airbyte workspace.

def permanently_delete_workspace( self, workspace_id: str, *, workspace_name: str | None = None, safe_mode: bool = True) -> None:
181    def permanently_delete_workspace(
182        self,
183        workspace_id: str,
184        *,
185        workspace_name: str | None = None,
186        safe_mode: bool = True,
187    ) -> None:
188        """Permanently delete an Airbyte workspace if it has no connections.
189
190        When `safe_mode` is enabled, the workspace name must contain `delete-me`
191        or `deleteme`. This also checks for existing connections before deleting
192        and raises `AirbyteWorkspaceNotEmptyError` if the workspace is not empty.
193        """
194        api_util.permanently_delete_workspace(
195            workspace_id=workspace_id,
196            workspace_name=workspace_name,
197            api_root=self.public_api_root,
198            client_id=self.client_id,
199            client_secret=self.client_secret,
200            bearer_token=self.bearer_token,
201            safe_mode=safe_mode,
202        )

Permanently delete an Airbyte workspace if it has no connections.

When safe_mode is enabled, the workspace name must contain delete-me or deleteme. This also checks for existing connections before deleting and raises AirbyteWorkspaceNotEmptyError if the workspace is not empty.

def list_workspaces( self, name: str | None = None, *, organization_id: str | None = None, name_contains: str | None = None, name_filter: Callable[[str], bool] | None = None, limit: int | None = None) -> list[airbyte.cloud.CloudWorkspaceInfo]:
228    def list_workspaces(
229        self,
230        name: str | None = None,
231        *,
232        organization_id: str | None = None,
233        name_contains: str | None = None,
234        name_filter: Callable[[str], bool] | None = None,
235        limit: int | None = None,
236    ) -> list[CloudWorkspaceInfo]:
237        """List workspaces available to this client."""
238        if organization_id is not None or self.organization_id is not None:
239            resolved_organization_id = organization_id or self.organization_id
240            if not resolved_organization_id:
241                raise exc.PyAirbyteInputError(
242                    message="Organization ID is required.",
243                    guidance="Provide an organization ID.",
244                )
245            workspaces = api_util.list_workspaces_in_organization(
246                organization_id=resolved_organization_id,
247                api_root=self.public_api_root,
248                config_api_root=self.config_api_root,
249                client_id=self.client_id,
250                client_secret=self.client_secret,
251                bearer_token=self.bearer_token,
252                name_contains=name_contains or name,
253                limit=None if name_filter is not None else limit,
254            )
255            workspace_infos = [
256                CloudWorkspaceInfo.from_mapping(workspace) for workspace in workspaces
257            ]
258            if name_filter is not None:
259                workspace_infos = [
260                    workspace for workspace in workspace_infos if name_filter(workspace.name)
261                ]
262                if limit is not None:
263                    workspace_infos = workspace_infos[:limit]
264            return workspace_infos
265        if name_contains is not None:
266            if name_filter is not None:
267                raise exc.PyAirbyteInputError(
268                    message="You can provide name_contains or name_filter, but not both."
269                )
270            name_substring = name_contains
271
272            def name_filter(workspace_name: str) -> bool:
273                return name_substring in workspace_name
274
275        return [
276            CloudWorkspaceInfo.from_api_response(workspace)
277            for workspace in api_util.list_workspaces(
278                workspace_id="",
279                api_root=self.public_api_root,
280                name=name,
281                name_filter=name_filter,
282                client_id=self.client_id,
283                client_secret=self.client_secret,
284                bearer_token=self.bearer_token,
285                limit=limit,
286            )
287        ]

List workspaces available to this client.

def get_organization( self, organization_id: str | None = None, *, organization_name: str | None = None) -> airbyte.cloud.CloudOrganization:
289    def get_organization(
290        self,
291        organization_id: str | None = None,
292        *,
293        organization_name: str | None = None,
294    ) -> CloudOrganization:
295        """Resolve an organization by ID or exact name."""
296        resolved_organization_id = organization_id or self.organization_id
297        if resolved_organization_id and organization_name:
298            raise exc.PyAirbyteInputError(
299                message="Provide either organization ID or organization name."
300            )
301        if not resolved_organization_id and not organization_name:
302            raise exc.PyAirbyteInputError(
303                message="Organization ID or organization name is required."
304            )
305
306        organizations = api_util.list_organizations_for_user(
307            api_root=self.public_api_root,
308            client_id=self.client_id,
309            client_secret=self.client_secret,
310            bearer_token=self.bearer_token,
311        )
312        if resolved_organization_id:
313            matching_organizations = [
314                organization
315                for organization in organizations
316                if organization.organization_id == resolved_organization_id
317            ]
318        else:
319            matching_organizations = [
320                organization
321                for organization in organizations
322                if organization.organization_name == organization_name
323            ]
324
325        if not matching_organizations:
326            raise AirbyteMissingResourceError(
327                resource_type="organization",
328                resource_name_or_id=resolved_organization_id or organization_name,
329            )
330        if len(matching_organizations) > 1:
331            raise exc.PyAirbyteInputError(
332                message="Organization name matches multiple organizations.",
333                context={"organization_name": organization_name},
334            )
335
336        organization = matching_organizations[0]
337
338        organization_credentials = self._credentials.with_organization_id(
339            organization.organization_id
340        )
341        return CloudOrganization(
342            organization_id=organization.organization_id,
343            organization_name=organization.organization_name,
344            email=organization.email,
345            client_id=organization_credentials.client_id,
346            client_secret=organization_credentials.client_secret,
347            bearer_token=organization_credentials.bearer_token,
348            public_api_root=organization_credentials.public_api_root,
349            config_api_root=organization_credentials.config_api_root,
350        )

Resolve an organization by ID or exact name.