airbyte.cloud.client
PyAirbyte Cloud client.
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2"""PyAirbyte Cloud client.""" 3 4from __future__ import annotations 5 6from dataclasses import dataclass 7from typing import TYPE_CHECKING, overload 8 9from airbyte import exceptions as exc 10from airbyte._util import api_util 11from airbyte.cloud._credentials import _AirbyteCredentials 12from airbyte.cloud.models import CloudWorkspaceInfo 13from airbyte.cloud.organizations import CloudOrganization 14from airbyte.cloud.workspaces import CloudWorkspace 15from airbyte.exceptions import AirbyteMissingResourceError 16 17 18if TYPE_CHECKING: 19 from collections.abc import Callable 20 21 from airbyte.secrets.base import SecretString 22 23 24@dataclass(init=False, kw_only=True) 25class CloudClient: 26 """Authenticated client for Airbyte Cloud and self-managed Airbyte APIs.""" 27 28 _credentials: _AirbyteCredentials 29 30 def __init__( 31 self, 32 *, 33 client_id: str | SecretString | None = None, 34 client_secret: str | SecretString | None = None, 35 bearer_token: str | SecretString | None = None, 36 public_api_root: str | None = None, 37 config_api_root: str | None = None, 38 workspace_id: str | None = None, 39 organization_id: str | None = None, 40 ) -> None: 41 """Initialize a `CloudClient` from explicit auth values.""" 42 self._credentials = _AirbyteCredentials.from_auth( 43 client_id=client_id, 44 client_secret=client_secret, 45 bearer_token=bearer_token, 46 public_api_root=public_api_root, 47 config_api_root=config_api_root, 48 workspace_id=workspace_id, 49 organization_id=organization_id, 50 env_vars=False, 51 ) 52 53 @property 54 def client_id(self) -> SecretString | None: 55 """OAuth client ID used for authentication.""" 56 return self._credentials.client_id 57 58 @property 59 def client_secret(self) -> SecretString | None: 60 """OAuth client secret used for authentication.""" 61 return self._credentials.client_secret 62 63 @property 64 def bearer_token(self) -> SecretString | None: 65 """Bearer token used for authentication.""" 66 return self._credentials.bearer_token 67 68 @property 69 def public_api_root(self) -> str: 70 """Airbyte Public API root.""" 71 return self._credentials.public_api_root 72 73 @property 74 def config_api_root(self) -> str | None: 75 """Airbyte Config API root.""" 76 return self._credentials.config_api_root 77 78 @property 79 def organization_id(self) -> str | None: 80 """Default organization ID for organization-scoped operations.""" 81 return self._credentials.organization_id 82 83 @classmethod 84 def from_auth( 85 cls, 86 *, 87 env_vars: bool = False, 88 organization_id: str | None = None, 89 client_id: str | SecretString | None = None, 90 client_secret: str | SecretString | None = None, 91 bearer_token: str | SecretString | None = None, 92 public_api_root: str | None = None, 93 config_api_root: str | None = None, 94 ) -> CloudClient: 95 """Create a client from explicit inputs and optionally environment variables. 96 97 When `env_vars` is True, environment variables are checked as a fallback 98 after any explicitly provided values. 99 """ 100 credentials = _AirbyteCredentials.from_auth( 101 organization_id=organization_id, 102 client_id=client_id, 103 client_secret=client_secret, 104 bearer_token=bearer_token, 105 public_api_root=public_api_root, 106 config_api_root=config_api_root, 107 env_vars=env_vars, 108 ) 109 return cls._from_credentials(credentials) 110 111 @classmethod 112 def _from_credentials(cls, credentials: _AirbyteCredentials) -> CloudClient: 113 """Create a client from resolved Cloud credentials.""" 114 return cls( 115 client_id=credentials.client_id, 116 client_secret=credentials.client_secret, 117 bearer_token=credentials.bearer_token, 118 public_api_root=credentials.public_api_root, 119 config_api_root=credentials.config_api_root, 120 workspace_id=credentials.workspace_id, 121 organization_id=credentials.organization_id, 122 ) 123 124 def get_workspace(self, workspace_id: str | None = None) -> CloudWorkspace: 125 """Create a `CloudWorkspace` using this client's credentials.""" 126 resolved_workspace_id = workspace_id or self._credentials.workspace_id 127 if not resolved_workspace_id: 128 raise exc.PyAirbyteInputError( 129 message="Workspace ID is required.", 130 guidance="Provide a workspace ID.", 131 ) 132 133 credentials = self._credentials.with_workspace_id(resolved_workspace_id) 134 return CloudWorkspace( 135 workspace_id=credentials.workspace_id, 136 client_id=credentials.client_id, 137 client_secret=credentials.client_secret, 138 bearer_token=credentials.bearer_token, 139 api_root=credentials.public_api_root, 140 config_api_root=credentials.config_api_root, 141 ) 142 143 def create_workspace( 144 self, 145 *, 146 name: str, 147 organization_id: str | None = None, 148 region_id: str | None = None, 149 ) -> CloudWorkspaceInfo: 150 """Create an Airbyte workspace.""" 151 resolved_organization_id = organization_id or self.organization_id 152 workspace = api_util.create_workspace( 153 name=name, 154 organization_id=resolved_organization_id, 155 region_id=region_id, 156 api_root=self.public_api_root, 157 client_id=self.client_id, 158 client_secret=self.client_secret, 159 bearer_token=self.bearer_token, 160 ) 161 return CloudWorkspaceInfo.from_api_response(workspace) 162 163 def rename_workspace( 164 self, 165 workspace_id: str, 166 *, 167 name: str, 168 ) -> CloudWorkspaceInfo: 169 """Rename an Airbyte workspace.""" 170 workspace = api_util.rename_workspace( 171 workspace_id=workspace_id, 172 name=name, 173 api_root=self.public_api_root, 174 client_id=self.client_id, 175 client_secret=self.client_secret, 176 bearer_token=self.bearer_token, 177 ) 178 return CloudWorkspaceInfo.from_api_response(workspace) 179 180 def permanently_delete_workspace( 181 self, 182 workspace_id: str, 183 *, 184 workspace_name: str | None = None, 185 safe_mode: bool = True, 186 ) -> None: 187 """Permanently delete an Airbyte workspace if it has no connections. 188 189 When `safe_mode` is enabled, the workspace name must contain `delete-me` 190 or `deleteme`. This also checks for existing connections before deleting 191 and raises `AirbyteWorkspaceNotEmptyError` if the workspace is not empty. 192 """ 193 api_util.permanently_delete_workspace( 194 workspace_id=workspace_id, 195 workspace_name=workspace_name, 196 api_root=self.public_api_root, 197 client_id=self.client_id, 198 client_secret=self.client_secret, 199 bearer_token=self.bearer_token, 200 safe_mode=safe_mode, 201 ) 202 203 @overload 204 def list_workspaces( 205 self, 206 name: str | None = None, 207 *, 208 organization_id: None = None, 209 name_contains: str | None = None, 210 name_filter: Callable[[str], bool] | None = None, 211 limit: int | None = None, 212 ) -> list[CloudWorkspaceInfo]: 213 raise NotImplementedError 214 215 @overload 216 def list_workspaces( 217 self, 218 name: str | None = None, 219 *, 220 organization_id: str, 221 name_contains: str | None = None, 222 name_filter: Callable[[str], bool] | None = None, 223 limit: int | None = None, 224 ) -> list[CloudWorkspaceInfo]: 225 raise NotImplementedError 226 227 def list_workspaces( 228 self, 229 name: str | None = None, 230 *, 231 organization_id: str | None = None, 232 name_contains: str | None = None, 233 name_filter: Callable[[str], bool] | None = None, 234 limit: int | None = None, 235 ) -> list[CloudWorkspaceInfo]: 236 """List workspaces available to this client.""" 237 if organization_id is not None or self.organization_id is not None: 238 resolved_organization_id = organization_id or self.organization_id 239 if not resolved_organization_id: 240 raise exc.PyAirbyteInputError( 241 message="Organization ID is required.", 242 guidance="Provide an organization ID.", 243 ) 244 workspaces = api_util.list_workspaces_in_organization( 245 organization_id=resolved_organization_id, 246 api_root=self.public_api_root, 247 config_api_root=self.config_api_root, 248 client_id=self.client_id, 249 client_secret=self.client_secret, 250 bearer_token=self.bearer_token, 251 name_contains=name_contains or name, 252 limit=None if name_filter is not None else limit, 253 ) 254 workspace_infos = [ 255 CloudWorkspaceInfo.from_mapping(workspace) for workspace in workspaces 256 ] 257 if name_filter is not None: 258 workspace_infos = [ 259 workspace for workspace in workspace_infos if name_filter(workspace.name) 260 ] 261 if limit is not None: 262 workspace_infos = workspace_infos[:limit] 263 return workspace_infos 264 if name_contains is not None: 265 if name_filter is not None: 266 raise exc.PyAirbyteInputError( 267 message="You can provide name_contains or name_filter, but not both." 268 ) 269 name_substring = name_contains 270 271 def name_filter(workspace_name: str) -> bool: 272 return name_substring in workspace_name 273 274 return [ 275 CloudWorkspaceInfo.from_api_response(workspace) 276 for workspace in api_util.list_workspaces( 277 workspace_id="", 278 api_root=self.public_api_root, 279 name=name, 280 name_filter=name_filter, 281 client_id=self.client_id, 282 client_secret=self.client_secret, 283 bearer_token=self.bearer_token, 284 limit=limit, 285 ) 286 ] 287 288 def get_organization( 289 self, 290 organization_id: str | None = None, 291 *, 292 organization_name: str | None = None, 293 ) -> CloudOrganization: 294 """Resolve an organization by ID or exact name.""" 295 resolved_organization_id = organization_id or self.organization_id 296 if resolved_organization_id and organization_name: 297 raise exc.PyAirbyteInputError( 298 message="Provide either organization ID or organization name." 299 ) 300 if not resolved_organization_id and not organization_name: 301 raise exc.PyAirbyteInputError( 302 message="Organization ID or organization name is required." 303 ) 304 305 organizations = api_util.list_organizations_for_user( 306 api_root=self.public_api_root, 307 client_id=self.client_id, 308 client_secret=self.client_secret, 309 bearer_token=self.bearer_token, 310 ) 311 if resolved_organization_id: 312 matching_organizations = [ 313 organization 314 for organization in organizations 315 if organization.organization_id == resolved_organization_id 316 ] 317 else: 318 matching_organizations = [ 319 organization 320 for organization in organizations 321 if organization.organization_name == organization_name 322 ] 323 324 if not matching_organizations: 325 raise AirbyteMissingResourceError( 326 resource_type="organization", 327 resource_name_or_id=resolved_organization_id or organization_name, 328 ) 329 if len(matching_organizations) > 1: 330 raise exc.PyAirbyteInputError( 331 message="Organization name matches multiple organizations.", 332 context={"organization_name": organization_name}, 333 ) 334 335 organization = matching_organizations[0] 336 337 organization_credentials = self._credentials.with_organization_id( 338 organization.organization_id 339 ) 340 return CloudOrganization( 341 organization_id=organization.organization_id, 342 organization_name=organization.organization_name, 343 email=organization.email, 344 client_id=organization_credentials.client_id, 345 client_secret=organization_credentials.client_secret, 346 bearer_token=organization_credentials.bearer_token, 347 public_api_root=organization_credentials.public_api_root, 348 config_api_root=organization_credentials.config_api_root, 349 )
25@dataclass(init=False, kw_only=True) 26class CloudClient: 27 """Authenticated client for Airbyte Cloud and self-managed Airbyte APIs.""" 28 29 _credentials: _AirbyteCredentials 30 31 def __init__( 32 self, 33 *, 34 client_id: str | SecretString | None = None, 35 client_secret: str | SecretString | None = None, 36 bearer_token: str | SecretString | None = None, 37 public_api_root: str | None = None, 38 config_api_root: str | None = None, 39 workspace_id: str | None = None, 40 organization_id: str | None = None, 41 ) -> None: 42 """Initialize a `CloudClient` from explicit auth values.""" 43 self._credentials = _AirbyteCredentials.from_auth( 44 client_id=client_id, 45 client_secret=client_secret, 46 bearer_token=bearer_token, 47 public_api_root=public_api_root, 48 config_api_root=config_api_root, 49 workspace_id=workspace_id, 50 organization_id=organization_id, 51 env_vars=False, 52 ) 53 54 @property 55 def client_id(self) -> SecretString | None: 56 """OAuth client ID used for authentication.""" 57 return self._credentials.client_id 58 59 @property 60 def client_secret(self) -> SecretString | None: 61 """OAuth client secret used for authentication.""" 62 return self._credentials.client_secret 63 64 @property 65 def bearer_token(self) -> SecretString | None: 66 """Bearer token used for authentication.""" 67 return self._credentials.bearer_token 68 69 @property 70 def public_api_root(self) -> str: 71 """Airbyte Public API root.""" 72 return self._credentials.public_api_root 73 74 @property 75 def config_api_root(self) -> str | None: 76 """Airbyte Config API root.""" 77 return self._credentials.config_api_root 78 79 @property 80 def organization_id(self) -> str | None: 81 """Default organization ID for organization-scoped operations.""" 82 return self._credentials.organization_id 83 84 @classmethod 85 def from_auth( 86 cls, 87 *, 88 env_vars: bool = False, 89 organization_id: str | None = None, 90 client_id: str | SecretString | None = None, 91 client_secret: str | SecretString | None = None, 92 bearer_token: str | SecretString | None = None, 93 public_api_root: str | None = None, 94 config_api_root: str | None = None, 95 ) -> CloudClient: 96 """Create a client from explicit inputs and optionally environment variables. 97 98 When `env_vars` is True, environment variables are checked as a fallback 99 after any explicitly provided values. 100 """ 101 credentials = _AirbyteCredentials.from_auth( 102 organization_id=organization_id, 103 client_id=client_id, 104 client_secret=client_secret, 105 bearer_token=bearer_token, 106 public_api_root=public_api_root, 107 config_api_root=config_api_root, 108 env_vars=env_vars, 109 ) 110 return cls._from_credentials(credentials) 111 112 @classmethod 113 def _from_credentials(cls, credentials: _AirbyteCredentials) -> CloudClient: 114 """Create a client from resolved Cloud credentials.""" 115 return cls( 116 client_id=credentials.client_id, 117 client_secret=credentials.client_secret, 118 bearer_token=credentials.bearer_token, 119 public_api_root=credentials.public_api_root, 120 config_api_root=credentials.config_api_root, 121 workspace_id=credentials.workspace_id, 122 organization_id=credentials.organization_id, 123 ) 124 125 def get_workspace(self, workspace_id: str | None = None) -> CloudWorkspace: 126 """Create a `CloudWorkspace` using this client's credentials.""" 127 resolved_workspace_id = workspace_id or self._credentials.workspace_id 128 if not resolved_workspace_id: 129 raise exc.PyAirbyteInputError( 130 message="Workspace ID is required.", 131 guidance="Provide a workspace ID.", 132 ) 133 134 credentials = self._credentials.with_workspace_id(resolved_workspace_id) 135 return CloudWorkspace( 136 workspace_id=credentials.workspace_id, 137 client_id=credentials.client_id, 138 client_secret=credentials.client_secret, 139 bearer_token=credentials.bearer_token, 140 api_root=credentials.public_api_root, 141 config_api_root=credentials.config_api_root, 142 ) 143 144 def create_workspace( 145 self, 146 *, 147 name: str, 148 organization_id: str | None = None, 149 region_id: str | None = None, 150 ) -> CloudWorkspaceInfo: 151 """Create an Airbyte workspace.""" 152 resolved_organization_id = organization_id or self.organization_id 153 workspace = api_util.create_workspace( 154 name=name, 155 organization_id=resolved_organization_id, 156 region_id=region_id, 157 api_root=self.public_api_root, 158 client_id=self.client_id, 159 client_secret=self.client_secret, 160 bearer_token=self.bearer_token, 161 ) 162 return CloudWorkspaceInfo.from_api_response(workspace) 163 164 def rename_workspace( 165 self, 166 workspace_id: str, 167 *, 168 name: str, 169 ) -> CloudWorkspaceInfo: 170 """Rename an Airbyte workspace.""" 171 workspace = api_util.rename_workspace( 172 workspace_id=workspace_id, 173 name=name, 174 api_root=self.public_api_root, 175 client_id=self.client_id, 176 client_secret=self.client_secret, 177 bearer_token=self.bearer_token, 178 ) 179 return CloudWorkspaceInfo.from_api_response(workspace) 180 181 def permanently_delete_workspace( 182 self, 183 workspace_id: str, 184 *, 185 workspace_name: str | None = None, 186 safe_mode: bool = True, 187 ) -> None: 188 """Permanently delete an Airbyte workspace if it has no connections. 189 190 When `safe_mode` is enabled, the workspace name must contain `delete-me` 191 or `deleteme`. This also checks for existing connections before deleting 192 and raises `AirbyteWorkspaceNotEmptyError` if the workspace is not empty. 193 """ 194 api_util.permanently_delete_workspace( 195 workspace_id=workspace_id, 196 workspace_name=workspace_name, 197 api_root=self.public_api_root, 198 client_id=self.client_id, 199 client_secret=self.client_secret, 200 bearer_token=self.bearer_token, 201 safe_mode=safe_mode, 202 ) 203 204 @overload 205 def list_workspaces( 206 self, 207 name: str | None = None, 208 *, 209 organization_id: None = None, 210 name_contains: str | None = None, 211 name_filter: Callable[[str], bool] | None = None, 212 limit: int | None = None, 213 ) -> list[CloudWorkspaceInfo]: 214 raise NotImplementedError 215 216 @overload 217 def list_workspaces( 218 self, 219 name: str | None = None, 220 *, 221 organization_id: str, 222 name_contains: str | None = None, 223 name_filter: Callable[[str], bool] | None = None, 224 limit: int | None = None, 225 ) -> list[CloudWorkspaceInfo]: 226 raise NotImplementedError 227 228 def list_workspaces( 229 self, 230 name: str | None = None, 231 *, 232 organization_id: str | None = None, 233 name_contains: str | None = None, 234 name_filter: Callable[[str], bool] | None = None, 235 limit: int | None = None, 236 ) -> list[CloudWorkspaceInfo]: 237 """List workspaces available to this client.""" 238 if organization_id is not None or self.organization_id is not None: 239 resolved_organization_id = organization_id or self.organization_id 240 if not resolved_organization_id: 241 raise exc.PyAirbyteInputError( 242 message="Organization ID is required.", 243 guidance="Provide an organization ID.", 244 ) 245 workspaces = api_util.list_workspaces_in_organization( 246 organization_id=resolved_organization_id, 247 api_root=self.public_api_root, 248 config_api_root=self.config_api_root, 249 client_id=self.client_id, 250 client_secret=self.client_secret, 251 bearer_token=self.bearer_token, 252 name_contains=name_contains or name, 253 limit=None if name_filter is not None else limit, 254 ) 255 workspace_infos = [ 256 CloudWorkspaceInfo.from_mapping(workspace) for workspace in workspaces 257 ] 258 if name_filter is not None: 259 workspace_infos = [ 260 workspace for workspace in workspace_infos if name_filter(workspace.name) 261 ] 262 if limit is not None: 263 workspace_infos = workspace_infos[:limit] 264 return workspace_infos 265 if name_contains is not None: 266 if name_filter is not None: 267 raise exc.PyAirbyteInputError( 268 message="You can provide name_contains or name_filter, but not both." 269 ) 270 name_substring = name_contains 271 272 def name_filter(workspace_name: str) -> bool: 273 return name_substring in workspace_name 274 275 return [ 276 CloudWorkspaceInfo.from_api_response(workspace) 277 for workspace in api_util.list_workspaces( 278 workspace_id="", 279 api_root=self.public_api_root, 280 name=name, 281 name_filter=name_filter, 282 client_id=self.client_id, 283 client_secret=self.client_secret, 284 bearer_token=self.bearer_token, 285 limit=limit, 286 ) 287 ] 288 289 def get_organization( 290 self, 291 organization_id: str | None = None, 292 *, 293 organization_name: str | None = None, 294 ) -> CloudOrganization: 295 """Resolve an organization by ID or exact name.""" 296 resolved_organization_id = organization_id or self.organization_id 297 if resolved_organization_id and organization_name: 298 raise exc.PyAirbyteInputError( 299 message="Provide either organization ID or organization name." 300 ) 301 if not resolved_organization_id and not organization_name: 302 raise exc.PyAirbyteInputError( 303 message="Organization ID or organization name is required." 304 ) 305 306 organizations = api_util.list_organizations_for_user( 307 api_root=self.public_api_root, 308 client_id=self.client_id, 309 client_secret=self.client_secret, 310 bearer_token=self.bearer_token, 311 ) 312 if resolved_organization_id: 313 matching_organizations = [ 314 organization 315 for organization in organizations 316 if organization.organization_id == resolved_organization_id 317 ] 318 else: 319 matching_organizations = [ 320 organization 321 for organization in organizations 322 if organization.organization_name == organization_name 323 ] 324 325 if not matching_organizations: 326 raise AirbyteMissingResourceError( 327 resource_type="organization", 328 resource_name_or_id=resolved_organization_id or organization_name, 329 ) 330 if len(matching_organizations) > 1: 331 raise exc.PyAirbyteInputError( 332 message="Organization name matches multiple organizations.", 333 context={"organization_name": organization_name}, 334 ) 335 336 organization = matching_organizations[0] 337 338 organization_credentials = self._credentials.with_organization_id( 339 organization.organization_id 340 ) 341 return CloudOrganization( 342 organization_id=organization.organization_id, 343 organization_name=organization.organization_name, 344 email=organization.email, 345 client_id=organization_credentials.client_id, 346 client_secret=organization_credentials.client_secret, 347 bearer_token=organization_credentials.bearer_token, 348 public_api_root=organization_credentials.public_api_root, 349 config_api_root=organization_credentials.config_api_root, 350 )
Authenticated client for Airbyte Cloud and self-managed Airbyte APIs.
31 def __init__( 32 self, 33 *, 34 client_id: str | SecretString | None = None, 35 client_secret: str | SecretString | None = None, 36 bearer_token: str | SecretString | None = None, 37 public_api_root: str | None = None, 38 config_api_root: str | None = None, 39 workspace_id: str | None = None, 40 organization_id: str | None = None, 41 ) -> None: 42 """Initialize a `CloudClient` from explicit auth values.""" 43 self._credentials = _AirbyteCredentials.from_auth( 44 client_id=client_id, 45 client_secret=client_secret, 46 bearer_token=bearer_token, 47 public_api_root=public_api_root, 48 config_api_root=config_api_root, 49 workspace_id=workspace_id, 50 organization_id=organization_id, 51 env_vars=False, 52 )
Initialize a CloudClient from explicit auth values.
54 @property 55 def client_id(self) -> SecretString | None: 56 """OAuth client ID used for authentication.""" 57 return self._credentials.client_id
OAuth client ID used for authentication.
59 @property 60 def client_secret(self) -> SecretString | None: 61 """OAuth client secret used for authentication.""" 62 return self._credentials.client_secret
OAuth client secret used for authentication.
64 @property 65 def bearer_token(self) -> SecretString | None: 66 """Bearer token used for authentication.""" 67 return self._credentials.bearer_token
Bearer token used for authentication.
69 @property 70 def public_api_root(self) -> str: 71 """Airbyte Public API root.""" 72 return self._credentials.public_api_root
Airbyte Public API root.
74 @property 75 def config_api_root(self) -> str | None: 76 """Airbyte Config API root.""" 77 return self._credentials.config_api_root
Airbyte Config API root.
79 @property 80 def organization_id(self) -> str | None: 81 """Default organization ID for organization-scoped operations.""" 82 return self._credentials.organization_id
Default organization ID for organization-scoped operations.
84 @classmethod 85 def from_auth( 86 cls, 87 *, 88 env_vars: bool = False, 89 organization_id: str | None = None, 90 client_id: str | SecretString | None = None, 91 client_secret: str | SecretString | None = None, 92 bearer_token: str | SecretString | None = None, 93 public_api_root: str | None = None, 94 config_api_root: str | None = None, 95 ) -> CloudClient: 96 """Create a client from explicit inputs and optionally environment variables. 97 98 When `env_vars` is True, environment variables are checked as a fallback 99 after any explicitly provided values. 100 """ 101 credentials = _AirbyteCredentials.from_auth( 102 organization_id=organization_id, 103 client_id=client_id, 104 client_secret=client_secret, 105 bearer_token=bearer_token, 106 public_api_root=public_api_root, 107 config_api_root=config_api_root, 108 env_vars=env_vars, 109 ) 110 return cls._from_credentials(credentials)
Create a client from explicit inputs and optionally environment variables.
When env_vars is True, environment variables are checked as a fallback
after any explicitly provided values.
125 def get_workspace(self, workspace_id: str | None = None) -> CloudWorkspace: 126 """Create a `CloudWorkspace` using this client's credentials.""" 127 resolved_workspace_id = workspace_id or self._credentials.workspace_id 128 if not resolved_workspace_id: 129 raise exc.PyAirbyteInputError( 130 message="Workspace ID is required.", 131 guidance="Provide a workspace ID.", 132 ) 133 134 credentials = self._credentials.with_workspace_id(resolved_workspace_id) 135 return CloudWorkspace( 136 workspace_id=credentials.workspace_id, 137 client_id=credentials.client_id, 138 client_secret=credentials.client_secret, 139 bearer_token=credentials.bearer_token, 140 api_root=credentials.public_api_root, 141 config_api_root=credentials.config_api_root, 142 )
Create a CloudWorkspace using this client's credentials.
144 def create_workspace( 145 self, 146 *, 147 name: str, 148 organization_id: str | None = None, 149 region_id: str | None = None, 150 ) -> CloudWorkspaceInfo: 151 """Create an Airbyte workspace.""" 152 resolved_organization_id = organization_id or self.organization_id 153 workspace = api_util.create_workspace( 154 name=name, 155 organization_id=resolved_organization_id, 156 region_id=region_id, 157 api_root=self.public_api_root, 158 client_id=self.client_id, 159 client_secret=self.client_secret, 160 bearer_token=self.bearer_token, 161 ) 162 return CloudWorkspaceInfo.from_api_response(workspace)
Create an Airbyte workspace.
164 def rename_workspace( 165 self, 166 workspace_id: str, 167 *, 168 name: str, 169 ) -> CloudWorkspaceInfo: 170 """Rename an Airbyte workspace.""" 171 workspace = api_util.rename_workspace( 172 workspace_id=workspace_id, 173 name=name, 174 api_root=self.public_api_root, 175 client_id=self.client_id, 176 client_secret=self.client_secret, 177 bearer_token=self.bearer_token, 178 ) 179 return CloudWorkspaceInfo.from_api_response(workspace)
Rename an Airbyte workspace.
181 def permanently_delete_workspace( 182 self, 183 workspace_id: str, 184 *, 185 workspace_name: str | None = None, 186 safe_mode: bool = True, 187 ) -> None: 188 """Permanently delete an Airbyte workspace if it has no connections. 189 190 When `safe_mode` is enabled, the workspace name must contain `delete-me` 191 or `deleteme`. This also checks for existing connections before deleting 192 and raises `AirbyteWorkspaceNotEmptyError` if the workspace is not empty. 193 """ 194 api_util.permanently_delete_workspace( 195 workspace_id=workspace_id, 196 workspace_name=workspace_name, 197 api_root=self.public_api_root, 198 client_id=self.client_id, 199 client_secret=self.client_secret, 200 bearer_token=self.bearer_token, 201 safe_mode=safe_mode, 202 )
Permanently delete an Airbyte workspace if it has no connections.
When safe_mode is enabled, the workspace name must contain delete-me
or deleteme. This also checks for existing connections before deleting
and raises AirbyteWorkspaceNotEmptyError if the workspace is not empty.
228 def list_workspaces( 229 self, 230 name: str | None = None, 231 *, 232 organization_id: str | None = None, 233 name_contains: str | None = None, 234 name_filter: Callable[[str], bool] | None = None, 235 limit: int | None = None, 236 ) -> list[CloudWorkspaceInfo]: 237 """List workspaces available to this client.""" 238 if organization_id is not None or self.organization_id is not None: 239 resolved_organization_id = organization_id or self.organization_id 240 if not resolved_organization_id: 241 raise exc.PyAirbyteInputError( 242 message="Organization ID is required.", 243 guidance="Provide an organization ID.", 244 ) 245 workspaces = api_util.list_workspaces_in_organization( 246 organization_id=resolved_organization_id, 247 api_root=self.public_api_root, 248 config_api_root=self.config_api_root, 249 client_id=self.client_id, 250 client_secret=self.client_secret, 251 bearer_token=self.bearer_token, 252 name_contains=name_contains or name, 253 limit=None if name_filter is not None else limit, 254 ) 255 workspace_infos = [ 256 CloudWorkspaceInfo.from_mapping(workspace) for workspace in workspaces 257 ] 258 if name_filter is not None: 259 workspace_infos = [ 260 workspace for workspace in workspace_infos if name_filter(workspace.name) 261 ] 262 if limit is not None: 263 workspace_infos = workspace_infos[:limit] 264 return workspace_infos 265 if name_contains is not None: 266 if name_filter is not None: 267 raise exc.PyAirbyteInputError( 268 message="You can provide name_contains or name_filter, but not both." 269 ) 270 name_substring = name_contains 271 272 def name_filter(workspace_name: str) -> bool: 273 return name_substring in workspace_name 274 275 return [ 276 CloudWorkspaceInfo.from_api_response(workspace) 277 for workspace in api_util.list_workspaces( 278 workspace_id="", 279 api_root=self.public_api_root, 280 name=name, 281 name_filter=name_filter, 282 client_id=self.client_id, 283 client_secret=self.client_secret, 284 bearer_token=self.bearer_token, 285 limit=limit, 286 ) 287 ]
List workspaces available to this client.
289 def get_organization( 290 self, 291 organization_id: str | None = None, 292 *, 293 organization_name: str | None = None, 294 ) -> CloudOrganization: 295 """Resolve an organization by ID or exact name.""" 296 resolved_organization_id = organization_id or self.organization_id 297 if resolved_organization_id and organization_name: 298 raise exc.PyAirbyteInputError( 299 message="Provide either organization ID or organization name." 300 ) 301 if not resolved_organization_id and not organization_name: 302 raise exc.PyAirbyteInputError( 303 message="Organization ID or organization name is required." 304 ) 305 306 organizations = api_util.list_organizations_for_user( 307 api_root=self.public_api_root, 308 client_id=self.client_id, 309 client_secret=self.client_secret, 310 bearer_token=self.bearer_token, 311 ) 312 if resolved_organization_id: 313 matching_organizations = [ 314 organization 315 for organization in organizations 316 if organization.organization_id == resolved_organization_id 317 ] 318 else: 319 matching_organizations = [ 320 organization 321 for organization in organizations 322 if organization.organization_name == organization_name 323 ] 324 325 if not matching_organizations: 326 raise AirbyteMissingResourceError( 327 resource_type="organization", 328 resource_name_or_id=resolved_organization_id or organization_name, 329 ) 330 if len(matching_organizations) > 1: 331 raise exc.PyAirbyteInputError( 332 message="Organization name matches multiple organizations.", 333 context={"organization_name": organization_name}, 334 ) 335 336 organization = matching_organizations[0] 337 338 organization_credentials = self._credentials.with_organization_id( 339 organization.organization_id 340 ) 341 return CloudOrganization( 342 organization_id=organization.organization_id, 343 organization_name=organization.organization_name, 344 email=organization.email, 345 client_id=organization_credentials.client_id, 346 client_secret=organization_credentials.client_secret, 347 bearer_token=organization_credentials.bearer_token, 348 public_api_root=organization_credentials.public_api_root, 349 config_api_root=organization_credentials.config_api_root, 350 )
Resolve an organization by ID or exact name.