airbyte.cloud
PyAirbyte classes and methods for interacting with the Airbyte Cloud API.
You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.
Self-managed Airbyte instances
For self-managed Airbyte instances, set api_root to the Public API root for your
deployment. With the default self-managed routing, this root usually ends in /api/public/v1.
PyAirbyte uses the Public API for workspace and organization discovery.
Some methods in this module also call the Config API. For example,
CloudConnection.dump_raw_catalog() reads the configured catalog directly from Airbyte.
For documented self-managed deployments whose Public API root ends in /api/public/v1,
PyAirbyte infers the Config API root by replacing that suffix with /api/v1.
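A minimal sketch of the suffix-replacement rule described above. The helper name infer_config_api_root is hypothetical and is not PyAirbyte's implementation. Treating trailing slashes as insignificant, and returning None when the suffix is absent, are assumptions made for illustration.

```python
from __future__ import annotations

PUBLIC_API_SUFFIX = "/api/public/v1"
CONFIG_API_SUFFIX = "/api/v1"


def infer_config_api_root(public_api_root: str) -> str | None:
    """Hypothetical helper: derive a Config API root from a Public API root.

    Returns None when the root does not end in the documented suffix; in that
    case the Config API root has to be supplied explicitly.
    """
    # Assumption: trailing slashes are not significant.
    root = public_api_root.rstrip("/")
    if not root.endswith(PUBLIC_API_SUFFIX):
        return None
    # Swap only the trailing suffix, keeping the host and any path prefix intact.
    return root[: -len(PUBLIC_API_SUFFIX)] + CONFIG_API_SUFFIX


print(infer_config_api_root("https://airbyte.example.com/api/public/v1"))
# → https://airbyte.example.com/api/v1
```

A root served under a custom proxy path such as https://proxy.example.com/custom/public has no recognizable suffix, which is the situation where passing config_api_root explicitly becomes necessary.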
If your deployment uses custom ingress or a nonstandard reverse proxy, pass
config_api_root explicitly or set the AIRBYTE_CLOUD_CONFIG_API_URL environment
variable.
```python
from airbyte import cloud

workspace = cloud.CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
    api_root="https://airbyte.example.com/api/public/v1",
    config_api_root="https://airbyte.example.com/api/v1",
)

connection = workspace.get_connection(connection_id="...")
raw_catalog = connection.dump_raw_catalog()
```
Examples
Basic Sync Example:
```python
import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object using OAuth2 client credentials
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())
```
Example Read From Cloud Destination:
If your destination is supported, you can read records directly from the
SyncResult object. Currently, this is supported only for Snowflake and BigQuery destinations.
```python
from airbyte.datasets import CachedDataset

# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
```
```python
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.cloud.client import CloudClient
from airbyte.cloud.client_config import CloudClientConfig
from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.models import CloudWorkspaceInfo, JobStatusEnum, JobTypeEnum
from airbyte.cloud.organizations import CloudOrganization
from airbyte.cloud.sync_results import SyncResult
from airbyte.cloud.workspaces import CloudWorkspace


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004
    from airbyte.cloud import (
        client,
        client_config,
        connections,
        constants,
        organizations,
        sync_results,
        workspaces,
    )


__all__ = [
    # Submodules
    "workspaces",
    "client",
    "organizations",
    "connections",
    "constants",
    "client_config",
    "sync_results",
    # Classes
    "CloudClient",
    "CloudOrganization",
    "CloudWorkspace",
    "CloudConnection",
    "CloudClientConfig",
    "CloudWorkspaceInfo",
    "SyncResult",
    # Enums
    "JobStatusEnum",
    "JobTypeEnum",
]
```
```python
@dataclass(init=False, kw_only=True)
class CloudClient:
    """Authenticated client for Airbyte Cloud and self-managed Airbyte APIs."""

    _credentials: _AirbyteCredentials

    def __init__(
        self,
        *,
        client_id: str | SecretString | None = None,
        client_secret: str | SecretString | None = None,
        bearer_token: str | SecretString | None = None,
        public_api_root: str | None = None,
        config_api_root: str | None = None,
        workspace_id: str | None = None,
        organization_id: str | None = None,
    ) -> None:
        """Initialize a `CloudClient` from explicit auth values."""
        self._credentials = _AirbyteCredentials.from_auth(
            client_id=client_id,
            client_secret=client_secret,
            bearer_token=bearer_token,
            public_api_root=public_api_root,
            config_api_root=config_api_root,
            workspace_id=workspace_id,
            organization_id=organization_id,
            env_vars=False,
        )

    @property
    def client_id(self) -> SecretString | None:
        """OAuth client ID used for authentication."""
        return self._credentials.client_id

    @property
    def client_secret(self) -> SecretString | None:
        """OAuth client secret used for authentication."""
        return self._credentials.client_secret

    @property
    def bearer_token(self) -> SecretString | None:
        """Bearer token used for authentication."""
        return self._credentials.bearer_token

    @property
    def public_api_root(self) -> str:
        """Airbyte Public API root."""
        return self._credentials.public_api_root

    @property
    def config_api_root(self) -> str | None:
        """Airbyte Config API root."""
        return self._credentials.config_api_root

    @property
    def organization_id(self) -> str | None:
        """Default organization ID for organization-scoped operations."""
        return self._credentials.organization_id

    @classmethod
    def from_auth(
        cls,
        *,
        env_vars: bool = False,
        organization_id: str | None = None,
        client_id: str | SecretString | None = None,
        client_secret: str | SecretString | None = None,
        bearer_token: str | SecretString | None = None,
        public_api_root: str | None = None,
        config_api_root: str | None = None,
    ) -> CloudClient:
        """Create a client from explicit inputs and optionally environment variables.

        When `env_vars` is True, environment variables are checked as a fallback
        after any explicitly provided values.
        """
        credentials = _AirbyteCredentials.from_auth(
            organization_id=organization_id,
            client_id=client_id,
            client_secret=client_secret,
            bearer_token=bearer_token,
            public_api_root=public_api_root,
            config_api_root=config_api_root,
            env_vars=env_vars,
        )
        return cls._from_credentials(credentials)

    @classmethod
    def _from_credentials(cls, credentials: _AirbyteCredentials) -> CloudClient:
        """Create a client from resolved Cloud credentials."""
        return cls(
            client_id=credentials.client_id,
            client_secret=credentials.client_secret,
            bearer_token=credentials.bearer_token,
            public_api_root=credentials.public_api_root,
            config_api_root=credentials.config_api_root,
            workspace_id=credentials.workspace_id,
            organization_id=credentials.organization_id,
        )

    def get_workspace(self, workspace_id: str | None = None) -> CloudWorkspace:
        """Create a `CloudWorkspace` using this client's credentials."""
        resolved_workspace_id = workspace_id or self._credentials.workspace_id
        if not resolved_workspace_id:
            raise exc.PyAirbyteInputError(
                message="Workspace ID is required.",
                guidance="Provide a workspace ID.",
            )

        credentials = self._credentials.with_workspace_id(resolved_workspace_id)
        return CloudWorkspace(
            workspace_id=credentials.workspace_id,
            client_id=credentials.client_id,
            client_secret=credentials.client_secret,
            bearer_token=credentials.bearer_token,
            api_root=credentials.public_api_root,
            config_api_root=credentials.config_api_root,
        )

    def create_workspace(
        self,
        *,
        name: str,
        organization_id: str | None = None,
        region_id: str | None = None,
    ) -> CloudWorkspaceInfo:
        """Create an Airbyte workspace."""
        resolved_organization_id = organization_id or self.organization_id
        workspace = api_util.create_workspace(
            name=name,
            organization_id=resolved_organization_id,
            region_id=region_id,
            api_root=self.public_api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return CloudWorkspaceInfo.from_api_response(workspace)

    def rename_workspace(
        self,
        workspace_id: str,
        *,
        name: str,
    ) -> CloudWorkspaceInfo:
        """Rename an Airbyte workspace."""
        workspace = api_util.rename_workspace(
            workspace_id=workspace_id,
            name=name,
            api_root=self.public_api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return CloudWorkspaceInfo.from_api_response(workspace)

    def permanently_delete_workspace(
        self,
        workspace_id: str,
        *,
        workspace_name: str | None = None,
        safe_mode: bool = True,
    ) -> None:
        """Permanently delete an Airbyte workspace if it has no connections.

        When `safe_mode` is enabled, the workspace name must contain `delete-me`
        or `deleteme`. This also checks for existing connections before deleting
        and raises `AirbyteWorkspaceNotEmptyError` if the workspace is not empty.
        """
        api_util.permanently_delete_workspace(
            workspace_id=workspace_id,
            workspace_name=workspace_name,
            api_root=self.public_api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

    @overload
    def list_workspaces(
        self,
        name: str | None = None,
        *,
        organization_id: None = None,
        name_contains: str | None = None,
        name_filter: Callable[[str], bool] | None = None,
        limit: int | None = None,
    ) -> list[CloudWorkspaceInfo]:
        raise NotImplementedError

    @overload
    def list_workspaces(
        self,
        name: str | None = None,
        *,
        organization_id: str,
        name_contains: str | None = None,
        name_filter: Callable[[str], bool] | None = None,
        limit: int | None = None,
    ) -> list[CloudWorkspaceInfo]:
        raise NotImplementedError

    def list_workspaces(
        self,
        name: str | None = None,
        *,
        organization_id: str | None = None,
        name_contains: str | None = None,
        name_filter: Callable[[str], bool] | None = None,
        limit: int | None = None,
    ) -> list[CloudWorkspaceInfo]:
        """List workspaces available to this client."""
        if organization_id is not None or self.organization_id is not None:
            resolved_organization_id = organization_id or self.organization_id
            if not resolved_organization_id:
                raise exc.PyAirbyteInputError(
                    message="Organization ID is required.",
                    guidance="Provide an organization ID.",
                )
            workspaces = api_util.list_workspaces_in_organization(
                organization_id=resolved_organization_id,
                api_root=self.public_api_root,
                config_api_root=self.config_api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
                name_contains=name_contains or name,
                limit=None if name_filter is not None else limit,
            )
            workspace_infos = [
                CloudWorkspaceInfo.from_mapping(workspace) for workspace in workspaces
            ]
            if name_filter is not None:
                workspace_infos = [
                    workspace for workspace in workspace_infos if name_filter(workspace.name)
                ]
            if limit is not None:
                workspace_infos = workspace_infos[:limit]
            return workspace_infos
        if name_contains is not None:
            if name_filter is not None:
                raise exc.PyAirbyteInputError(
                    message="You can provide name_contains or name_filter, but not both."
                )
            name_substring = name_contains

            def name_filter(workspace_name: str) -> bool:
                return name_substring in workspace_name

        return [
            CloudWorkspaceInfo.from_api_response(workspace)
            for workspace in api_util.list_workspaces(
                workspace_id="",
                api_root=self.public_api_root,
                name=name,
                name_filter=name_filter,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
                limit=limit,
            )
        ]

    def get_organization(
        self,
        organization_id: str | None = None,
        *,
        organization_name: str | None = None,
    ) -> CloudOrganization:
        """Resolve an organization by ID or exact name."""
        resolved_organization_id = organization_id or self.organization_id
        if resolved_organization_id and organization_name:
            raise exc.PyAirbyteInputError(
                message="Provide either organization ID or organization name."
            )
        if not resolved_organization_id and not organization_name:
            raise exc.PyAirbyteInputError(
                message="Organization ID or organization name is required."
            )

        organizations = api_util.list_organizations_for_user(
            api_root=self.public_api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        if resolved_organization_id:
            matching_organizations = [
                organization
                for organization in organizations
                if organization.organization_id == resolved_organization_id
            ]
        else:
            matching_organizations = [
                organization
                for organization in organizations
                if organization.organization_name == organization_name
            ]

        if not matching_organizations:
            raise AirbyteMissingResourceError(
                resource_type="organization",
                resource_name_or_id=resolved_organization_id or organization_name,
            )
        if len(matching_organizations) > 1:
            raise exc.PyAirbyteInputError(
                message="Organization name matches multiple organizations.",
                context={"organization_name": organization_name},
            )

        organization = matching_organizations[0]

        organization_credentials = self._credentials.with_organization_id(
            organization.organization_id
        )
        return CloudOrganization(
            organization_id=organization.organization_id,
            organization_name=organization.organization_name,
            email=organization.email,
            client_id=organization_credentials.client_id,
            client_secret=organization_credentials.client_secret,
            bearer_token=organization_credentials.bearer_token,
            public_api_root=organization_credentials.public_api_root,
            config_api_root=organization_credentials.config_api_root,
        )
```
Authenticated client for Airbyte Cloud and self-managed Airbyte APIs.
CloudClient(*, client_id: str | SecretString | None = None, client_secret: str | SecretString | None = None, bearer_token: str | SecretString | None = None, public_api_root: str | None = None, config_api_root: str | None = None, workspace_id: str | None = None, organization_id: str | None = None)
Initialize a CloudClient from explicit auth values.
client_id: SecretString | None
OAuth client ID used for authentication.
client_secret: SecretString | None
OAuth client secret used for authentication.
bearer_token: SecretString | None
Bearer token used for authentication.
public_api_root: str
Airbyte Public API root.
config_api_root: str | None
Airbyte Config API root.
organization_id: str | None
Default organization ID for organization-scoped operations.
@classmethod def from_auth(cls, *, env_vars: bool = False, organization_id: str | None = None, client_id: str | SecretString | None = None, client_secret: str | SecretString | None = None, bearer_token: str | SecretString | None = None, public_api_root: str | None = None, config_api_root: str | None = None) -> CloudClient:
Create a client from explicit inputs and optionally environment variables.
When env_vars is True, environment variables are checked as a fallback
after any explicitly provided values.
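The precedence described above (explicit values first, environment variables only as a fallback when env_vars is True) can be sketched as follows. The function resolve_setting and the variable name EXAMPLE_CLIENT_ID are hypothetical; PyAirbyte performs the real resolution inside its credentials object, whose code is not shown here. Treating only None as "not provided" is also an assumption.

```python
from __future__ import annotations

from collections.abc import Mapping


def resolve_setting(
    explicit: str | None,
    env_var_name: str,
    *,
    env_vars: bool,
    environ: Mapping[str, str],
) -> str | None:
    """Hypothetical helper: return the explicit value, else an env var fallback."""
    if explicit is not None:
        # An explicitly provided value always wins.
        return explicit
    if env_vars:
        # The environment is consulted only when the caller opts in.
        return environ.get(env_var_name)
    return None


env = {"EXAMPLE_CLIENT_ID": "from-env"}
print(resolve_setting("explicit-id", "EXAMPLE_CLIENT_ID", env_vars=True, environ=env))  # explicit-id
print(resolve_setting(None, "EXAMPLE_CLIENT_ID", env_vars=True, environ=env))  # from-env
print(resolve_setting(None, "EXAMPLE_CLIENT_ID", env_vars=False, environ=env))  # None
```

Passing the environment in as a mapping keeps the sketch testable; in real code it would typically be os.environ.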
def get_workspace(self, workspace_id: str | None = None) -> CloudWorkspace:
Create a CloudWorkspace using this client's credentials.
def create_workspace(self, *, name: str, organization_id: str | None = None, region_id: str | None = None) -> CloudWorkspaceInfo:
Create an Airbyte workspace.
def rename_workspace(self, workspace_id: str, *, name: str) -> CloudWorkspaceInfo:
Rename an Airbyte workspace.
def permanently_delete_workspace(self, workspace_id: str, *, workspace_name: str | None = None, safe_mode: bool = True) -> None:
Permanently delete an Airbyte workspace if it has no connections.
When safe_mode is enabled, the workspace name must contain delete-me
or deleteme. This also checks for existing connections before deleting
and raises AirbyteWorkspaceNotEmptyError if the workspace is not empty.
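A minimal sketch of the two guards described above, using plain values in place of API calls. The function check_workspace_deletable and its arguments are hypothetical. ValueError and RuntimeError stand in for PyAirbyte's own exception types (such as AirbyteWorkspaceNotEmptyError). The case-sensitive substring match, and running the connection check even when safe_mode is off, are assumptions based on the summary line "if it has no connections".

```python
from __future__ import annotations

SAFE_MODE_MARKERS = ("delete-me", "deleteme")


def check_workspace_deletable(
    workspace_name: str,
    connection_count: int,
    *,
    safe_mode: bool = True,
) -> None:
    """Hypothetical pre-delete guard: raise if the workspace must not be deleted."""
    if safe_mode and not any(marker in workspace_name for marker in SAFE_MODE_MARKERS):
        # Safe mode only allows deleting workspaces explicitly marked for deletion.
        raise ValueError(f"Refusing to delete {workspace_name!r}: name lacks a delete marker.")
    if connection_count > 0:
        # Stand-in for AirbyteWorkspaceNotEmptyError.
        raise RuntimeError(
            f"Workspace {workspace_name!r} still has {connection_count} connection(s)."
        )


check_workspace_deletable("ci-deleteme-123", connection_count=0)  # passes silently
```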
def list_workspaces(self, name: str | None = None, *, organization_id: str | None = None, name_contains: str | None = None, name_filter: Callable[[str], bool] | None = None, limit: int | None = None) -> list[CloudWorkspaceInfo]:
List workspaces available to this client.
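In the list_workspaces source, the non-organization path turns name_contains into a substring predicate and rejects combining it with name_filter. The organization path applies name_filter on the client and only then applies limit, so filtered-out workspaces do not count toward the limit. The sketch below is a hypothetical in-memory version of that filtering, with the API call replaced by a plain list of names. Enforcing mutual exclusion in every case is a simplification: the organization path in the source does not check it.

```python
from __future__ import annotations

from collections.abc import Callable, Iterable


def filter_workspace_names(
    names: Iterable[str],
    *,
    name_contains: str | None = None,
    name_filter: Callable[[str], bool] | None = None,
    limit: int | None = None,
) -> list[str]:
    """Hypothetical in-memory version of the name filtering in list_workspaces."""
    if name_contains is not None and name_filter is not None:
        raise ValueError("You can provide name_contains or name_filter, but not both.")

    predicate = name_filter
    if name_contains is not None:
        substring = name_contains

        # As in the source, a substring search becomes a predicate on the name.
        def predicate(workspace_name: str) -> bool:
            return substring in workspace_name

    matches = [name for name in names if predicate is None or predicate(name)]
    # Apply the limit after filtering.
    return matches if limit is None else matches[:limit]


names = ["prod-east", "staging", "prod-west", "dev"]
print(filter_workspace_names(names, name_contains="prod"))  # ['prod-east', 'prod-west']
print(filter_workspace_names(names, name_contains="prod", limit=1))  # ['prod-east']
```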
def get_organization(self, organization_id: str | None = None, *, organization_name: str | None = None) -> CloudOrganization:
Resolve an organization by ID or exact name.
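The matching rules in get_organization can be sketched over an in-memory list. OrgInfo and resolve_organization are hypothetical stand-ins for the API records and the method. Built-in exceptions replace PyAirbyteInputError and AirbyteMissingResourceError.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class OrgInfo:
    """Hypothetical stand-in for an organization record returned by the API."""

    organization_id: str
    organization_name: str


def resolve_organization(
    organizations: list[OrgInfo],
    organization_id: str | None = None,
    organization_name: str | None = None,
    default_organization_id: str | None = None,
) -> OrgInfo:
    """Hypothetical sketch of matching an organization by ID or exact name."""
    # The client's default organization ID counts as "provided".
    resolved_id = organization_id or default_organization_id
    if resolved_id and organization_name:
        raise ValueError("Provide either organization ID or organization name.")
    if not resolved_id and not organization_name:
        raise ValueError("Organization ID or organization name is required.")
    if resolved_id:
        matches = [org for org in organizations if org.organization_id == resolved_id]
    else:
        # Exact, case-sensitive name comparison, as in the source.
        matches = [org for org in organizations if org.organization_name == organization_name]
    if not matches:
        raise LookupError(f"Organization not found: {resolved_id or organization_name}")
    if len(matches) > 1:
        raise ValueError(f"Organization name matches multiple organizations: {organization_name}")
    return matches[0]
```

As in the source, a client-level default organization ID is treated as an explicit ID. Looking up by organization_name on a client that has a default organization ID therefore raises an error instead of searching by name.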
```python
class CloudOrganization:
    """Information about an organization in Airbyte Cloud.

    This class provides lazy loading of organization attributes including billing status.
    It is typically created via `CloudWorkspace.get_organization()`.
    """

    def __init__(
        self,
        organization_id: str,
        organization_name: str | None = None,
        email: str | None = None,
        *,
        client_id: str | SecretString | None = None,
        client_secret: str | SecretString | None = None,
        bearer_token: str | SecretString | None = None,
        public_api_root: str | None = None,
        config_api_root: str | None = None,
    ) -> None:
        """Initialize a `CloudOrganization`."""
        self.organization_id = organization_id
        """The organization ID."""

        self._organization_name = organization_name
        """Display name of the organization."""

        self._email = email
        """Email associated with the organization."""

        self._credentials = _AirbyteCredentials(
            client_id=SecretString(client_id) if client_id else None,
            client_secret=SecretString(client_secret) if client_secret else None,
            bearer_token=SecretString(bearer_token) if bearer_token else None,
            public_api_root=public_api_root or api_util.CLOUD_API_ROOT,
            config_api_root=config_api_root,
            organization_id=organization_id,
        )
        self._organization_info: dict[str, Any] | None = None
        self._organization_info_fetch_failed: bool = False

    def _fetch_organization_info(self, *, force_refresh: bool = False) -> dict[str, Any]:
        """Fetch and cache organization info including billing status."""
        if force_refresh:
            self._organization_info_fetch_failed = False

        if self._organization_info_fetch_failed and self._organization_info is None:
            return {}

        if not force_refresh and self._organization_info is not None:
            return self._organization_info

        try:
            self._organization_info = api_util.get_organization_info(
                organization_id=self.organization_id,
                api_root=self._credentials.public_api_root,
                config_api_root=self._credentials.config_api_root,
                client_id=self._credentials.client_id,
                client_secret=self._credentials.client_secret,
                bearer_token=self._credentials.bearer_token,
            )
        except Exception as ex:
            logger.debug("Failed to fetch organization info.", exc_info=ex)
            if self._organization_info is None:
                self._organization_info_fetch_failed = True
            return self._organization_info or {}
        else:
            return self._organization_info

    @property
    def organization_name(self) -> str | None:
        """Display name of the organization."""
        if self._organization_name is not None:
            return self._organization_name
        info = self._fetch_organization_info()
        return info.get("organizationName")

    @property
    def email(self) -> str | None:
        """Email associated with the organization."""
        if self._email is not None:
            return self._email
        info = self._fetch_organization_info()
        return info.get("email")

    @property
    def payment_status(self) -> str | None:
        """Payment status of the organization."""
        info = self._fetch_organization_info()
        return (info.get("billing") or {}).get("paymentStatus")

    @property
    def subscription_status(self) -> str | None:
        """Subscription status of the organization."""
        info = self._fetch_organization_info()
        return (info.get("billing") or {}).get("subscriptionStatus")

    @property
    def is_account_locked(self) -> bool:
        """Whether the account is locked due to billing issues."""
        return api_util.is_account_locked(self.payment_status, self.subscription_status)
```
Information about an organization in Airbyte Cloud.
This class provides lazy loading of organization attributes including billing status.
It is typically created via CloudWorkspace.get_organization().
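The lazy loading in CloudOrganization fetches organization info at most once and caches it. If the first fetch fails, later reads return an empty mapping without retrying until force_refresh=True is passed. If a refresh fails after an earlier success, the stale cached value is kept. The class below is a hypothetical sketch of that caching pattern, with the API call replaced by an injected callable. It is not PyAirbyte code.

```python
from __future__ import annotations

from collections.abc import Callable
from typing import Any


class LazyInfo:
    """Hypothetical sketch of the fetch-once caching used for organization info."""

    def __init__(self, fetch: Callable[[], dict[str, Any]]) -> None:
        self._fetch = fetch  # Stand-in for the API call.
        self._info: dict[str, Any] | None = None
        self._fetch_failed = False

    def get(self, *, force_refresh: bool = False) -> dict[str, Any]:
        if force_refresh:
            # An explicit refresh clears the "do not retry" flag.
            self._fetch_failed = False
        if self._fetch_failed and self._info is None:
            # A previous attempt failed and nothing is cached: skip the call.
            return {}
        if not force_refresh and self._info is not None:
            return self._info
        try:
            self._info = self._fetch()
        except Exception:
            # Remember the failure only when there is no cached value to fall back on.
            if self._info is None:
                self._fetch_failed = True
            return self._info or {}
        return self._info
```

Properties such as organization_name and email first return any value passed to the constructor and consult this cache only when none was given.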
CloudOrganization(organization_id: str, organization_name: str | None = None, email: str | None = None, *, client_id: str | SecretString | None = None, client_secret: str | SecretString | None = None, bearer_token: str | SecretString | None = None, public_api_root: str | None = None, config_api_root: str | None = None)
Initialize a CloudOrganization.
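`__init__` wraps each credential in `SecretString` only when the value is truthy. When no public API root is given, it falls back to `api_util.CLOUD_API_ROOT`. The sketch below isolates those two expressions. `MaskedSecret` is a hypothetical stand-in. It assumes `SecretString` behaves like a `str` whose `repr` hides the value. `ASSUMED_CLOUD_API_ROOT` is also an assumption, not a value stated on this page.

```python
from __future__ import annotations

# Assumed value of api_util.CLOUD_API_ROOT; not taken from this page.
ASSUMED_CLOUD_API_ROOT = "https://api.airbyte.com/v1"


class MaskedSecret(str):
    """Hypothetical stand-in for SecretString: a str whose repr hides the value."""

    def __repr__(self) -> str:
        return "<MaskedSecret: ****>"


def wrap_secret(value: str | None) -> MaskedSecret | None:
    # Mirrors `SecretString(x) if x else None`: both "" and None become None.
    return MaskedSecret(value) if value else None


def resolve_public_api_root(public_api_root: str | None) -> str:
    # Mirrors `public_api_root or api_util.CLOUD_API_ROOT`.
    return public_api_root or ASSUMED_CLOUD_API_ROOT
```

Because the wrapper subclasses `str`, it can still be passed wherever a plain string is expected. Only its printed representation changes.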
@property
def organization_name(self) -> str | None:
    """Display name of the organization."""
    if self._organization_name is not None:
        return self._organization_name
    info = self._fetch_organization_info()
    return info.get("organizationName")
Display name of the organization.
@property
def email(self) -> str | None:
    """Email associated with the organization."""
    if self._email is not None:
        return self._email
    info = self._fetch_organization_info()
    return info.get("email")
Email associated with the organization.
@property
def payment_status(self) -> str | None:
    """Payment status of the organization."""
    info = self._fetch_organization_info()
    return (info.get("billing") or {}).get("paymentStatus")
Payment status of the organization.
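The `(info.get("billing") or {})` idiom in `payment_status` handles a response where `billing` is either absent or explicitly `null`. Below is a minimal standalone sketch of the same lookup. The helper name `billing_field` is hypothetical; the key names come from the source above.

```python
from __future__ import annotations

from typing import Any


def billing_field(info: dict[str, Any], key: str) -> str | None:
    """Return info["billing"][key], or None if billing is missing or null."""
    # `or {}` covers both a missing "billing" key and an explicit None value.
    return (info.get("billing") or {}).get(key)
```

A plain `info.get("billing", {})` would not be enough here. It returns `None`, not `{}`, when the key is present with a `null` value.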
@dataclass(init=False, kw_only=True)
class CloudWorkspace:
A remote workspace in Airbyte Cloud.
By overriding api_root, you can use this class to interact with self-managed Airbyte
instances, both OSS and Enterprise.
Two authentication methods are supported (mutually exclusive):
- OAuth2 client credentials (client_id + client_secret)
- Bearer token authentication
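Below is one way to read "mutually exclusive." `choose_auth_method` is hypothetical. In PyAirbyte the check happens inside the internal `CloudClientConfig`, whose exact rules and exception types may differ. The sketch raises `ValueError` for ambiguous or incomplete input. One difference from the real constructor: it does not reject an empty call. When no credentials are passed, it tries to read them from environment variables instead.

```python
from __future__ import annotations


def choose_auth_method(
    client_id: str | None = None,
    client_secret: str | None = None,
    bearer_token: str | None = None,
) -> str:
    """Hypothetical validator: return the single auth method implied by the inputs."""
    has_client_creds = bool(client_id or client_secret)
    if bearer_token and has_client_creds:
        # Both methods at once is ambiguous, so reject it.
        raise ValueError("Pass either client_id + client_secret or bearer_token, not both.")
    if bearer_token:
        return "bearer_token"
    if client_id and client_secret:
        return "client_credentials"
    if has_client_creds:
        # Half of the OAuth2 pair is not usable on its own.
        raise ValueError("client_id and client_secret must be provided together.")
    raise ValueError("No credentials provided.")
```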
Example with client credentials:
workspace = CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)
Example with bearer token:
workspace = CloudWorkspace(
    workspace_id="...",
    bearer_token="...",
)
def __init__(
    self,
    *,
    workspace_id: str | None = None,
    client_id: str | SecretString | None = None,
    client_secret: str | SecretString | None = None,
    api_root: str | None = None,
    config_api_root: str | None = None,
    bearer_token: str | SecretString | None = None,
) -> None:
    """Validate and initialize credentials."""
    env_vars = not (client_id or client_secret or bearer_token)
    credentials = _AirbyteCredentials.from_auth(
        workspace_id=workspace_id,
        client_id=client_id,
        client_secret=client_secret,
        bearer_token=bearer_token,
        public_api_root=api_root,
        config_api_root=config_api_root,
        env_vars=env_vars,
    )
    if not credentials.workspace_id:
        raise exc.PyAirbyteInputError(
            message="Workspace ID is required.",
            guidance="Provide a workspace ID.",
        )

    self._credentials = credentials
    self.workspace_id = credentials.workspace_id or ""
    self.client_id = credentials.client_id
    self.client_secret = credentials.client_secret
    self.bearer_token = credentials.bearer_token
    self.api_root = credentials.public_api_root
    self.config_api_root = credentials.config_api_root

    # Create internal CloudClientConfig object (validates mutual exclusivity)
    self._client_config = CloudClientConfig(
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
        api_root=self.api_root,
        config_api_root=self.config_api_root,
    )
Validate and initialize credentials.
@classmethod
def from_env(
    cls,
    workspace_id: str | None = None,
    *,
    api_root: str | None = None,
    config_api_root: str | None = None,
) -> CloudWorkspace:
    """Create a CloudWorkspace using credentials from environment variables.

    This factory method resolves credentials from environment variables,
    providing a convenient way to create a workspace without explicitly
    passing credentials.

    Two authentication methods are supported (mutually exclusive):
    1. Bearer token (checked first)
    2. OAuth2 client credentials (fallback)

    Environment variables used:
    - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
    - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
    - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
    - `AIRBYTE_CLOUD_WORKSPACE_ID`: The workspace ID (if not passed as argument).
    - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).
    - `AIRBYTE_CLOUD_CONFIG_API_URL`: Optional. The Config API root URL.

    Args:
        workspace_id: The workspace ID. If not provided, will be resolved from
            the `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable.
        api_root: The API root URL. If not provided, will be resolved from
            the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
            the Airbyte Cloud API.
        config_api_root: The Config API root URL. If not provided, will be resolved
            from the `AIRBYTE_CLOUD_CONFIG_API_URL` environment variable.

    Returns:
        A CloudWorkspace instance configured with credentials from the environment.

    Raises:
        PyAirbyteInputError: If required credentials are not found in
            the environment or are incomplete.

    Example:
        ```python
        # With workspace_id from environment
        workspace = CloudWorkspace.from_env()

        # With explicit workspace_id
        workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
        ```
    """
    return cls(
        workspace_id=workspace_id,
        api_root=api_root,
        config_api_root=config_api_root,
    )
Create a CloudWorkspace using credentials from environment variables.
This factory method resolves credentials from environment variables, providing a convenient way to create a workspace without explicitly passing credentials.
Two authentication methods are supported (mutually exclusive):
- Bearer token (checked first)
- OAuth2 client credentials (fallback)
Environment variables used:
- AIRBYTE_CLOUD_BEARER_TOKEN: Bearer token (alternative to client credentials).
- AIRBYTE_CLOUD_CLIENT_ID: OAuth client ID (for client credentials flow).
- AIRBYTE_CLOUD_CLIENT_SECRET: OAuth client secret (for client credentials flow).
- AIRBYTE_CLOUD_WORKSPACE_ID: The workspace ID (if not passed as argument).
- AIRBYTE_CLOUD_API_URL: Optional. The API root URL (defaults to Airbyte Cloud).
- AIRBYTE_CLOUD_CONFIG_API_URL: Optional. The Config API root URL.
Arguments:
- workspace_id: The workspace ID. If not provided, will be resolved from the AIRBYTE_CLOUD_WORKSPACE_ID environment variable.
- api_root: The API root URL. If not provided, will be resolved from the AIRBYTE_CLOUD_API_URL environment variable, or default to the Airbyte Cloud API.
- config_api_root: The Config API root URL. If not provided, will be resolved from the AIRBYTE_CLOUD_CONFIG_API_URL environment variable.
Returns:
A CloudWorkspace instance configured with credentials from the environment.
Raises:
- PyAirbyteInputError: If required credentials are not found in the environment or are incomplete.
Example:
# With workspace_id from environment
workspace = CloudWorkspace.from_env()

# With explicit workspace_id
workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
@property
def workspace_url(self) -> str | None:
    """The web URL of the workspace."""
    return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"
The web URL of the workspace.
def get_organization(
    self,
    *,
    raise_on_error: bool = True,
) -> CloudOrganization | None:
    """Get the organization this workspace belongs to.

    Fetching organization info requires ORGANIZATION_READER permissions on the organization,
    which may not be available with workspace-scoped credentials.

    Args:
        raise_on_error: If True (default), raises AirbyteError on permission or API errors.
            If False, returns None instead of raising.

    Returns:
        CloudOrganization object with organization_id and organization_name,
        or None if raise_on_error=False and an error occurred.

    Raises:
        AirbyteError: If raise_on_error=True and the organization info cannot be fetched
            (e.g., due to insufficient permissions or missing data).
    """
    try:
        info = self._organization_info
    except (AirbyteError, NotImplementedError):
        if raise_on_error:
            raise
        return None

    organization_id = info.get("organizationId")
    organization_name = info.get("organizationName")

    # Validate that both organization_id and organization_name are non-null and non-empty
    if not organization_id or not organization_name:
        if raise_on_error:
            raise AirbyteError(
                message="Organization info is incomplete.",
                context={
                    "organization_id": organization_id,
                    "organization_name": organization_name,
                },
            )
        return None

    organization_credentials = self._credentials.with_organization_id(organization_id)
    return CloudOrganization(
        organization_id=organization_id,
        organization_name=organization_name,
        client_id=organization_credentials.client_id,
        client_secret=organization_credentials.client_secret,
        bearer_token=organization_credentials.bearer_token,
        public_api_root=organization_credentials.public_api_root,
        config_api_root=organization_credentials.config_api_root,
    )
Get the organization this workspace belongs to.
Fetching organization info requires ORGANIZATION_READER permissions on the organization, which may not be available with workspace-scoped credentials.
Arguments:
- raise_on_error: If True (default), raises AirbyteError on permission or API errors. If False, returns None instead of raising.
Returns:
CloudOrganization object with organization_id and organization_name, or None if raise_on_error=False and an error occurred.
Raises:
- AirbyteError: If raise_on_error=True and the organization info cannot be fetched (e.g., due to insufficient permissions or missing data).
def connect(self) -> None:
    """Check that the workspace is reachable and raise an exception otherwise.

    Note: It is not necessary to call this method before calling other operations. It
    serves primarily as a simple check to ensure that the workspace is reachable
    and credentials are correct.
    """
    _ = api_util.get_workspace(
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
    )
    print(f"Successfully connected to workspace: {self.workspace_url}")
Check that the workspace is reachable and raise an exception otherwise.
Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.
def get_connection(
    self,
    connection_id: str,
) -> CloudConnection:
    """Get a connection by ID.

    This method does not fetch data from the API. It returns a `CloudConnection` object,
    which will be loaded lazily as needed.
    """
    return CloudConnection(
        workspace=self,
        connection_id=connection_id,
    )
Get a connection by ID.
This method does not fetch data from the API. It returns a CloudConnection object,
which will be loaded lazily as needed.
def get_source(
    self,
    source_id: str,
) -> CloudSource:
    """Get a source by ID.

    This method does not fetch data from the API. It returns a `CloudSource` object,
    which will be loaded lazily as needed.
    """
    return CloudSource(
        workspace=self,
        connector_id=source_id,
    )
Get a source by ID.
This method does not fetch data from the API. It returns a CloudSource object,
which will be loaded lazily as needed.
def get_destination(
    self,
    destination_id: str,
) -> CloudDestination:
    """Get a destination by ID.

    This method does not fetch data from the API. It returns a `CloudDestination` object,
    which will be loaded lazily as needed.
    """
    return CloudDestination(
        workspace=self,
        connector_id=destination_id,
    )
Get a destination by ID.
This method does not fetch data from the API. It returns a CloudDestination object,
which will be loaded lazily as needed.
def deploy_source(
    self,
    name: str,
    source: Source,
    *,
    unique: bool = True,
    random_name_suffix: bool = False,
) -> CloudSource:
    """Deploy a source to the workspace.

    Returns the newly deployed source.

    Args:
        name: The name to use when deploying.
        source: The source object to deploy.
        unique: Whether to require a unique name. If `True`, duplicate names
            are not allowed. Defaults to `True`.
        random_name_suffix: Whether to append a random suffix to the name.
    """
    source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
    source_config_dict["sourceType"] = source.name.replace("source-", "")

    if random_name_suffix:
        name += f" (ID: {text_util.generate_random_suffix()})"

    if unique:
        existing = self.list_sources(name=name)
        if existing:
            raise exc.AirbyteDuplicateResourcesError(
                resource_type="source",
                resource_name=name,
            )

    deployed_source = api_util.create_source(
        name=name,
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        config=source_config_dict,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
    )
    return CloudSource(
        workspace=self,
        connector_id=deployed_source.source_id,
    )
Deploy a source to the workspace.
Returns the newly deployed source.
Arguments:
- name: The name to use when deploying.
- source: The source object to deploy.
- unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
- random_name_suffix: Whether to append a random suffix to the name.
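Before calling the API, deploy_source copies the source's hydrated config, sets sourceType from the connector name with the "source-" prefix removed, and optionally appends a random suffix to the name; the uniqueness check then runs against the final name. A sketch of the payload-building step, assuming a plain dict config. prepare_source_deploy is hypothetical, and secrets.token_hex() substitutes for PyAirbyte's own suffix generator:

```python
import secrets
from typing import Any


def prepare_source_deploy(
    name: str,
    connector_name: str,
    config: dict[str, Any],
    *,
    random_name_suffix: bool = False,
) -> tuple[str, dict[str, Any]]:
    """Hypothetical helper: build the deploy name and config payload for a source."""
    # Copy so the caller's config is not mutated.
    payload = dict(config)
    # e.g. "source-faker" -> "faker", mirroring the str.replace() call in deploy_source.
    payload["sourceType"] = connector_name.replace("source-", "")
    if random_name_suffix:
        # Assumption: a random hex token stands in for text_util.generate_random_suffix().
        name += f" (ID: {secrets.token_hex(4)})"
    return name, payload
```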
def deploy_destination(
    self,
    name: str,
    destination: Destination | dict[str, Any],
    *,
    unique: bool = True,
    random_name_suffix: bool = False,
) -> CloudDestination:
    """Deploy a destination to the workspace.

    Returns the newly deployed destination ID.

    Args:
        name: The name to use when deploying.
        destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
            dictionary of configuration values.
        unique: Whether to require a unique name. If `True`, duplicate names
            are not allowed. Defaults to `True`.
        random_name_suffix: Whether to append a random suffix to the name.
    """
    if isinstance(destination, Destination):
        destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
        destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
        # raise ValueError(destination_conf_dict)
    else:
        destination_conf_dict = destination.copy()
        if "destinationType" not in destination_conf_dict:
            raise exc.PyAirbyteInputError(
                message="Missing `destinationType` in configuration dictionary.",
            )

    if random_name_suffix:
        name += f" (ID: {text_util.generate_random_suffix()})"

    if unique:
        existing = self.list_destinations(name=name)
        if existing:
            raise exc.AirbyteDuplicateResourcesError(
                resource_type="destination",
                resource_name=name,
            )

    deployed_destination = api_util.create_destination(
        name=name,
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        config=destination_conf_dict,  # Wants a dataclass but accepts dict
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
    )
    return CloudDestination(
        workspace=self,
        connector_id=deployed_destination.destination_id,
    )
Deploy a destination to the workspace.
Returns the newly deployed destination as a CloudDestination object.
Arguments:
- name: The name to use when deploying.
- destination: The destination to deploy. Can be a local Airbyte Destination object or a dictionary of configuration values. A dictionary must include a destinationType key.
- unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
- random_name_suffix: Whether to append a random suffix to the name.
def permanently_delete_source(
    self,
    source: str | CloudSource,
    *,
    safe_mode: bool = True,
) -> None:
    """Delete a source from the workspace.

    You can pass either the source ID `str` or a deployed `Source` object.

    Args:
        source: The source ID or CloudSource object to delete
        safe_mode: If True, requires the source name to contain "delete-me" or "deleteme"
            (case insensitive) to prevent accidental deletion. Defaults to True.
    """
    if not isinstance(source, (str, CloudSource)):
        raise exc.PyAirbyteInputError(
            message="Invalid source type.",
            input_value=type(source).__name__,
        )

    api_util.delete_source(
        source_id=source.connector_id if isinstance(source, CloudSource) else source,
        source_name=source.name if isinstance(source, CloudSource) else None,
        api_root=self.api_root,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
        safe_mode=safe_mode,
    )
Delete a source from the workspace.
You can pass either the source ID as a str or a deployed CloudSource object.
Arguments:
- source: The source ID or CloudSource object to delete
- safe_mode: If True, requires the source name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
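The safe_mode guard documented here reduces to a case-insensitive substring test on the resource name. A minimal sketch of that rule; passes_safe_mode is a hypothetical helper, not part of PyAirbyte, and where the real check is enforced (inside api_util.delete_source) is not shown in this reference.

```python
def passes_safe_mode(resource_name: str) -> bool:
    """Hypothetical check mirroring the documented safe_mode naming rule."""
    lowered = resource_name.lower()
    # Either marker is accepted, in any letter case.
    return "delete-me" in lowered or "deleteme" in lowered
```

A name such as "Faker (DELETE-ME)" passes, while "Production Postgres" or "delete me" (with a space instead of a hyphen) does not.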
def permanently_delete_destination(
    self,
    destination: str | CloudDestination,
    *,
    safe_mode: bool = True,
) -> None:
    """Delete a deployed destination from the workspace.

    You can pass either the `Cache` class or the deployed destination ID as a `str`.

    Args:
        destination: The destination ID or CloudDestination object to delete
        safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme"
            (case insensitive) to prevent accidental deletion. Defaults to True.
    """
    if not isinstance(destination, (str, CloudDestination)):
        raise exc.PyAirbyteInputError(
            message="Invalid destination type.",
            input_value=type(destination).__name__,
        )

    api_util.delete_destination(
        destination_id=(
            destination if isinstance(destination, str) else destination.destination_id
        ),
        destination_name=(
            destination.name if isinstance(destination, CloudDestination) else None
        ),
        api_root=self.api_root,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
        safe_mode=safe_mode,
    )
Delete a deployed destination from the workspace.
You can pass either the deployed destination ID as a str or a CloudDestination object.
Arguments:
- destination: The destination ID or CloudDestination object to delete
- safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
def deploy_connection(
    self,
    connection_name: str,
    *,
    source: CloudSource | str,
    selected_streams: list[str],
    destination: CloudDestination | str,
    table_prefix: str | None = None,
) -> CloudConnection:
    """Create a new connection between an already deployed source and destination.

    Returns the newly deployed connection object.

    Args:
        connection_name: The name of the connection.
        source: The deployed source. You can pass a source ID or a CloudSource object.
        destination: The deployed destination. You can pass a destination ID or a
            CloudDestination object.
        table_prefix: Optional. The table prefix to use when syncing to the destination.
        selected_streams: The selected stream names to sync within the connection.
    """
    if not selected_streams:
        raise exc.PyAirbyteInputError(
            guidance="You must provide `selected_streams` when creating a connection."
        )

    source_id: str = source if isinstance(source, str) else source.connector_id
    destination_id: str = (
        destination if isinstance(destination, str) else destination.connector_id
    )

    deployed_connection = api_util.create_connection(
        name=connection_name,
        source_id=source_id,
        destination_id=destination_id,
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        selected_stream_names=selected_streams,
        prefix=table_prefix or "",
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
    )

    return CloudConnection(
        workspace=self,
        connection_id=deployed_connection.connection_id,
        source=deployed_connection.source_id,
        destination=deployed_connection.destination_id,
    )
Create a new connection between an already deployed source and destination.
Returns the newly deployed connection object.
Arguments:
- connection_name: The name of the connection.
- source: The deployed source. You can pass a source ID or a CloudSource object.
- destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
- table_prefix: Optional. The table prefix to use when syncing to the destination.
- selected_streams: The selected stream names to sync within the connection.
def permanently_delete_connection(
    self,
    connection: str | CloudConnection,
    *,
    cascade_delete_source: bool = False,
    cascade_delete_destination: bool = False,
    safe_mode: bool = True,
) -> None:
    """Delete a deployed connection from the workspace.

    Args:
        connection: The connection ID or CloudConnection object to delete
        cascade_delete_source: If True, also delete the source after deleting the connection
        cascade_delete_destination: If True, also delete the destination after deleting
            the connection
        safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme"
            (case insensitive) to prevent accidental deletion. Defaults to True. Also applies
            to cascade deletes.
    """
    if connection is None:
        raise ValueError("No connection ID provided.")

    if isinstance(connection, str):
        connection = CloudConnection(
            workspace=self,
            connection_id=connection,
        )

    api_util.delete_connection(
        connection_id=connection.connection_id,
        connection_name=connection.name,
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
        safe_mode=safe_mode,
    )

    if cascade_delete_source:
        self.permanently_delete_source(
            source=connection.source_id,
            safe_mode=safe_mode,
        )
    if cascade_delete_destination:
        self.permanently_delete_destination(
            destination=connection.destination_id,
            safe_mode=safe_mode,
        )
Delete a deployed connection from the workspace.
Arguments:
- connection: The connection ID or CloudConnection object to delete
- cascade_delete_source: If True, also delete the source after deleting the connection
- cascade_delete_destination: If True, also delete the destination after deleting the connection
- safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True. Also applies to cascade deletes.
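With cascading enabled, permanently_delete_connection deletes the connection first and then the source and/or destination, passing the same safe_mode to each step. The hypothetical helper below only computes that order, which can be useful for a dry run before calling the real method; it does not call any API.

```python
def plan_connection_delete(
    connection_id: str,
    source_id: str,
    destination_id: str,
    *,
    cascade_delete_source: bool = False,
    cascade_delete_destination: bool = False,
) -> list[tuple[str, str]]:
    """Hypothetical helper: list deletions in the order the method performs them."""
    # The connection is always deleted first.
    steps = [("connection", connection_id)]
    # The source and destination follow only when cascading is requested.
    if cascade_delete_source:
        steps.append(("source", source_id))
    if cascade_delete_destination:
        steps.append(("destination", destination_id))
    return steps
```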
def list_workspaces(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
    limit: int | None = None,
) -> list[CloudWorkspaceInfo]:
    """List workspaces available to the current credentials, with an optional limit."""
    return [
        CloudWorkspaceInfo.from_api_response(workspace)
        for workspace in api_util.list_workspaces(
            workspace_id="",
            api_root=self.api_root,
            name=name,
            name_filter=name_filter,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            limit=limit,
        )
    ]
List workspaces available to the current credentials, with an optional limit.
def rename(
    self,
    name: str,
) -> CloudWorkspace:
    """Rename this workspace."""
    api_util.rename_workspace(
        workspace_id=self.workspace_id,
        name=name,
        api_root=self.api_root,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
    )
    return self
Rename this workspace.
def permanently_delete(
    self,
    *,
    workspace_name: str | None = None,
    safe_mode: bool = True,
) -> None:
    """Permanently delete this workspace if it has no connections.

    When `safe_mode` is enabled, the workspace name must contain `delete-me`
    or `deleteme`. This also checks for existing connections before deleting
    and raises `AirbyteWorkspaceNotEmptyError` if the workspace is not empty.
    """
    api_util.permanently_delete_workspace(
        workspace_id=self.workspace_id,
        workspace_name=workspace_name,
        api_root=self.api_root,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
        safe_mode=safe_mode,
    )
Permanently delete this workspace if it has no connections.
When safe_mode is enabled, the workspace name must contain delete-me
or deleteme. This also checks for existing connections before deleting
and raises AirbyteWorkspaceNotEmptyError if the workspace is not empty.
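The two preconditions for deleting a workspace can be sketched as a pre-flight check. This is a sketch under stated assumptions: check_workspace_deletable and WorkspaceNotEmptyError are hypothetical (the real method raises AirbyteWorkspaceNotEmptyError), PermissionError is an arbitrary choice for the name check, the name match is assumed to be case-insensitive as it is for sources and destinations, and the order of the two checks is also an assumption.

```python
class WorkspaceNotEmptyError(Exception):
    """Hypothetical stand-in for AirbyteWorkspaceNotEmptyError."""


def check_workspace_deletable(
    workspace_name: str,
    connection_count: int,
    *,
    safe_mode: bool = True,
) -> None:
    """Hypothetical pre-delete checks; raises if deletion should not proceed."""
    if safe_mode:
        # Assumed case-insensitive, matching the source/destination rule.
        lowered = workspace_name.lower()
        if "delete-me" not in lowered and "deleteme" not in lowered:
            raise PermissionError(f"Refusing to delete {workspace_name!r} in safe mode.")
    # The emptiness check applies whether or not safe_mode is enabled.
    if connection_count > 0:
        raise WorkspaceNotEmptyError(
            f"Workspace {workspace_name!r} still has {connection_count} connection(s)."
        )
```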
def list_connections(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
    limit: int | None = None,
) -> list[CloudConnection]:
    """List connections by name in the workspace, with an optional limit."""
    connections = api_util.list_connections(
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        name=name,
        name_filter=name_filter,
        limit=limit,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
    )
    return [
        CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
            workspace=self,
            connection_response=connection,
        )
        for connection in connections
    ]
List connections by name in the workspace, with an optional limit.
def list_sources(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
    limit: int | None = None,
) -> list[CloudSource]:
    """List all sources in the workspace, with an optional limit."""
    sources = api_util.list_sources(
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        name=name,
        name_filter=name_filter,
        limit=limit,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
    )
    return [
        CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
            workspace=self,
            source_response=source,
        )
        for source in sources
    ]
List all sources in the workspace, with an optional limit.
def list_destinations(
    self,
    name: str | None = None,
    *,
    name_filter: Callable | None = None,
    limit: int | None = None,
) -> list[CloudDestination]:
    """List all destinations in the workspace, with an optional limit."""
    destinations = api_util.list_destinations(
        api_root=self.api_root,
        workspace_id=self.workspace_id,
        name=name,
        name_filter=name_filter,
        limit=limit,
        client_id=self.client_id,
        client_secret=self.client_secret,
        bearer_token=self.bearer_token,
    )
    return [
        CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
            workspace=self,
            destination_response=destination,
        )
        for destination in destinations
    ]
List all destinations in the workspace, with an optional limit.
def publish_custom_source_definition(
    self,
    name: str,
    *,
    manifest_yaml: dict[str, Any] | Path | str | None = None,
    docker_image: str | None = None,
    docker_tag: str | None = None,
    unique: bool = True,
    pre_validate: bool = True,
    testing_values: dict[str, Any] | None = None,
) -> CustomCloudSourceDefinition:
    """Publish a custom source connector definition.

    You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
    and docker_tag (for Docker connectors), but not both.

    Args:
        name: Display name for the connector definition
        manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
        docker_image: Docker repository (e.g., 'airbyte/source-custom')
        docker_tag: Docker image tag (e.g., '1.0.0')
        unique: Whether to enforce name uniqueness
        pre_validate: Whether to validate manifest client-side (YAML only)
        testing_values: Optional configuration values to use for testing in the
            Connector Builder UI. If provided, these values are stored as the complete
            testing values object for the connector builder project (replaces any existing
            values), allowing immediate test read operations.

    Returns:
        CustomCloudSourceDefinition object representing the created definition

    Raises:
        PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
        AirbyteDuplicateResourcesError: If unique=True and name already exists
    """
    is_yaml = manifest_yaml is not None
    is_docker = docker_image is not None

    if is_yaml == is_docker:
        raise exc.PyAirbyteInputError(
            message=(
                "Must specify EITHER manifest_yaml (for YAML connectors) OR "
                "docker_image + docker_tag (for Docker connectors), but not both"
            ),
            context={
                "manifest_yaml_provided": is_yaml,
                "docker_image_provided": is_docker,
            },
        )

    if is_docker and docker_tag is None:
        raise exc.PyAirbyteInputError(
            message="docker_tag is required when docker_image is specified",
            context={"docker_image": docker_image},
        )

    if unique:
        existing = self.list_custom_source_definitions(
            definition_type="yaml" if is_yaml else "docker",
        )
        if any(d.name == name for d in existing):
            raise exc.AirbyteDuplicateResourcesError(
                resource_type="custom_source_definition",
                resource_name=name,
            )

    if is_yaml:
        manifest_dict: dict[str, Any]
        if isinstance(manifest_yaml, Path):
            manifest_dict = yaml.safe_load(manifest_yaml.read_text())
        elif isinstance(manifest_yaml, str):
            manifest_dict = yaml.safe_load(manifest_yaml)
        elif manifest_yaml is not None:
            manifest_dict = manifest_yaml
        else:
            raise exc.PyAirbyteInputError(
                message="manifest_yaml is required for YAML connectors",
                context={"name": name},
            )

        if pre_validate:
            api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)

        result = api_util.create_custom_yaml_source_definition(
            name=name,
            workspace_id=self.workspace_id,
            manifest=manifest_dict,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
            self, result
        )

        # Set testing values if provided
        if testing_values is not None:
            custom_definition.set_testing_values(testing_values)

        return custom_definition

    raise NotImplementedError(
        "Docker custom source definitions are not yet supported. "
        "Only YAML manifest-based custom sources are currently available."
    )
Publish a custom source connector definition.
You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image and docker_tag (for Docker connectors), but not both.
Arguments:
- name: Display name for the connector definition
- manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
- docker_image: Docker repository (e.g., 'airbyte/source-custom')
- docker_tag: Docker image tag (e.g., '1.0.0')
- unique: Whether to enforce name uniqueness
- pre_validate: Whether to validate manifest client-side (YAML only)
- testing_values: Optional configuration values to use for testing in the Connector Builder UI. If provided, these values are stored as the complete testing values object for the connector builder project (replaces any existing values), allowing immediate test read operations.
Returns:
CustomCloudSourceDefinition object representing the created definition
Raises:
- PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
- AirbyteDuplicateResourcesError: If unique=True and name already exists
def list_custom_source_definitions(
    self,
    *,
    definition_type: Literal["yaml", "docker"],
) -> list[CustomCloudSourceDefinition]:
    """List custom source connector definitions.

    Args:
        definition_type: Connector type to list ("yaml" or "docker"). Required.

    Returns:
        List of CustomCloudSourceDefinition objects matching the specified type
    """
    if definition_type == "yaml":
        yaml_definitions = api_util.list_custom_yaml_source_definitions(
            workspace_id=self.workspace_id,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return [
            CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
            for d in yaml_definitions
        ]

    raise NotImplementedError(
        "Docker custom source definitions are not yet supported. "
        "Only YAML manifest-based custom sources are currently available."
    )
List custom source connector definitions.
Arguments:
- definition_type: Connector type to list ("yaml" or "docker"). Required.
Returns:
List of CustomCloudSourceDefinition objects matching the specified type
def get_custom_source_definition(
    self,
    definition_id: str,
    *,
    definition_type: Literal["yaml", "docker"],
) -> CustomCloudSourceDefinition:
    """Get a specific custom source definition by ID.

    Args:
        definition_id: The definition ID
        definition_type: Connector type ("yaml" or "docker"). Required.

    Returns:
        CustomCloudSourceDefinition object
    """
    if definition_type == "yaml":
        result = api_util.get_custom_yaml_source_definition(
            workspace_id=self.workspace_id,
            definition_id=definition_id,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001

    raise NotImplementedError(
        "Docker custom source definitions are not yet supported. "
        "Only YAML manifest-based custom sources are currently available."
    )
Get a specific custom source definition by ID.
Arguments:
- definition_id: The definition ID
- definition_type: Connector type ("yaml" or "docker"). Required.
Returns:
CustomCloudSourceDefinition object
class CloudConnection:  # noqa: PLR0904  # Too many public methods
    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
    """

    def __init__(
        self,
        workspace: CloudWorkspace,
        connection_id: str,
        source: str | None = None,
        destination: str | None = None,
    ) -> None:
        """It is not recommended to create a `CloudConnection` object directly.

        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
        """
        self.connection_id = connection_id
        """The ID of the connection."""

        self.workspace = workspace
        """The workspace that the connection belongs to."""

        self._source_id = source
        """The ID of the source."""

        self._destination_id = destination
        """The ID of the destination."""

        self._connection_info: CloudConnectionInfo | None = None
        """The connection info object. (Cached.)"""

        self._cloud_source_object: CloudSource | None = None
        """The source object. (Cached.)"""

        self._cloud_destination_object: CloudDestination | None = None
        """The destination object. (Cached.)"""

    def _fetch_connection_info(
        self,
        *,
        force_refresh: bool = False,
        verify: bool = True,
    ) -> CloudConnectionInfo:
        """Fetch and cache connection info from the API.

        By default, this method will only fetch from the API if connection info is not
        already cached. It also verifies that the connection belongs to the expected
        workspace unless verification is explicitly disabled.

        Args:
            force_refresh: If True, always fetch from the API even if cached.
                If False (default), only fetch if not already cached.
            verify: If True (default), verify that the connection is valid (e.g., that
                the workspace_id matches this object's workspace). Raises an error if
                validation fails.

        Returns:
            Information about the connection from the API.

        Raises:
            AirbyteWorkspaceMismatchError: If verify is True and the connection's
                workspace_id doesn't match the expected workspace.
            AirbyteMissingResourceError: If the connection doesn't exist.
        """
        if not force_refresh and self._connection_info is not None:
            # Use cached info, but still verify if requested
            if verify:
                self._verify_workspace_match(self._connection_info)
            return self._connection_info

        # Fetch from API
        connection_info = api_util.get_connection(
            workspace_id=self.workspace.workspace_id,
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        result = CloudConnectionInfo.from_api_response(connection_info)

        self._connection_info = result

        # Verify if requested
        if verify:
            self._verify_workspace_match(result)

        return result

    def _verify_workspace_match(self, connection_info: CloudConnectionInfo) -> None:
        """Verify that the connection belongs to the expected workspace.

        Raises:
            AirbyteWorkspaceMismatchError: If the workspace IDs don't match.
        """
        if connection_info.workspace_id != self.workspace.workspace_id:
            raise AirbyteWorkspaceMismatchError(
                resource_type="connection",
                resource_id=self.connection_id,
                workspace=self.workspace,
                expected_workspace_id=self.workspace.workspace_id,
                actual_workspace_id=connection_info.workspace_id,
                message=(
                    f"Connection '{self.connection_id}' belongs to workspace "
                    f"'{connection_info.workspace_id}', not '{self.workspace.workspace_id}'."
                ),
            )

    def check_is_valid(self) -> bool:
        """Check if this connection exists and belongs to the expected workspace.

        This method fetches connection info from the API (if not already cached) and
        verifies that the connection's workspace_id matches the workspace associated
        with this CloudConnection object.

        Returns:
            True if the connection exists and belongs to the expected workspace.

        Raises:
            AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
            AirbyteMissingResourceError: If the connection doesn't exist.
        """
        self._fetch_connection_info(force_refresh=False, verify=True)
        return True

    @classmethod
    def _from_connection_response(
        cls,
        workspace: CloudWorkspace,
        connection_response: _ConnectionResponseLike,
    ) -> CloudConnection:
        """Create a CloudConnection from an API connection response."""
        connection_info = CloudConnectionInfo.from_api_response(connection_response)
        result = cls(
            workspace=workspace,
            connection_id=connection_info.connection_id,
            source=connection_info.source_id,
            destination=connection_info.destination_id,
        )
        result._connection_info = connection_info  # noqa: SLF001 # Accessing Non-Public API
        return result

    # Properties

    @property
    def name(self) -> str | None:
        """Get the display name of the connection, if available.

        E.g. "My Postgres to Snowflake", not the connection ID.
        """
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.name

    @property
    def source_id(self) -> str:
        """The ID of the source."""
        if not self._source_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._source_id = self._connection_info.source_id

        return self._source_id

    @property
    def source(self) -> CloudSource:
        """Get the source object."""
        if self._cloud_source_object:
            return self._cloud_source_object

        self._cloud_source_object = CloudSource(
            workspace=self.workspace,
            connector_id=self.source_id,
        )
        return self._cloud_source_object

    @property
    def destination_id(self) -> str:
        """The ID of the destination."""
        if not self._destination_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._destination_id = self._connection_info.destination_id

        return self._destination_id

    @property
    def destination(self) -> CloudDestination:
        """Get the destination object."""
        if self._cloud_destination_object:
            return self._cloud_destination_object

        self._cloud_destination_object = CloudDestination(
            workspace=self.workspace,
            connector_id=self.destination_id,
        )
        return self._cloud_destination_object

    @property
    def stream_names(self) -> list[str]:
        """The stream names."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return [stream.name for stream in self._connection_info.configurations.streams or []]

    @property
    def table_prefix(self) -> str:
        """The table prefix."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.prefix or ""

    @property
    def connection_url(self) -> str | None:
        """The web
URL to the connection.""" 265 return f"{self.workspace.workspace_url}/connections/{self.connection_id}" 266 267 @property 268 def job_history_url(self) -> str | None: 269 """The URL to the job history for the connection.""" 270 return f"{self.connection_url}/timeline" 271 272 # Run Sync 273 274 def run_sync( 275 self, 276 *, 277 wait: bool = True, 278 wait_timeout: int = 300, 279 ) -> SyncResult: 280 """Run a sync.""" 281 connection_response = api_util.run_connection( 282 connection_id=self.connection_id, 283 api_root=self.workspace.api_root, 284 workspace_id=self.workspace.workspace_id, 285 client_id=self.workspace.client_id, 286 client_secret=self.workspace.client_secret, 287 bearer_token=self.workspace.bearer_token, 288 ) 289 sync_result = SyncResult( 290 workspace=self.workspace, 291 connection=self, 292 job_id=connection_response.job_id, 293 ) 294 295 if wait: 296 sync_result.wait_for_completion( 297 wait_timeout=wait_timeout, 298 raise_failure=True, 299 raise_timeout=True, 300 ) 301 302 return sync_result 303 304 def __repr__(self) -> str: 305 """String representation of the connection.""" 306 return ( 307 f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, " 308 f"destination_id={self.destination_id}, connection_url={self.connection_url})" 309 ) 310 311 # Logs 312 313 def get_previous_sync_logs( 314 self, 315 *, 316 limit: int = 20, 317 offset: int | None = None, 318 from_tail: bool = True, 319 job_type: str | JobTypeEnum | None = None, 320 ) -> list[SyncResult]: 321 """Get previous sync jobs for a connection with pagination support. 322 323 Returns SyncResult objects containing job metadata (job_id, status, bytes_synced, 324 rows_synced, start_time). Full log text can be fetched lazily via 325 `SyncResult.get_full_log_text()`. 326 327 Args: 328 limit: Maximum number of jobs to return. Defaults to 20. 329 offset: Number of jobs to skip from the beginning. Defaults to None (0). 
330 from_tail: If True, returns jobs ordered newest-first (createdAt DESC). 331 If False, returns jobs ordered oldest-first (createdAt ASC). 332 Defaults to True. 333 job_type: Filter by job type (e.g., `sync`, `refresh`). 334 If not specified, defaults to sync and reset jobs only (API default behavior). 335 336 Returns: 337 A list of SyncResult objects representing the sync jobs. 338 """ 339 order_by = ( 340 api_util.JOB_ORDER_BY_CREATED_AT_DESC 341 if from_tail 342 else api_util.JOB_ORDER_BY_CREATED_AT_ASC 343 ) 344 sync_logs = api_util.get_job_logs( 345 connection_id=self.connection_id, 346 api_root=self.workspace.api_root, 347 workspace_id=self.workspace.workspace_id, 348 limit=limit, 349 offset=offset, 350 order_by=order_by, 351 job_type=job_type, 352 client_id=self.workspace.client_id, 353 client_secret=self.workspace.client_secret, 354 bearer_token=self.workspace.bearer_token, 355 ) 356 return [ 357 SyncResult( 358 workspace=self.workspace, 359 connection=self, 360 job_id=sync_log.job_id, 361 _latest_job_info=CloudJobInfo.from_api_response(sync_log), 362 ) 363 for sync_log in sync_logs 364 ] 365 366 def get_sync_result( 367 self, 368 job_id: int | None = None, 369 ) -> SyncResult | None: 370 """Get the sync result for the connection. 371 372 If `job_id` is not provided, the most recent sync job will be used. 373 374 Returns `None` if job_id is omitted and no previous jobs are found. 375 """ 376 if job_id is None: 377 # Get the most recent sync job 378 results = self.get_previous_sync_logs( 379 limit=1, 380 ) 381 if results: 382 return results[0] 383 384 return None 385 386 # Get the sync job by ID (lazy loaded) 387 return SyncResult( 388 workspace=self.workspace, 389 connection=self, 390 job_id=job_id, 391 ) 392 393 # Artifacts 394 395 @deprecated("Use 'dump_raw_state()' instead.") 396 def get_state_artifacts(self) -> list[dict[str, Any]] | None: 397 """Deprecated. 
Use `dump_raw_state()` instead.""" 398 state_response = api_util.get_connection_state( 399 connection_id=self.connection_id, 400 api_root=self.workspace.api_root, 401 client_id=self.workspace.client_id, 402 client_secret=self.workspace.client_secret, 403 bearer_token=self.workspace.bearer_token, 404 config_api_root=self.workspace.config_api_root, 405 ) 406 if state_response.get("stateType") == "not_set": 407 return None 408 return state_response.get("streamState", []) 409 410 @overload 411 def dump_raw_state(self, *, normalize: Literal[True] = True) -> list[dict[str, Any]]: ... 412 413 @overload 414 def dump_raw_state(self, *, normalize: Literal[False]) -> dict[str, Any]: ... 415 416 def dump_raw_state( 417 self, 418 *, 419 normalize: bool = True, 420 ) -> dict[str, Any] | list[dict[str, Any]]: 421 """Dump the state for this connection. 422 423 By default, returns a list of Airbyte protocol `AirbyteStateMessage` dicts 424 with snake_case keys, suitable for passing to a connector's `--state` flag. 425 426 When `normalize` is `False`, returns the raw Config API dict (camelCase keys, 427 includes `stateType` and `connectionId`). This raw format can be passed 428 directly to `import_raw_state()` for backup/restore workflows. 429 430 Args: 431 normalize: If `True` (default), convert to Airbyte protocol format. 432 If `False`, return the raw Config API response. 433 434 Returns: 435 Normalized: list of protocol-format state message dicts (empty list if 436 no state). Raw: the full Config API state dict. 
437 """ 438 raw = api_util.get_connection_state( 439 connection_id=self.connection_id, 440 api_root=self.workspace.api_root, 441 client_id=self.workspace.client_id, 442 client_secret=self.workspace.client_secret, 443 bearer_token=self.workspace.bearer_token, 444 config_api_root=self.workspace.config_api_root, 445 ) 446 if normalize: 447 return _normalize_state_to_protocol(raw) 448 return raw 449 450 def import_raw_state( 451 self, 452 connection_state: dict[str, Any] | list[dict[str, Any]], 453 ) -> dict[str, Any]: 454 """Import (restore) the full state for this connection. 455 456 > ⚠️ **WARNING:** Modifying the state directly is not recommended and 457 > could result in broken connections, and/or incorrect sync behavior. 458 459 Replaces the entire connection state with the provided state blob. 460 Uses the safe variant that prevents updates while a sync is running (HTTP 423). 461 462 This is the counterpart to `dump_raw_state()` for backup/restore workflows. 463 The `connectionId` in the blob is always overridden with this connection's 464 ID, making state blobs portable across connections. 465 466 Accepts either format: 467 468 - **Config API format** (dict with `stateType`): passed through directly. 469 - **Airbyte protocol format** (list of `AirbyteStateMessage` dicts): automatically 470 converted to Config API format before sending. 471 472 Args: 473 connection_state: Connection state in either Config API or Airbyte protocol format. 474 475 Returns: 476 The updated connection state as a dictionary. 477 478 Raises: 479 AirbyteConnectionSyncActiveError: If a sync is currently running on this 480 connection (HTTP 423). Wait for the sync to complete before retrying. 
481 """ 482 api_state: dict[str, Any] 483 if isinstance(connection_state, list): 484 if not _is_protocol_state_format(connection_state): 485 msg = ( 486 "Expected connection_state list to contain Airbyte protocol state " 487 "message dicts (each with a top-level `type` of STREAM, GLOBAL, " 488 "or LEGACY). Got a list that does not match protocol format." 489 ) 490 raise ValueError(msg) 491 api_state = _denormalize_protocol_state_to_api( 492 protocol_messages=connection_state, 493 connection_id=self.connection_id, 494 ) 495 elif isinstance(connection_state, dict): 496 if _is_protocol_state_format(connection_state): 497 api_state = _denormalize_protocol_state_to_api( 498 protocol_messages=[connection_state], 499 connection_id=self.connection_id, 500 ) 501 else: 502 api_state = connection_state 503 else: 504 msg = f"Expected a dict or list, got {type(connection_state)}" 505 raise TypeError(msg) 506 507 return api_util.replace_connection_state( 508 connection_id=self.connection_id, 509 connection_state_dict=api_state, 510 api_root=self.workspace.api_root, 511 client_id=self.workspace.client_id, 512 client_secret=self.workspace.client_secret, 513 bearer_token=self.workspace.bearer_token, 514 config_api_root=self.workspace.config_api_root, 515 ) 516 517 def get_stream_state( 518 self, 519 stream_name: str, 520 stream_namespace: str | None = None, 521 ) -> dict[str, Any] | None: 522 """Get the state blob for a single stream within this connection. 523 524 Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}), 525 not the full connection state envelope. 526 527 This is compatible with `stream`-type state and stream-level entries 528 within a `global`-type state. It is not compatible with `legacy` state. 529 To get or set the entire connection-level state artifact, use 530 `dump_raw_state` and `import_raw_state` instead. 531 532 Args: 533 stream_name: The name of the stream to get state for. 534 stream_namespace: The source-side stream namespace. 
This refers to the 535 namespace from the source (e.g., database schema), not any destination 536 namespace override set in connection advanced settings. 537 538 Returns: 539 The stream's state blob as a dictionary, or None if the stream is not found. 540 """ 541 state_data = self.dump_raw_state(normalize=False) 542 result = ConnectionStateResponse(**state_data) 543 544 streams = _get_stream_list(result) 545 matching = [s for s in streams if _match_stream(s, stream_name, stream_namespace)] 546 547 if not matching: 548 available = [s.stream_descriptor.name for s in streams] 549 logger.warning( 550 "Stream '%s' not found in connection state for connection '%s'. " 551 "Available streams: %s", 552 stream_name, 553 self.connection_id, 554 available, 555 ) 556 return None 557 558 return matching[0].stream_state 559 560 def set_stream_state( 561 self, 562 stream_name: str, 563 state_blob_dict: dict[str, Any], 564 stream_namespace: str | None = None, 565 ) -> None: 566 """Set the state for a single stream within this connection. 567 568 Fetches the current full state, replaces only the specified stream's state, 569 then sends the full updated state back to the API. If the stream does not 570 exist in the current state, it is appended. 571 572 This is compatible with `stream`-type state and stream-level entries 573 within a `global`-type state. It is not compatible with `legacy` state. 574 To get or set the entire connection-level state artifact, use 575 `dump_raw_state` and `import_raw_state` instead. 576 577 Uses the safe variant that prevents updates while a sync is running (HTTP 423). 578 579 Args: 580 stream_name: The name of the stream to update state for. 581 state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}). 582 stream_namespace: The source-side stream namespace. This refers to the 583 namespace from the source (e.g., database schema), not any destination 584 namespace override set in connection advanced settings. 
585 586 Raises: 587 PyAirbyteInputError: If the connection state type is not supported for 588 stream-level operations (not_set, legacy). 589 AirbyteConnectionSyncActiveError: If a sync is currently running on this 590 connection (HTTP 423). Wait for the sync to complete before retrying. 591 """ 592 state_data = self.dump_raw_state(normalize=False) 593 current = ConnectionStateResponse(**state_data) 594 595 if current.state_type == "not_set": 596 raise PyAirbyteInputError( 597 message="Cannot set stream state: connection has no existing state.", 598 context={"connection_id": self.connection_id}, 599 ) 600 601 if current.state_type == "legacy": 602 raise PyAirbyteInputError( 603 message="Cannot set stream state on a legacy-type connection state.", 604 context={"connection_id": self.connection_id}, 605 ) 606 607 new_stream_entry = { 608 "streamDescriptor": { 609 "name": stream_name, 610 **( 611 { 612 "namespace": stream_namespace, 613 } 614 if stream_namespace 615 else {} 616 ), 617 }, 618 "streamState": state_blob_dict, 619 } 620 621 raw_streams: list[dict[str, Any]] 622 if current.state_type == "stream": 623 raw_streams = state_data.get("streamState", []) 624 elif current.state_type == "global": 625 raw_streams = state_data.get("globalState", {}).get("streamStates", []) 626 else: 627 raw_streams = [] 628 629 streams = _get_stream_list(current) 630 found = False 631 updated_streams_raw: list[dict[str, Any]] = [] 632 for raw_s, parsed_s in zip(raw_streams, streams, strict=False): 633 if _match_stream(parsed_s, stream_name, stream_namespace): 634 updated_streams_raw.append(new_stream_entry) 635 found = True 636 else: 637 updated_streams_raw.append(raw_s) 638 639 if not found: 640 updated_streams_raw.append(new_stream_entry) 641 642 full_state: dict[str, Any] = { 643 **state_data, 644 } 645 646 if current.state_type == "stream": 647 full_state["streamState"] = updated_streams_raw 648 elif current.state_type == "global": 649 original_global = 
state_data.get("globalState", {}) 650 full_state["globalState"] = { 651 **original_global, 652 "streamStates": updated_streams_raw, 653 } 654 655 self.import_raw_state(full_state) 656 657 @deprecated("Use 'dump_raw_catalog()' instead.") 658 def get_catalog_artifact(self) -> dict[str, Any] | None: 659 """Get the configured catalog for this connection. 660 661 Returns the full configured catalog (syncCatalog) for this connection, 662 including stream schemas, sync modes, cursor fields, and primary keys. 663 664 Uses the Config API endpoint: POST /v1/web_backend/connections/get 665 666 Returns: 667 Dictionary containing the configured catalog, or `None` if not found. 668 """ 669 return self.dump_raw_catalog() 670 671 def dump_raw_catalog( 672 self, 673 *, 674 normalize: bool = True, 675 ) -> dict[str, Any] | None: 676 """Dump the configured catalog for this connection. 677 678 By default, returns the catalog in Airbyte protocol format 679 (`ConfiguredAirbyteCatalog` with snake_case keys), suitable for passing 680 to a connector's `--catalog` flag. 681 682 When `normalize` is `False`, returns the raw `syncCatalog` dict from the 683 Config API (camelCase keys, nested `config` block). This raw format can be 684 passed directly to `import_raw_catalog()` for backup/restore workflows. 685 686 Args: 687 normalize: If `True` (default), convert to Airbyte protocol format. 688 If `False`, return the raw Config API catalog. 689 690 Returns: 691 The configured catalog dict, or `None` if not found. 
692 """ 693 connection_response = api_util.get_connection_catalog( 694 connection_id=self.connection_id, 695 api_root=self.workspace.api_root, 696 client_id=self.workspace.client_id, 697 client_secret=self.workspace.client_secret, 698 bearer_token=self.workspace.bearer_token, 699 config_api_root=self.workspace.config_api_root, 700 ) 701 raw = connection_response.get("syncCatalog") 702 if raw is None: 703 return None 704 if normalize: 705 return _normalize_catalog_to_protocol(raw) 706 return raw 707 708 def import_raw_catalog(self, catalog: dict[str, Any]) -> None: 709 """Replace the configured catalog for this connection. 710 711 > ⚠️ **WARNING:** Modifying the catalog directly is not recommended and 712 > could result in broken connections, and/or incorrect sync behavior. 713 714 Accepts a configured catalog dict and replaces the connection's entire 715 catalog with it. All other connection settings remain unchanged. 716 717 Accepts either format: 718 719 - **Config API format** (`syncCatalog` with camelCase keys and nested `config`): 720 passed through directly. 721 - **Airbyte protocol format** (`ConfiguredAirbyteCatalog` with snake_case keys): 722 automatically converted to Config API format before sending. 723 724 Args: 725 catalog: The configured catalog dict in either format. 726 """ 727 if _is_protocol_catalog_format(catalog): 728 catalog = _denormalize_catalog_to_api(catalog) 729 730 api_util.replace_connection_catalog( 731 connection_id=self.connection_id, 732 configured_catalog_dict=catalog, 733 api_root=self.workspace.api_root, 734 client_id=self.workspace.client_id, 735 client_secret=self.workspace.client_secret, 736 bearer_token=self.workspace.bearer_token, 737 config_api_root=self.workspace.config_api_root, 738 ) 739 740 def rename(self, name: str) -> CloudConnection: 741 """Rename the connection. 
742 743 Args: 744 name: New name for the connection 745 746 Returns: 747 Updated CloudConnection object with refreshed info 748 """ 749 updated_response = api_util.patch_connection( 750 connection_id=self.connection_id, 751 api_root=self.workspace.api_root, 752 client_id=self.workspace.client_id, 753 client_secret=self.workspace.client_secret, 754 bearer_token=self.workspace.bearer_token, 755 name=name, 756 ) 757 self._connection_info = CloudConnectionInfo.from_api_response(updated_response) 758 return self 759 760 def set_table_prefix(self, prefix: str) -> CloudConnection: 761 """Set the table prefix for the connection. 762 763 Args: 764 prefix: New table prefix to use when syncing to the destination 765 766 Returns: 767 Updated CloudConnection object with refreshed info 768 """ 769 updated_response = api_util.patch_connection( 770 connection_id=self.connection_id, 771 api_root=self.workspace.api_root, 772 client_id=self.workspace.client_id, 773 client_secret=self.workspace.client_secret, 774 bearer_token=self.workspace.bearer_token, 775 prefix=prefix, 776 ) 777 self._connection_info = CloudConnectionInfo.from_api_response(updated_response) 778 return self 779 780 def set_selected_streams(self, stream_names: list[str]) -> CloudConnection: 781 """Set the selected streams for the connection. 782 783 This is a destructive operation that can break existing connections if the 784 stream selection is changed incorrectly. Use with caution. 
785 786 Args: 787 stream_names: List of stream names to sync 788 789 Returns: 790 Updated CloudConnection object with refreshed info 791 """ 792 configurations = api_util.build_stream_configurations(stream_names) 793 794 updated_response = api_util.patch_connection( 795 connection_id=self.connection_id, 796 api_root=self.workspace.api_root, 797 client_id=self.workspace.client_id, 798 client_secret=self.workspace.client_secret, 799 bearer_token=self.workspace.bearer_token, 800 configurations=configurations, 801 ) 802 self._connection_info = CloudConnectionInfo.from_api_response(updated_response) 803 return self 804 805 # Enable/Disable 806 807 @property 808 def enabled(self) -> bool: 809 """Get the current enabled status of the connection. 810 811 This property always fetches fresh data from the API to ensure accuracy, 812 as another process or user may have toggled the setting. 813 814 Returns: 815 True if the connection status is 'active', False otherwise. 816 """ 817 connection_info = self._fetch_connection_info(force_refresh=True) 818 return connection_info.status == "active" 819 820 @enabled.setter 821 def enabled(self, value: bool) -> None: 822 """Set the enabled status of the connection. 823 824 Args: 825 value: True to enable (set status to 'active'), False to disable 826 (set status to 'inactive'). 827 """ 828 self.set_enabled(enabled=value) 829 830 def set_enabled( 831 self, 832 *, 833 enabled: bool, 834 ignore_noop: bool = True, 835 ) -> None: 836 """Set the enabled status of the connection. 837 838 Args: 839 enabled: True to enable (set status to 'active'), False to disable 840 (set status to 'inactive'). 841 ignore_noop: If True (default), silently return if the connection is already 842 in the requested state. If False, raise ValueError when the requested 843 state matches the current state. 844 845 Raises: 846 ValueError: If ignore_noop is False and the connection is already in the 847 requested state. 
848 """ 849 # Always fetch fresh data to check current status 850 connection_info = self._fetch_connection_info(force_refresh=True) 851 current_status = connection_info.status 852 desired_status = "active" if enabled else "inactive" 853 854 if current_status == desired_status: 855 if ignore_noop: 856 return 857 raise ValueError( 858 f"Connection is already {'enabled' if enabled else 'disabled'}. " 859 f"Current status: {current_status}" 860 ) 861 862 updated_response = api_util.patch_connection( 863 connection_id=self.connection_id, 864 api_root=self.workspace.api_root, 865 client_id=self.workspace.client_id, 866 client_secret=self.workspace.client_secret, 867 bearer_token=self.workspace.bearer_token, 868 status=desired_status, 869 ) 870 self._connection_info = CloudConnectionInfo.from_api_response(updated_response) 871 872 # Scheduling 873 874 def set_schedule( 875 self, 876 cron_expression: str, 877 ) -> None: 878 """Set a cron schedule for the connection. 879 880 Args: 881 cron_expression: A cron expression defining when syncs should run. 882 883 Examples: 884 - "0 0 * * *" - Daily at midnight UTC 885 - "0 */6 * * *" - Every 6 hours 886 - "0 0 * * 0" - Weekly on Sunday at midnight UTC 887 """ 888 updated_response = api_util.patch_connection( 889 connection_id=self.connection_id, 890 api_root=self.workspace.api_root, 891 client_id=self.workspace.client_id, 892 client_secret=self.workspace.client_secret, 893 bearer_token=self.workspace.bearer_token, 894 schedule=api_util.build_connection_schedule( 895 schedule_type="cron", 896 cron_expression=cron_expression, 897 ), 898 ) 899 self._connection_info = CloudConnectionInfo.from_api_response(updated_response) 900 901 def set_manual_schedule(self) -> None: 902 """Set the connection to manual scheduling. 903 904 Disables automatic syncs. Syncs will only run when manually triggered. 
905 """ 906 updated_response = api_util.patch_connection( 907 connection_id=self.connection_id, 908 api_root=self.workspace.api_root, 909 client_id=self.workspace.client_id, 910 client_secret=self.workspace.client_secret, 911 bearer_token=self.workspace.bearer_token, 912 schedule=api_util.build_connection_schedule(schedule_type="manual"), 913 ) 914 self._connection_info = CloudConnectionInfo.from_api_response(updated_response) 915 916 # Deletions 917 918 def permanently_delete( 919 self, 920 *, 921 cascade_delete_source: bool = False, 922 cascade_delete_destination: bool = False, 923 ) -> None: 924 """Delete the connection. 925 926 Args: 927 cascade_delete_source: Whether to also delete the source. 928 cascade_delete_destination: Whether to also delete the destination. 929 """ 930 self.workspace.permanently_delete_connection(self) 931 932 if cascade_delete_source: 933 self.workspace.permanently_delete_source(self.source_id) 934 935 if cascade_delete_destination: 936 self.workspace.permanently_delete_destination(self.destination_id)
A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
def __init__(
    self,
    workspace: CloudWorkspace,
    connection_id: str,
    source: str | None = None,
    destination: str | None = None,
) -> None:
It is not recommended to create a CloudConnection object directly.
Instead, use CloudWorkspace.get_connection() to create a connection object.
def check_is_valid(self) -> bool:
Check if this connection exists and belongs to the expected workspace.
This method fetches connection info from the API (if not already cached) and verifies that the connection's workspace_id matches the workspace associated with this CloudConnection object.
Returns:
True if the connection exists and belongs to the expected workspace.
Raises:
- AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
- AirbyteMissingResourceError: If the connection doesn't exist.
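As a rough illustration of the fetch-then-verify behavior described above, the sketch below uses plain Python stand-ins. `ConnectionInfo`, `WorkspaceMismatchError`, `ConnectionSketch`, and the injected `fetch` callable are hypothetical placeholders, not PyAirbyte classes; the sketch only mirrors the caching and the workspace-ID comparison that `check_is_valid()` relies on.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class ConnectionInfo:
    # Hypothetical stand-in for the connection info returned by the API.
    connection_id: str
    workspace_id: str


class WorkspaceMismatchError(Exception):
    """Hypothetical stand-in for AirbyteWorkspaceMismatchError."""


class ConnectionSketch:
    def __init__(
        self,
        workspace_id: str,
        connection_id: str,
        fetch: Callable[[str], ConnectionInfo],
    ) -> None:
        self.workspace_id = workspace_id
        self.connection_id = connection_id
        self._fetch = fetch  # hypothetical API call: connection_id -> ConnectionInfo
        self._info: Optional[ConnectionInfo] = None
        self.fetch_count = 0  # counts simulated API calls

    def _fetch_info(self, *, force_refresh: bool = False, verify: bool = True) -> ConnectionInfo:
        # Only "call the API" when nothing is cached or a refresh is forced.
        if force_refresh or self._info is None:
            self._info = self._fetch(self.connection_id)
            self.fetch_count += 1
        # Verification runs on cached and freshly fetched info alike.
        if verify and self._info.workspace_id != self.workspace_id:
            raise WorkspaceMismatchError(
                f"Connection '{self.connection_id}' belongs to workspace "
                f"'{self._info.workspace_id}', not '{self.workspace_id}'."
            )
        return self._info

    def check_is_valid(self) -> bool:
        self._fetch_info(force_refresh=False, verify=True)
        return True
```

As in the source listing, the fetched info is cached before verification, so a mismatched connection keeps raising on later calls without another API request.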
@property
def name(self) -> str | None:
Get the display name of the connection, if available.
For example, "My Postgres to Snowflake", rather than the connection ID.
@property
def source_id(self) -> str:
The ID of the source.
@property
def source(self) -> CloudSource:
Get the source object.
@property
def destination_id(self) -> str:
The ID of the destination.
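`source_id` and `destination_id` prefer an ID passed to the constructor and otherwise read it from connection info that is fetched once and cached. A minimal sketch of that fallback, assuming a hypothetical `fetch_info` callable that returns a plain dict instead of a `CloudConnectionInfo` object:

```python
from typing import Any, Callable, Dict, Optional


class LazyIds:
    def __init__(
        self,
        fetch_info: Callable[[], Dict[str, Any]],
        source_id: Optional[str] = None,
        destination_id: Optional[str] = None,
    ) -> None:
        self._fetch_info = fetch_info  # hypothetical: returns connection info as a dict
        self._info: Optional[Dict[str, Any]] = None
        self._source_id = source_id
        self._destination_id = destination_id
        self.fetches = 0  # counts simulated API calls

    def _cached_info(self) -> Dict[str, Any]:
        # Fetch connection info at most once and reuse it afterwards.
        if self._info is None:
            self._info = self._fetch_info()
            self.fetches += 1
        return self._info

    @property
    def source_id(self) -> str:
        # An ID given to the constructor wins; otherwise fall back to connection info.
        if not self._source_id:
            self._source_id = self._cached_info()["source_id"]
        return self._source_id

    @property
    def destination_id(self) -> str:
        if not self._destination_id:
            self._destination_id = self._cached_info()["destination_id"]
        return self._destination_id
```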
@property
def destination(self) -> CloudDestination:
Get the destination object.
@property
def stream_names(self) -> list[str]:
The names of the streams configured on this connection (an empty list if none are configured).
@property
def table_prefix(self) -> str:
The table prefix applied to destination tables, or an empty string if no prefix is set.
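`stream_names` treats a missing stream list as empty, and `table_prefix` falls back to an empty string. Assuming the prefix is simply prepended to each stream name (final table names may still be normalized by the destination connector), a hypothetical `preview_table_names()` helper shows how the two values combine:

```python
from typing import List, Optional


def preview_table_names(stream_names: Optional[List[str]], prefix: Optional[str]) -> List[str]:
    """Hypothetical helper: approximate destination table names as prefix + stream name.

    Mirrors the fallbacks above: a missing stream list becomes [] and a
    missing prefix becomes "". Destination connectors may normalize names further.
    """
    effective_prefix = prefix or ""
    return [f"{effective_prefix}{name}" for name in stream_names or []]
```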
@property
def connection_url(self) -> str | None:
The web URL to the connection.
@property
def job_history_url(self) -> str | None:
The URL to the job history for the connection.
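`connection_url` appends `/connections/<connection_id>` to the workspace URL, and `job_history_url` appends `/timeline` to that result. The functions below reproduce that string construction; the workspace URL used here is a made-up example, whereas the real value comes from `CloudWorkspace.workspace_url`.

```python
def connection_url(workspace_url: str, connection_id: str) -> str:
    # Same shape as CloudConnection.connection_url.
    return f"{workspace_url}/connections/{connection_id}"


def job_history_url(workspace_url: str, connection_id: str) -> str:
    # Same shape as CloudConnection.job_history_url.
    return f"{connection_url(workspace_url, connection_id)}/timeline"


# Hypothetical workspace URL, for illustration only.
example = job_history_url("https://airbyte.example.com/workspaces/ws-123", "456")
```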
def run_sync(
    self,
    *,
    wait: bool = True,
    wait_timeout: int = 300,
) -> SyncResult:
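Run a sync.
If wait is True (the default), block until the job finishes, raising an error if the job fails or does not finish within wait_timeout seconds (default 300). If wait is False, return the SyncResult as soon as the job has been started.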
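With `wait=True`, `run_sync()` starts the job and then calls `SyncResult.wait_for_completion()` with `raise_failure=True` and `raise_timeout=True`. The loop below is a generic sketch of that start-then-poll pattern, not PyAirbyte's implementation: the status strings, the poll interval, and the exception classes are assumptions, and the clock and sleep functions are injectable so the example runs instantly.

```python
import time
from typing import Callable


class SyncFailedError(Exception):
    """Hypothetical stand-in for a failed-job error."""


class SyncTimeoutError(Exception):
    """Hypothetical stand-in for a wait-timeout error."""


# Assumed terminal job statuses; anything else is treated as still in progress.
TERMINAL_OK = {"succeeded"}
TERMINAL_FAILED = {"failed", "cancelled"}


def wait_for_job(
    get_status: Callable[[], str],
    wait_timeout: float = 300,
    poll_interval: float = 5,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll a job until it finishes, raising on failure or timeout (sketch)."""
    deadline = clock() + wait_timeout
    while True:
        status = get_status()
        if status in TERMINAL_OK:
            return status
        if status in TERMINAL_FAILED:
            raise SyncFailedError(f"Job ended with status {status!r}")
        if clock() >= deadline:
            raise SyncTimeoutError(f"Job still {status!r} after {wait_timeout}s")
        sleep(poll_interval)
```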
def get_previous_sync_logs(
    self,
    *,
    limit: int = 20,
    offset: int | None = None,
    from_tail: bool = True,
    job_type: str | JobTypeEnum | None = None,
) -> list[SyncResult]:
Get previous sync jobs for a connection with pagination support.
Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
rows_synced, start_time). Full log text can be fetched lazily via
SyncResult.get_full_log_text().
Arguments:
- limit: Maximum number of jobs to return. Defaults to 20.
- offset: Number of jobs to skip from the beginning. Defaults to None (0).
- from_tail: If True, returns jobs ordered newest-first (createdAt DESC). If False, returns jobs ordered oldest-first (createdAt ASC). Defaults to True.
- job_type: Filter by job type (e.g., sync, refresh). If not specified, defaults to sync and reset jobs only (API default behavior).
Returns:
A list of SyncResult objects representing the sync jobs.
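To walk a connection's full job history, the limit and offset parameters can be combined into a simple pager. This is a sketch under the assumption that `connection` is a CloudConnection; the generator name iter_all_jobs is hypothetical. It requests oldest-first order so that jobs created mid-iteration are appended at the end rather than shifting the offsets of pages not yet fetched.

```python
from collections.abc import Iterator
from typing import Any


def iter_all_jobs(connection: Any, page_size: int = 20) -> Iterator[Any]:
    """Yield every job for a connection, oldest first, one page at a time."""
    offset = 0
    while True:
        page = connection.get_previous_sync_logs(
            limit=page_size,
            offset=offset,
            from_tail=False,  # oldest-first: new jobs land after the pages already read
        )
        yield from page
        if len(page) < page_size:
            break  # a short (or empty) page means there is nothing left to fetch
        offset += page_size
```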
def get_sync_result(
    self,
    job_id: int | None = None,
) -> SyncResult | None:
    """Get the sync result for the connection.

    If `job_id` is not provided, the most recent sync job will be used.

    Returns `None` if job_id is omitted and no previous jobs are found.
    """
    if job_id is None:
        # Get the most recent sync job
        results = self.get_previous_sync_logs(
            limit=1,
        )
        if results:
            return results[0]

        return None

    # Get the sync job by ID (lazy loaded)
    return SyncResult(
        workspace=self.workspace,
        connection=self,
        job_id=job_id,
    )
Get the sync result for the connection.
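If job_id is not provided, the most recent sync job is used, and None is returned if no previous jobs are found.
If job_id is provided, a SyncResult for that job is returned without first checking that the job exists; its details are loaded lazily.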
@deprecated("Use 'dump_raw_state()' instead.")
def get_state_artifacts(self) -> list[dict[str, Any]] | None:
    """Deprecated. Use `dump_raw_state()` instead."""
    state_response = api_util.get_connection_state(
        connection_id=self.connection_id,
        api_root=self.workspace.api_root,
        client_id=self.workspace.client_id,
        client_secret=self.workspace.client_secret,
        bearer_token=self.workspace.bearer_token,
        config_api_root=self.workspace.config_api_root,
    )
    if state_response.get("stateType") == "not_set":
        return None
    return state_response.get("streamState", [])
Deprecated. Use dump_raw_state() instead.
def dump_raw_state(
    self,
    *,
    normalize: bool = True,
) -> dict[str, Any] | list[dict[str, Any]]:
    """Dump the state for this connection.

    By default, returns a list of Airbyte protocol `AirbyteStateMessage` dicts
    with snake_case keys, suitable for passing to a connector's `--state` flag.

    When `normalize` is `False`, returns the raw Config API dict (camelCase keys,
    includes `stateType` and `connectionId`). This raw format can be passed
    directly to `import_raw_state()` for backup/restore workflows.

    Args:
        normalize: If `True` (default), convert to Airbyte protocol format.
            If `False`, return the raw Config API response.

    Returns:
        Normalized: list of protocol-format state message dicts (empty list if
        no state). Raw: the full Config API state dict.
    """
    raw = api_util.get_connection_state(
        connection_id=self.connection_id,
        api_root=self.workspace.api_root,
        client_id=self.workspace.client_id,
        client_secret=self.workspace.client_secret,
        bearer_token=self.workspace.bearer_token,
        config_api_root=self.workspace.config_api_root,
    )
    if normalize:
        return _normalize_state_to_protocol(raw)
    return raw
Dump the state for this connection.
By default, returns a list of Airbyte protocol AirbyteStateMessage dicts
with snake_case keys, suitable for passing to a connector's --state flag.
When normalize is False, returns the raw Config API dict (camelCase keys,
includes stateType and connectionId). This raw format can be passed
directly to import_raw_state() for backup/restore workflows.
Arguments:
- normalize: If True (default), convert to Airbyte protocol format. If False, return the raw Config API response.
Returns:
Normalized: list of protocol-format state message dicts (empty list if no state). Raw: the full Config API state dict.
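The backup/restore workflow described above can be sketched as two small helpers, assuming `connection` is a CloudConnection and that a local JSON file is an acceptable backup location. The function names backup_state and restore_state are hypothetical. Because import_raw_state() overrides connectionId with the target connection's ID, a backup taken from one connection can be restored onto another.

```python
import json
from pathlib import Path
from typing import Any


def backup_state(connection: Any, path: Path) -> None:
    """Write the connection's raw Config API state to a JSON file."""
    raw_state = connection.dump_raw_state(normalize=False)
    path.write_text(json.dumps(raw_state, indent=2))


def restore_state(connection: Any, path: Path) -> dict[str, Any]:
    """Read a backup written by backup_state() and import it into `connection`."""
    raw_state = json.loads(path.read_text())
    # Raw Config API format is passed through by import_raw_state() as-is.
    return connection.import_raw_state(raw_state)
```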
def import_raw_state(
    self,
    connection_state: dict[str, Any] | list[dict[str, Any]],
) -> dict[str, Any]:
    """Import (restore) the full state for this connection.

    > ⚠️ **WARNING:** Modifying the state directly is not recommended and
    > could result in broken connections, and/or incorrect sync behavior.

    Replaces the entire connection state with the provided state blob.
    Uses the safe variant that prevents updates while a sync is running (HTTP 423).

    This is the counterpart to `dump_raw_state()` for backup/restore workflows.
    The `connectionId` in the blob is always overridden with this connection's
    ID, making state blobs portable across connections.

    Accepts either format:

    - **Config API format** (dict with `stateType`): passed through directly.
    - **Airbyte protocol format** (list of `AirbyteStateMessage` dicts): automatically
      converted to Config API format before sending.

    Args:
        connection_state: Connection state in either Config API or Airbyte protocol format.

    Returns:
        The updated connection state as a dictionary.

    Raises:
        AirbyteConnectionSyncActiveError: If a sync is currently running on this
            connection (HTTP 423). Wait for the sync to complete before retrying.
    """
    api_state: dict[str, Any]
    if isinstance(connection_state, list):
        if not _is_protocol_state_format(connection_state):
            msg = (
                "Expected connection_state list to contain Airbyte protocol state "
                "message dicts (each with a top-level `type` of STREAM, GLOBAL, "
                "or LEGACY). Got a list that does not match protocol format."
            )
            raise ValueError(msg)
        api_state = _denormalize_protocol_state_to_api(
            protocol_messages=connection_state,
            connection_id=self.connection_id,
        )
    elif isinstance(connection_state, dict):
        if _is_protocol_state_format(connection_state):
            api_state = _denormalize_protocol_state_to_api(
                protocol_messages=[connection_state],
                connection_id=self.connection_id,
            )
        else:
            api_state = connection_state
    else:
        msg = f"Expected a dict or list, got {type(connection_state)}"
        raise TypeError(msg)

    return api_util.replace_connection_state(
        connection_id=self.connection_id,
        connection_state_dict=api_state,
        api_root=self.workspace.api_root,
        client_id=self.workspace.client_id,
        client_secret=self.workspace.client_secret,
        bearer_token=self.workspace.bearer_token,
        config_api_root=self.workspace.config_api_root,
    )
Import (restore) the full state for this connection.
⚠️ WARNING: Modifying the state directly is not recommended and could result in broken connections and/or incorrect sync behavior.
Replaces the entire connection state with the provided state blob. Uses the safe variant that prevents updates while a sync is running (HTTP 423).
This is the counterpart to dump_raw_state() for backup/restore workflows.
The connectionId in the blob is always overridden with this connection's
ID, making state blobs portable across connections.
Accepts either format:
- Config API format (dict with stateType): passed through directly.
- Airbyte protocol format (list of AirbyteStateMessage dicts, or a single AirbyteStateMessage dict): automatically converted to Config API format before sending.
Arguments:
- connection_state: Connection state in either Config API or Airbyte protocol format.
Returns:
The updated connection state as a dictionary.
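Raises:
- AirbyteConnectionSyncActiveError: If a sync is currently running on this connection (HTTP 423). Wait for the sync to complete before retrying.
- ValueError: If a list is passed whose entries are not Airbyte protocol state messages.
- TypeError: If connection_state is neither a dict nor a list.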
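For orientation, here are illustrative examples of the two accepted shapes, plus a rough format check. The Config API keys (stateType, streamState, streamDescriptor) are the ones used elsewhere on this page, and the protocol example follows the Airbyte protocol AirbyteStateMessage layout (type, stream.stream_descriptor, stream.stream_state). The function looks_like_protocol_state is a hypothetical helper that mirrors the ValueError message above; it is not PyAirbyte's private format detector.

```python
from typing import Any

# Config API format: one dict with stateType; connectionId gets overridden on import.
CONFIG_API_STATE = {
    "stateType": "stream",
    "connectionId": "ignored-and-overridden",
    "streamState": [
        {"streamDescriptor": {"name": "users"}, "streamState": {"cursor": "2024-01-01"}},
    ],
}

# Airbyte protocol format: a list of AirbyteStateMessage dicts.
PROTOCOL_STATE = [
    {
        "type": "STREAM",
        "stream": {
            "stream_descriptor": {"name": "users"},
            "stream_state": {"cursor": "2024-01-01"},
        },
    },
]

PROTOCOL_TYPES = {"STREAM", "GLOBAL", "LEGACY"}


def looks_like_protocol_state(value: Any) -> bool:
    """Rough check: every message has a top-level `type` of STREAM, GLOBAL, or LEGACY."""
    messages = value if isinstance(value, list) else [value]
    return bool(messages) and all(
        isinstance(m, dict) and m.get("type") in PROTOCOL_TYPES for m in messages
    )
```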
def get_stream_state(
    self,
    stream_name: str,
    stream_namespace: str | None = None,
) -> dict[str, Any] | None:
    """Get the state blob for a single stream within this connection.

    Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}),
    not the full connection state envelope.

    This is compatible with `stream`-type state and stream-level entries
    within a `global`-type state. It is not compatible with `legacy` state.
    To get or set the entire connection-level state artifact, use
    `dump_raw_state` and `import_raw_state` instead.

    Args:
        stream_name: The name of the stream to get state for.
        stream_namespace: The source-side stream namespace. This refers to the
            namespace from the source (e.g., database schema), not any destination
            namespace override set in connection advanced settings.

    Returns:
        The stream's state blob as a dictionary, or None if the stream is not found.
    """
    state_data = self.dump_raw_state(normalize=False)
    result = ConnectionStateResponse(**state_data)

    streams = _get_stream_list(result)
    matching = [s for s in streams if _match_stream(s, stream_name, stream_namespace)]

    if not matching:
        available = [s.stream_descriptor.name for s in streams]
        logger.warning(
            "Stream '%s' not found in connection state for connection '%s'. "
            "Available streams: %s",
            stream_name,
            self.connection_id,
            available,
        )
        return None

    return matching[0].stream_state
Get the state blob for a single stream within this connection.
Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}), not the full connection state envelope.
This is compatible with stream-type state and stream-level entries
within a global-type state. It is not compatible with legacy state.
To get or set the entire connection-level state artifact, use
dump_raw_state and import_raw_state instead.
Arguments:
- stream_name: The name of the stream to get state for.
- stream_namespace: The source-side stream namespace. This refers to the namespace from the source (e.g., database schema), not any destination namespace override set in connection advanced settings.
Returns:
The stream's state blob as a dictionary, or None if the stream is not found.
def set_stream_state(
    self,
    stream_name: str,
    state_blob_dict: dict[str, Any],
    stream_namespace: str | None = None,
) -> None:
    """Set the state for a single stream within this connection.

    Fetches the current full state, replaces only the specified stream's state,
    then sends the full updated state back to the API. If the stream does not
    exist in the current state, it is appended.

    This is compatible with `stream`-type state and stream-level entries
    within a `global`-type state. It is not compatible with `legacy` state.
    To get or set the entire connection-level state artifact, use
    `dump_raw_state` and `import_raw_state` instead.

    Uses the safe variant that prevents updates while a sync is running (HTTP 423).

    Args:
        stream_name: The name of the stream to update state for.
        state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}).
        stream_namespace: The source-side stream namespace. This refers to the
            namespace from the source (e.g., database schema), not any destination
            namespace override set in connection advanced settings.

    Raises:
        PyAirbyteInputError: If the connection state type is not supported for
            stream-level operations (not_set, legacy).
        AirbyteConnectionSyncActiveError: If a sync is currently running on this
            connection (HTTP 423). Wait for the sync to complete before retrying.
    """
    state_data = self.dump_raw_state(normalize=False)
    current = ConnectionStateResponse(**state_data)

    if current.state_type == "not_set":
        raise PyAirbyteInputError(
            message="Cannot set stream state: connection has no existing state.",
            context={"connection_id": self.connection_id},
        )

    if current.state_type == "legacy":
        raise PyAirbyteInputError(
            message="Cannot set stream state on a legacy-type connection state.",
            context={"connection_id": self.connection_id},
        )

    new_stream_entry = {
        "streamDescriptor": {
            "name": stream_name,
            **(
                {
                    "namespace": stream_namespace,
                }
                if stream_namespace
                else {}
            ),
        },
        "streamState": state_blob_dict,
    }

    raw_streams: list[dict[str, Any]]
    if current.state_type == "stream":
        raw_streams = state_data.get("streamState", [])
    elif current.state_type == "global":
        raw_streams = state_data.get("globalState", {}).get("streamStates", [])
    else:
        raw_streams = []

    streams = _get_stream_list(current)
    found = False
    updated_streams_raw: list[dict[str, Any]] = []
    for raw_s, parsed_s in zip(raw_streams, streams, strict=False):
        if _match_stream(parsed_s, stream_name, stream_namespace):
            updated_streams_raw.append(new_stream_entry)
            found = True
        else:
            updated_streams_raw.append(raw_s)

    if not found:
        updated_streams_raw.append(new_stream_entry)

    full_state: dict[str, Any] = {
        **state_data,
    }

    if current.state_type == "stream":
        full_state["streamState"] = updated_streams_raw
    elif current.state_type == "global":
        original_global = state_data.get("globalState", {})
        full_state["globalState"] = {
            **original_global,
            "streamStates": updated_streams_raw,
        }

    self.import_raw_state(full_state)
Set the state for a single stream within this connection.
Fetches the current full state, replaces only the specified stream's state, then sends the full updated state back to the API. If the stream does not exist in the current state, it is appended.
This is compatible with stream-type state and stream-level entries
within a global-type state. It is not compatible with legacy state.
To get or set the entire connection-level state artifact, use
dump_raw_state and import_raw_state instead.
Uses the safe variant that prevents updates while a sync is running (HTTP 423).
Arguments:
- stream_name: The name of the stream to update state for.
- state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}).
- stream_namespace: The source-side stream namespace. This refers to the namespace from the source (e.g., database schema), not any destination namespace override set in connection advanced settings.
Raises:
- PyAirbyteInputError: If the connection state type is not supported for stream-level operations (not_set, legacy).
- AirbyteConnectionSyncActiveError: If a sync is currently running on this connection (HTTP 423). Wait for the sync to complete before retrying.
@deprecated("Use 'dump_raw_catalog()' instead.")
def get_catalog_artifact(self) -> dict[str, Any] | None:
    """Get the configured catalog for this connection.

    Returns the full configured catalog (syncCatalog) for this connection,
    including stream schemas, sync modes, cursor fields, and primary keys.

    Uses the Config API endpoint: POST /v1/web_backend/connections/get

    Returns:
        Dictionary containing the configured catalog, or `None` if not found.
    """
    return self.dump_raw_catalog()
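Deprecated. Use dump_raw_catalog() instead.
Get the configured catalog for this connection, including stream schemas, sync modes, cursor fields, and primary keys. This method calls dump_raw_catalog() with its default arguments, so the result is in Airbyte protocol format rather than the raw syncCatalog dict; call dump_raw_catalog(normalize=False) to get the raw Config API form.
Uses the Config API endpoint: POST /v1/web_backend/connections/get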
Returns:
Dictionary containing the configured catalog, or None if not found.
def dump_raw_catalog(
    self,
    *,
    normalize: bool = True,
) -> dict[str, Any] | None:
    """Dump the configured catalog for this connection.

    By default, returns the catalog in Airbyte protocol format
    (`ConfiguredAirbyteCatalog` with snake_case keys), suitable for passing
    to a connector's `--catalog` flag.

    When `normalize` is `False`, returns the raw `syncCatalog` dict from the
    Config API (camelCase keys, nested `config` block). This raw format can be
    passed directly to `import_raw_catalog()` for backup/restore workflows.

    Args:
        normalize: If `True` (default), convert to Airbyte protocol format.
            If `False`, return the raw Config API catalog.

    Returns:
        The configured catalog dict, or `None` if not found.
    """
    connection_response = api_util.get_connection_catalog(
        connection_id=self.connection_id,
        api_root=self.workspace.api_root,
        client_id=self.workspace.client_id,
        client_secret=self.workspace.client_secret,
        bearer_token=self.workspace.bearer_token,
        config_api_root=self.workspace.config_api_root,
    )
    raw = connection_response.get("syncCatalog")
    if raw is None:
        return None
    if normalize:
        return _normalize_catalog_to_protocol(raw)
    return raw
Dump the configured catalog for this connection.
By default, returns the catalog in Airbyte protocol format
(ConfiguredAirbyteCatalog with snake_case keys), suitable for passing
to a connector's --catalog flag.
When normalize is False, returns the raw syncCatalog dict from the
Config API (camelCase keys, nested config block). This raw format can be
passed directly to import_raw_catalog() for backup/restore workflows.
Arguments:
- normalize: If True (default), convert to Airbyte protocol format. If False, return the raw Config API catalog.
Returns:
The configured catalog dict, or None if not found.
def import_raw_catalog(self, catalog: dict[str, Any]) -> None:
    """Replace the configured catalog for this connection.

    > ⚠️ **WARNING:** Modifying the catalog directly is not recommended and
    > could result in broken connections, and/or incorrect sync behavior.

    Accepts a configured catalog dict and replaces the connection's entire
    catalog with it. All other connection settings remain unchanged.

    Accepts either format:

    - **Config API format** (`syncCatalog` with camelCase keys and nested `config`):
      passed through directly.
    - **Airbyte protocol format** (`ConfiguredAirbyteCatalog` with snake_case keys):
      automatically converted to Config API format before sending.

    Args:
        catalog: The configured catalog dict in either format.
    """
    if _is_protocol_catalog_format(catalog):
        catalog = _denormalize_catalog_to_api(catalog)

    api_util.replace_connection_catalog(
        connection_id=self.connection_id,
        configured_catalog_dict=catalog,
        api_root=self.workspace.api_root,
        client_id=self.workspace.client_id,
        client_secret=self.workspace.client_secret,
        bearer_token=self.workspace.bearer_token,
        config_api_root=self.workspace.config_api_root,
    )
Replace the configured catalog for this connection.
⚠️ WARNING: Modifying the catalog directly is not recommended and could result in broken connections and/or incorrect sync behavior.
Accepts a configured catalog dict and replaces the connection's entire catalog with it. All other connection settings remain unchanged.
Accepts either format:
- Config API format (syncCatalog with camelCase keys and nested config): passed through directly.
- Airbyte protocol format (ConfiguredAirbyteCatalog with snake_case keys): automatically converted to Config API format before sending.
Arguments:
- catalog: The configured catalog dict in either format.
def rename(self, name: str) -> CloudConnection:
    """Rename the connection.

    Args:
        name: New name for the connection

    Returns:
        Updated CloudConnection object with refreshed info
    """
    updated_response = api_util.patch_connection(
        connection_id=self.connection_id,
        api_root=self.workspace.api_root,
        client_id=self.workspace.client_id,
        client_secret=self.workspace.client_secret,
        bearer_token=self.workspace.bearer_token,
        name=name,
    )
    self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
    return self
Rename the connection.
Arguments:
- name: New name for the connection
Returns:
Updated CloudConnection object with refreshed info
def set_table_prefix(self, prefix: str) -> CloudConnection:
    """Set the table prefix for the connection.

    Args:
        prefix: New table prefix to use when syncing to the destination

    Returns:
        Updated CloudConnection object with refreshed info
    """
    updated_response = api_util.patch_connection(
        connection_id=self.connection_id,
        api_root=self.workspace.api_root,
        client_id=self.workspace.client_id,
        client_secret=self.workspace.client_secret,
        bearer_token=self.workspace.bearer_token,
        prefix=prefix,
    )
    self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
    return self
Set the table prefix for the connection.
Arguments:
- prefix: New table prefix to use when syncing to the destination
Returns:
Updated CloudConnection object with refreshed info
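Since rename() and set_table_prefix() both return the same CloudConnection object (with refreshed info), the calls can be chained. This is a small sketch assuming `connection` is a CloudConnection; the helper name rename_and_prefix is hypothetical. Each call is still a separate API request, so if the second one fails, the rename has already been applied.

```python
from typing import Any


def rename_and_prefix(connection: Any, name: str, prefix: str) -> Any:
    """Apply a new connection name and table prefix in one chained expression."""
    # Both methods return `self`, so the second call runs on the refreshed object.
    return connection.rename(name).set_table_prefix(prefix)
```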
def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
    """Set the selected streams for the connection.

    This is a destructive operation that can break existing connections if the
    stream selection is changed incorrectly. Use with caution.

    Args:
        stream_names: List of stream names to sync

    Returns:
        Updated CloudConnection object with refreshed info
    """
    configurations = api_util.build_stream_configurations(stream_names)

    updated_response = api_util.patch_connection(
        connection_id=self.connection_id,
        api_root=self.workspace.api_root,
        client_id=self.workspace.client_id,
        client_secret=self.workspace.client_secret,
        bearer_token=self.workspace.bearer_token,
        configurations=configurations,
    )
    self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
    return self
Set the selected streams for the connection.
This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.
Arguments:
- stream_names: List of stream names to sync
Returns:
Updated CloudConnection object with refreshed info
@property
def enabled(self) -> bool:
    """Get the current enabled status of the connection.

    This property always fetches fresh data from the API to ensure accuracy,
    as another process or user may have toggled the setting.

    Returns:
        True if the connection status is 'active', False otherwise.
    """
    connection_info = self._fetch_connection_info(force_refresh=True)
    return connection_info.status == "active"
Get the current enabled status of the connection.
This property always fetches fresh data from the API to ensure accuracy, as another process or user may have toggled the setting.
Returns:
True if the connection status is 'active', False otherwise.
def set_enabled(
    self,
    *,
    enabled: bool,
    ignore_noop: bool = True,
) -> None:
    """Set the enabled status of the connection.

    Args:
        enabled: True to enable (set status to 'active'), False to disable
            (set status to 'inactive').
        ignore_noop: If True (default), silently return if the connection is already
            in the requested state. If False, raise ValueError when the requested
            state matches the current state.

    Raises:
        ValueError: If ignore_noop is False and the connection is already in the
            requested state.
    """
    # Always fetch fresh data to check current status
    connection_info = self._fetch_connection_info(force_refresh=True)
    current_status = connection_info.status
    desired_status = "active" if enabled else "inactive"

    if current_status == desired_status:
        if ignore_noop:
            return
        raise ValueError(
            f"Connection is already {'enabled' if enabled else 'disabled'}. "
            f"Current status: {current_status}"
        )

    updated_response = api_util.patch_connection(
        connection_id=self.connection_id,
        api_root=self.workspace.api_root,
        client_id=self.workspace.client_id,
        client_secret=self.workspace.client_secret,
        bearer_token=self.workspace.bearer_token,
        status=desired_status,
    )
    self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
Set the enabled status of the connection.
Arguments:
- enabled: True to enable (set status to 'active'), False to disable (set status to 'inactive').
- ignore_noop: If True (default), silently return if the connection is already in the requested state. If False, raise ValueError when the requested state matches the current state.
Raises:
- ValueError: If ignore_noop is False and the connection is already in the requested state.
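One way to combine the enabled property with set_enabled() is a context manager that pauses a connection during maintenance and restores its previous status afterwards. This is a sketch assuming `connection` is a CloudConnection; the name `paused` is hypothetical. Reading `enabled` first means a connection that was already disabled stays disabled, and ignore_noop=True avoids a ValueError if the connection is already in the requested state.

```python
from collections.abc import Iterator
from contextlib import contextmanager
from typing import Any


@contextmanager
def paused(connection: Any) -> Iterator[Any]:
    """Temporarily disable a connection, re-enabling it on exit only if it was enabled."""
    was_enabled = connection.enabled  # fresh API read, per the property docs
    connection.set_enabled(enabled=False, ignore_noop=True)
    try:
        yield connection
    finally:
        # Restore the original status even if the block raised.
        if was_enabled:
            connection.set_enabled(enabled=True, ignore_noop=True)
```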
def set_schedule(
    self,
    cron_expression: str,
) -> None:
    """Set a cron schedule for the connection.

    Args:
        cron_expression: A cron expression defining when syncs should run.

    Examples:
        - "0 0 * * *" - Daily at midnight UTC
        - "0 */6 * * *" - Every 6 hours
        - "0 0 * * 0" - Weekly on Sunday at midnight UTC
    """
    updated_response = api_util.patch_connection(
        connection_id=self.connection_id,
        api_root=self.workspace.api_root,
        client_id=self.workspace.client_id,
        client_secret=self.workspace.client_secret,
        bearer_token=self.workspace.bearer_token,
        schedule=api_util.build_connection_schedule(
            schedule_type="cron",
            cron_expression=cron_expression,
        ),
    )
    self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
Set a cron schedule for the connection.
Arguments:
- cron_expression: A cron expression defining when syncs should run.
Examples:
- "0 0 * * *" - Daily at midnight UTC
- "0 */6 * * *" - Every 6 hours
- "0 0 * * 0" - Weekly on Sunday at midnight UTC
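To avoid scattering raw cron strings through scripts, the three example expressions above can be kept in a lookup table. This sketch assumes `connection` is a CloudConnection; the friendly names and the apply_schedule helper are hypothetical, and "manual" routes to set_manual_schedule() from this class.

```python
from typing import Any

# The cron expressions listed in the set_schedule() docstring, under hypothetical names.
NAMED_SCHEDULES = {
    "daily": "0 0 * * *",
    "every_6_hours": "0 */6 * * *",
    "weekly": "0 0 * * 0",
}


def apply_schedule(connection: Any, name: str) -> None:
    """Switch a connection to a named cron schedule, or to manual with "manual"."""
    if name == "manual":
        connection.set_manual_schedule()
        return
    try:
        cron_expression = NAMED_SCHEDULES[name]
    except KeyError:
        raise ValueError(
            f"Unknown schedule {name!r}; choose from {sorted(NAMED_SCHEDULES)} or 'manual'"
        ) from None
    connection.set_schedule(cron_expression)
```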
def set_manual_schedule(self) -> None:
    """Set the connection to manual scheduling.

    Disables automatic syncs. Syncs will only run when manually triggered.
    """
    updated_response = api_util.patch_connection(
        connection_id=self.connection_id,
        api_root=self.workspace.api_root,
        client_id=self.workspace.client_id,
        client_secret=self.workspace.client_secret,
        bearer_token=self.workspace.bearer_token,
        schedule=api_util.build_connection_schedule(schedule_type="manual"),
    )
    self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
Set the connection to manual scheduling.
Disables automatic syncs. Syncs will only run when manually triggered.
def permanently_delete(
    self,
    *,
    cascade_delete_source: bool = False,
    cascade_delete_destination: bool = False,
) -> None:
    """Delete the connection.

    Args:
        cascade_delete_source: Whether to also delete the source.
        cascade_delete_destination: Whether to also delete the destination.
    """
    self.workspace.permanently_delete_connection(self)

    if cascade_delete_source:
        self.workspace.permanently_delete_source(self.source_id)

    if cascade_delete_destination:
        self.workspace.permanently_delete_destination(self.destination_id)
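Permanently delete the connection, and optionally its source and destination. The connection is deleted first, then the source and destination if requested.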
Arguments:
- cascade_delete_source: Whether to also delete the source.
- cascade_delete_destination: Whether to also delete the destination.
@dataclass
class CloudClientConfig:
    """Client configuration for Airbyte Cloud API.

    This class encapsulates the authentication and API configuration needed to connect
    to Airbyte Cloud, OSS, or Enterprise instances. It supports two mutually
    exclusive authentication methods:

    1. OAuth2 client credentials flow (client_id + client_secret)
    2. Bearer token authentication

    Exactly one authentication method must be provided. Providing both or neither
    will raise a validation error.

    Attributes:
        client_id: OAuth2 client ID for client credentials flow.
        client_secret: OAuth2 client secret for client credentials flow.
        bearer_token: Pre-generated bearer token for direct authentication.
        api_root: The API root URL. Defaults to Airbyte Cloud API.
        config_api_root: The Config API root URL.
    """

    client_id: SecretString | None = None
    """OAuth2 client ID for client credentials authentication."""

    client_secret: SecretString | None = None
    """OAuth2 client secret for client credentials authentication."""

    bearer_token: SecretString | None = None
    """Bearer token for direct authentication (alternative to client credentials)."""

    api_root: str = api_util.CLOUD_API_ROOT
    """The API root URL. Defaults to Airbyte Cloud API."""

    config_api_root: str | None = None
    """The Config API root URL."""

    def __post_init__(self) -> None:
        """Validate credentials and ensure secrets are properly wrapped."""
        # Wrap secrets in SecretString if they aren't already
        if self.client_id is not None:
            self.client_id = SecretString(self.client_id)
        if self.client_secret is not None:
            self.client_secret = SecretString(self.client_secret)
        if self.bearer_token is not None:
            self.bearer_token = SecretString(self.bearer_token)

        # Validate mutual exclusivity
        has_client_credentials = self.client_id is not None or self.client_secret is not None
        has_bearer_token = self.bearer_token is not None

        if has_client_credentials and has_bearer_token:
            raise PyAirbyteInputError(
                message="Cannot use both client credentials and bearer token authentication.",
                guidance=(
                    "Provide either client_id and client_secret together, "
                    "or bearer_token alone, but not both."
                ),
            )

        if has_client_credentials and (self.client_id is None or self.client_secret is None):
            # If using client credentials, both must be provided
            raise PyAirbyteInputError(
                message="Incomplete client credentials.",
                guidance=(
                    "When using client credentials authentication, "
                    "both client_id and client_secret must be provided."
                ),
            )

        if not has_client_credentials and not has_bearer_token:
            raise PyAirbyteInputError(
                message="No authentication credentials provided.",
                guidance=(
                    "Provide either client_id and client_secret together for OAuth2 "
                    "client credentials flow, or bearer_token for direct authentication."
                ),
            )

    @property
    def uses_bearer_token(self) -> bool:
        """Return True if using bearer token authentication."""
        return self.bearer_token is not None

    @property
    def uses_client_credentials(self) -> bool:
        """Return True if using client credentials authentication."""
        return self.client_id is not None and self.client_secret is not None

    @classmethod
    def from_env(
        cls,
        *,
        api_root: str | None = None,
        config_api_root: str | None = None,
    ) -> CloudClientConfig:
        """Create CloudClientConfig from environment variables.

        This factory method resolves credentials from environment variables,
        providing a convenient way to create credentials without explicitly
        passing secrets.

        Environment variables used:
            - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
            - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
            - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
            - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).
            - `AIRBYTE_CLOUD_CONFIG_API_URL`: Optional. The Config API root URL.

        The method will first check for a bearer token. If not found, it will
        attempt to use client credentials.

        Args:
            api_root: The API root URL. If not provided, will be resolved from
                the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
                the Airbyte Cloud API.
            config_api_root: The Config API root URL. If not provided, will be resolved
                from the `AIRBYTE_CLOUD_CONFIG_API_URL` environment variable.

        Returns:
            A CloudClientConfig instance configured with credentials from the environment.

        Raises:
            PyAirbyteSecretNotFoundError: If required credentials are not found in
                the environment.
        """
        resolved_api_root = resolve_cloud_api_url(api_root)
        resolved_config_api_root = resolve_cloud_config_api_url(config_api_root)

        # Try bearer token first
        bearer_token = resolve_cloud_bearer_token()
        if bearer_token:
            return cls(
                bearer_token=bearer_token,
                api_root=resolved_api_root,
                config_api_root=resolved_config_api_root,
            )

        # Fall back to client credentials
        return cls(
            client_id=resolve_cloud_client_id(),
            client_secret=resolve_cloud_client_secret(),
            api_root=resolved_api_root,
            config_api_root=resolved_config_api_root,
        )
Client configuration for Airbyte Cloud API.
This class encapsulates the authentication and API configuration needed to connect to Airbyte Cloud, OSS, or Enterprise instances. It supports two mutually exclusive authentication methods:
- OAuth2 client credentials flow (client_id + client_secret)
- Bearer token authentication
Exactly one authentication method must be provided. Providing both or neither will raise a validation error.
Attributes:
- client_id: OAuth2 client ID for client credentials flow.
- client_secret: OAuth2 client secret for client credentials flow.
- bearer_token: Pre-generated bearer token for direct authentication.
- api_root: The API root URL. Defaults to the Airbyte Cloud API.
- config_api_root: The Config API root URL.
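The rule that exactly one authentication method must be supplied can be sketched with a plain dataclass. This is a hypothetical stand-in, not PyAirbyte's validator: the class name `AuthConfigSketch`, the use of `ValueError`, and the treatment of a half-filled client-credentials pair as invalid are all assumptions made for illustration. The two properties mirror `uses_bearer_token` and `uses_client_credentials`.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class AuthConfigSketch:
    """Hypothetical stand-in for CloudClientConfig's authentication fields."""

    client_id: str | None = None
    client_secret: str | None = None
    bearer_token: str | None = None

    def __post_init__(self) -> None:
        has_bearer = self.bearer_token is not None
        has_any_client = self.client_id is not None or self.client_secret is not None
        has_full_client = self.client_id is not None and self.client_secret is not None

        # Both methods supplied: reject.
        if has_bearer and has_any_client:
            raise ValueError("Provide a bearer token or client credentials, not both.")
        # Neither method supplied (or only half of a client-credentials pair): reject.
        if not has_bearer and not has_full_client:
            raise ValueError("Provide a bearer token, or both client_id and client_secret.")

    @property
    def uses_bearer_token(self) -> bool:
        return self.bearer_token is not None

    @property
    def uses_client_credentials(self) -> bool:
        return self.client_id is not None and self.client_secret is not None


# Usage: each of these two forms is accepted on its own.
token_config = AuthConfigSketch(bearer_token="example-token")
oauth_config = AuthConfigSketch(client_id="example-id", client_secret="example-secret")
```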
uses_bearer_token: bool
Return True if using bearer token authentication.
uses_client_credentials: bool
Return True if using client credentials authentication.
@classmethod
def from_env(cls, *, api_root: str | None = None, config_api_root: str | None = None) -> CloudClientConfig:
Create CloudClientConfig from environment variables.
This factory method resolves credentials from environment variables, providing a convenient way to create credentials without explicitly passing secrets.
Environment variables used:
- AIRBYTE_CLOUD_CLIENT_ID: OAuth client ID (for client credentials flow).
- AIRBYTE_CLOUD_CLIENT_SECRET: OAuth client secret (for client credentials flow).
- AIRBYTE_CLOUD_BEARER_TOKEN: Bearer token (alternative to client credentials).
- AIRBYTE_CLOUD_API_URL: Optional. The API root URL (defaults to Airbyte Cloud).
- AIRBYTE_CLOUD_CONFIG_API_URL: Optional. The Config API root URL.
The method will first check for a bearer token. If not found, it will attempt to use client credentials.
Arguments:
- api_root: The API root URL. If not provided, it is resolved from the AIRBYTE_CLOUD_API_URL environment variable, or defaults to the Airbyte Cloud API.
- config_api_root: The Config API root URL. If not provided, it is resolved from the AIRBYTE_CLOUD_CONFIG_API_URL environment variable.
Returns:
A CloudClientConfig instance configured with credentials from the environment.
Raises:
- PyAirbyteSecretNotFoundError: If required credentials are not found in the environment.
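The lookup order described for `from_env()` (explicit arguments before environment variables, and a bearer token before client credentials) can be sketched against a plain mapping instead of the process environment. `resolve_config_from_env`, `SecretNotFoundSketch`, and the `DEFAULT_API_ROOT` value are assumptions for illustration; the real method delegates to PyAirbyte's own `resolve_cloud_*` helpers.

```python
from __future__ import annotations

from collections.abc import Mapping

# Assumption: the Airbyte Cloud Public API root used when nothing else is configured.
DEFAULT_API_ROOT = "https://api.airbyte.com/v1"


class SecretNotFoundSketch(LookupError):
    """Hypothetical stand-in for PyAirbyteSecretNotFoundError."""


def resolve_config_from_env(
    env: Mapping[str, str],
    *,
    api_root: str | None = None,
    config_api_root: str | None = None,
) -> dict[str, str | None]:
    """Mirror the precedence documented for from_env()."""
    # Explicit arguments win over environment variables.
    roots = {
        "api_root": api_root or env.get("AIRBYTE_CLOUD_API_URL") or DEFAULT_API_ROOT,
        "config_api_root": config_api_root or env.get("AIRBYTE_CLOUD_CONFIG_API_URL"),
    }

    # A bearer token, if present, takes precedence over client credentials.
    bearer_token = env.get("AIRBYTE_CLOUD_BEARER_TOKEN")
    if bearer_token:
        return {"bearer_token": bearer_token, **roots}

    # Fall back to client credentials; both values must be present.
    missing = [
        name
        for name in ("AIRBYTE_CLOUD_CLIENT_ID", "AIRBYTE_CLOUD_CLIENT_SECRET")
        if not env.get(name)
    ]
    if missing:
        raise SecretNotFoundSketch(f"Missing environment variables: {', '.join(missing)}")
    return {
        "client_id": env["AIRBYTE_CLOUD_CLIENT_ID"],
        "client_secret": env["AIRBYTE_CLOUD_CLIENT_SECRET"],
        **roots,
    }
```

In practice you would pass `os.environ` as `env`; accepting any mapping keeps the sketch easy to test.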
```python
class CloudWorkspaceInfo(BaseModel):
    """Information about an Airbyte workspace."""

    model_config = ConfigDict(populate_by_name=True)

    workspace_id: str = Field(alias="workspaceId")
    """The workspace ID."""

    name: str
    """The workspace name."""

    data_residency: str | None = Field(default=None, alias="dataResidency")
    """The data residency setting for the workspace, if available."""

    organization_id: str | None = Field(default=None, alias="organizationId")
    """The organization ID for the workspace, if available."""

    notifications: dict[str, object | None] = Field(default_factory=dict)
    """Workspace notification settings."""

    @classmethod
    def from_api_response(cls, workspace: _WorkspaceResponseLike) -> CloudWorkspaceInfo:
        """Create a public model from an internal API workspace response."""
        return cls(
            workspace_id=workspace.workspace_id,
            name=workspace.name,
            data_residency=workspace.data_residency,
            organization_id=getattr(workspace, "organization_id", None),
            notifications=_notifications_to_dict(workspace.notifications),
        )

    @classmethod
    def from_mapping(cls, workspace: Mapping[str, object]) -> CloudWorkspaceInfo:
        """Create a public model from a workspace mapping."""
        return cls.model_validate(workspace)

    def to_dict(self) -> dict[str, object]:
        """Return a JSON-serializable dictionary."""
        return self.model_dump(mode="json")
```
Information about an Airbyte workspace.
@classmethod
def from_api_response(cls, workspace: _WorkspaceResponseLike) -> CloudWorkspaceInfo:
Create a public model from an internal API workspace response.
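`CloudWorkspaceInfo` accepts the Public API's camelCase keys (`workspaceId`, `dataResidency`, `organizationId`) through field aliases, and `populate_by_name=True` also lets callers pass the snake_case field names. The stdlib sketch below imitates that behavior so it runs without pydantic. `WorkspaceInfoSketch` is hypothetical, and its error handling differs from pydantic: a missing required field raises `TypeError` here rather than a validation error.

```python
from __future__ import annotations

from collections.abc import Mapping
from dataclasses import asdict, dataclass, field

# Field name -> camelCase alias, mirroring the Field(alias=...) declarations above.
_ALIASES = {
    "workspace_id": "workspaceId",
    "data_residency": "dataResidency",
    "organization_id": "organizationId",
}
_FIELDS = ("workspace_id", "name", "data_residency", "organization_id", "notifications")


@dataclass
class WorkspaceInfoSketch:
    """Hypothetical stdlib stand-in for the CloudWorkspaceInfo model."""

    workspace_id: str
    name: str
    data_residency: str | None = None
    organization_id: str | None = None
    notifications: dict[str, object | None] = field(default_factory=dict)

    @classmethod
    def from_mapping(cls, data: Mapping[str, object]) -> WorkspaceInfoSketch:
        kwargs = {}
        for name in _FIELDS:
            alias = _ALIASES.get(name, name)
            # Like populate_by_name=True: accept the alias or the Python field name.
            if alias in data:
                kwargs[name] = data[alias]
            elif name in data:
                kwargs[name] = data[name]
        # Unknown keys are ignored; a missing required field raises TypeError.
        return cls(**kwargs)

    def to_dict(self) -> dict[str, object]:
        # Field names (not aliases) are used as keys, like model_dump() by default.
        return asdict(self)
```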
```python
@dataclass
class SyncResult:
    """The result of a sync operation.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: int
    table_name_prefix: str = ""
    table_name_suffix: str = ""
    _latest_job_info: CloudJobInfo | None = None
    _connection_response: CloudConnectionInfo | None = None
    _cache: CacheBase | None = None
    _job_with_attempts_info: dict[str, Any] | None = None

    @property
    def job_url(self) -> str:
        """Return the URL of the sync job.

        Note: This currently returns the connection's job history URL, as there is no direct URL
        to a specific job in the Airbyte Cloud web app.

        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
            E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
        """
        return f"{self.connection.job_history_url}"

    def _get_connection_info(self, *, force_refresh: bool = False) -> CloudConnectionInfo:
        """Return connection info for the sync job."""
        if self._connection_response and not force_refresh:
            return self._connection_response

        self._connection_response = CloudConnectionInfo.from_api_response(
            api_util.get_connection(
                workspace_id=self.workspace.workspace_id,
                api_root=self.workspace.api_root,
                connection_id=self.connection.connection_id,
                client_id=self.workspace.client_id,
                client_secret=self.workspace.client_secret,
                bearer_token=self.workspace.bearer_token,
            )
        )
        return self._connection_response

    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
        """Return the destination configuration for the sync job."""
        connection_info = self._get_connection_info(force_refresh=force_refresh)
        destination_response = api_util.get_destination(
            destination_id=connection_info.destination_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        return asdict(destination_response.configuration)

    def is_job_complete(self) -> bool:
        """Check if the sync job is complete."""
        return self.get_job_status() in FINAL_STATUSES

    def get_job_status(self) -> JobStatusEnum:
        """Return the latest status of the sync job."""
        return self._fetch_latest_job_info().status

    def _fetch_latest_job_info(self) -> CloudJobInfo:
        """Return the job info for the sync job."""
        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
            return self._latest_job_info

        self._latest_job_info = CloudJobInfo.from_api_response(
            api_util.get_job_info(
                job_id=self.job_id,
                api_root=self.workspace.api_root,
                client_id=self.workspace.client_id,
                client_secret=self.workspace.client_secret,
                bearer_token=self.workspace.bearer_token,
            )
        )
        return self._latest_job_info

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced."""
        return self._fetch_latest_job_info().bytes_synced or 0

    @property
    def records_synced(self) -> int:
        """Return the number of records processed."""
        return self._fetch_latest_job_info().rows_synced or 0

    @property
    def start_time(self) -> datetime:
        """Return the start time of the sync job in UTC."""
        try:
            return ab_datetime_parse(self._fetch_latest_job_info().start_time)
        except (ValueError, TypeError) as e:
            if "Invalid isoformat string" in str(e):
                job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
                    api_root=self.workspace.api_root,
                    config_api_root=self.workspace.config_api_root,
                    path="/jobs/get",
                    json={"id": self.job_id},
                    client_id=self.workspace.client_id,
                    client_secret=self.workspace.client_secret,
                    bearer_token=self.workspace.bearer_token,
                )
                raw_start_time = job_info_raw.get("startTime")
                if raw_start_time:
                    return ab_datetime_parse(raw_start_time)
            raise

    def _fetch_job_with_attempts(self) -> dict[str, Any]:
        """Fetch job info with attempts from Config API using lazy loading pattern."""
        if self._job_with_attempts_info is not None:
            return self._job_with_attempts_info

        self._job_with_attempts_info = api_util._make_config_api_request(  # noqa: SLF001  # Config API helper
            api_root=self.workspace.api_root,
            config_api_root=self.workspace.config_api_root,
            path="/jobs/get",
            json={
                "id": self.job_id,
            },
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        return self._job_with_attempts_info

    def get_attempts(self) -> list[SyncAttempt]:
        """Return a list of attempts for this sync job."""
        job_with_attempts = self._fetch_job_with_attempts()
        attempts_data = job_with_attempts.get("attempts", [])

        return [
            SyncAttempt(
                workspace=self.workspace,
                connection=self.connection,
                job_id=self.job_id,
                attempt_number=i,
                _attempt_data=attempt_data,
            )
            for i, attempt_data in enumerate(attempts_data, start=0)
        ]

    def raise_failure_status(
        self,
        *,
        refresh_status: bool = False,
    ) -> None:
        """Raise an exception if the sync job failed.

        By default, this method will use the latest status available. If you want to refresh the
        status before checking for failure, set `refresh_status=True`. If the job has failed, this
        method will raise an `AirbyteConnectionSyncError`.

        Otherwise, do nothing.
        """
        if not refresh_status and self._latest_job_info:
            latest_status = self._latest_job_info.status
        else:
            latest_status = self.get_job_status()

        if latest_status in FAILED_STATUSES:
            raise AirbyteConnectionSyncError(
                workspace=self.workspace,
                connection_id=self.connection.connection_id,
                job_id=self.job_id,
                job_status=self.get_job_status(),
            )

    def wait_for_completion(
        self,
        *,
        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
        raise_timeout: bool = True,
        raise_failure: bool = False,
    ) -> JobStatusEnum:
        """Wait for a job to finish running."""
        start_time = time.time()
        while True:
            latest_status = self.get_job_status()
            if latest_status in FINAL_STATUSES:
                if raise_failure:
                    # No-op unless the job ended in a failed status:
                    self.raise_failure_status()

                return latest_status

            if time.time() - start_time > wait_timeout:
                if raise_timeout:
                    raise AirbyteConnectionSyncTimeoutError(
                        workspace=self.workspace,
                        connection_id=self.connection.connection_id,
                        job_id=self.job_id,
                        job_status=latest_status,
                        timeout=wait_timeout,
                    )

                return latest_status  # This will be a non-final status

            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

    def get_sql_cache(self) -> CacheBase:
        """Return a SQL Cache object for working with the data in a SQL-based destination."""
        if self._cache:
            return self._cache

        destination_configuration = self._get_destination_configuration()
        self._cache = destination_to_cache(destination_configuration=destination_configuration)
        return self._cache

    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
        """Return a SQL Engine for querying a SQL-based destination."""
        return self.get_sql_cache().get_sql_engine()

    def get_sql_table_name(self, stream_name: str) -> str:
        """Return the SQL table name of the named stream."""
        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)

    def get_sql_table(
        self,
        stream_name: str,
    ) -> sqlalchemy.Table:
        """Return a SQLAlchemy table object for the named stream."""
        return self.get_sql_cache().processor.get_sql_table(stream_name)

    def get_dataset(self, stream_name: str) -> CachedDataset:
        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

        This can be used to read and analyze the data in a SQL-based destination.

        TODO: In a future iteration, we can consider providing stream configuration information
            (catalog information) to the `CachedDataset` object via the "Get stream properties"
            API: https://reference.airbyte.com/reference/getstreamproperties
        """
        return CachedDataset(
            self.get_sql_cache(),
            stream_name=stream_name,
            stream_configuration=False,  # Don't look for stream configuration in cache.
        )

    def get_sql_database_name(self) -> str:
        """Return the SQL database name."""
        cache = self.get_sql_cache()
        return cache.get_database_name()

    def get_sql_schema_name(self) -> str:
        """Return the SQL schema name."""
        cache = self.get_sql_cache()
        return cache.schema_name

    @property
    def stream_names(self) -> list[str]:
        """Return the list of stream names."""
        return self.connection.stream_names

    @final
    @property
    def streams(
        self,
    ) -> _SyncResultStreams:  # pyrefly: ignore[unknown-name]
        """Return a mapping of stream names to `airbyte.CachedDataset` objects.

        This is a convenience wrapper around the `stream_names`
        property and `get_dataset()` method.
        """
        return self._SyncResultStreams(self)

    class _SyncResultStreams(Mapping[str, CachedDataset]):
        """A mapping of stream names to cached datasets."""

        def __init__(
            self,
            parent: SyncResult,
            /,
        ) -> None:
            self.parent: SyncResult = parent

        def __getitem__(self, key: str) -> CachedDataset:
            return self.parent.get_dataset(stream_name=key)

        def __iter__(self) -> Iterator[str]:
            return iter(self.parent.stream_names)

        def __len__(self) -> int:
            return len(self.parent.stream_names)
```
The result of a sync operation.
This class is not meant to be instantiated directly. Instead, obtain a SyncResult by interacting with the CloudWorkspace and CloudConnection objects.
job_url: str
Return the URL of the sync job.
Note: This currently returns the connection's job history URL, as there is no direct URL to a specific job in the Airbyte Cloud web app.
TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number. E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
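The TODO above describes a per-attempt deep link built from the job history URL. A minimal sketch of that URL construction follows. `build_job_logs_url` is hypothetical (no such helper exists in this module yet), and the query parameters are taken from the TODO's example rather than from any documented web-app contract.

```python
from urllib.parse import urlencode


def build_job_logs_url(job_history_url: str, event_id: str) -> str:
    """Hypothetical helper: link to a single attempt's logs in the web app.

    Assumes the `?eventId={event-guid}&openLogs=true` format from the TODO.
    """
    query = urlencode({"eventId": event_id, "openLogs": "true"})
    # Append with "&" if the history URL already carries a query string.
    separator = "&" if "?" in job_history_url else "?"
    return f"{job_history_url}{separator}{query}"
```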
def is_job_complete(self) -> bool:
Check if the sync job is complete.
def get_job_status(self) -> JobStatusEnum:
Return the latest status of the sync job. The status is re-fetched from the API unless a final status has already been cached.
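`is_job_complete()` and `get_job_status()` both go through a status cache that is reused only once the job has reached a final status, since a final status will not change. The sketch below reproduces that pattern with an injectable fetch function. `JobStatusTracker` is hypothetical, and the contents of `FINAL_STATUSES` are an assumption because the constant is not listed on this page.

```python
from __future__ import annotations

from collections.abc import Callable
from enum import Enum


class JobStatus(str, Enum):
    """Local mirror of JobStatusEnum (values copied from this page)."""

    PENDING = "pending"
    RUNNING = "running"
    INCOMPLETE = "incomplete"
    FAILED = "failed"
    SUCCEEDED = "succeeded"
    CANCELLED = "cancelled"


# Assumption: FINAL_STATUSES is not listed on this page; this set is illustrative.
FINAL_STATUSES = {JobStatus.SUCCEEDED, JobStatus.FAILED, JobStatus.CANCELLED}


class JobStatusTracker:
    """Hypothetical sketch of the caching in get_job_status()/_fetch_latest_job_info()."""

    def __init__(self, fetch_status: Callable[[], JobStatus]) -> None:
        self._fetch_status = fetch_status
        self._latest: JobStatus | None = None
        self.fetch_count = 0

    def get_job_status(self) -> JobStatus:
        # A final status cannot change, so reuse it instead of calling the API again.
        if self._latest is not None and self._latest in FINAL_STATUSES:
            return self._latest
        self.fetch_count += 1
        self._latest = self._fetch_status()
        return self._latest

    def is_job_complete(self) -> bool:
        return self.get_job_status() in FINAL_STATUSES
```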
bytes_synced: int
Return the number of bytes synced.
records_synced: int
Return the number of records processed.
start_time: datetime
Return the start time of the sync job in UTC.
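`start_time` first parses the timestamp from the job info and, if parsing fails with an "Invalid isoformat string" error, re-reads `startTime` from the Config API `/jobs/get` endpoint before giving up. The stdlib sketch below keeps that control flow but swaps `ab_datetime_parse` for `datetime.fromisoformat` and the API call for a callable; `parse_start_time` is a hypothetical name.

```python
from __future__ import annotations

from collections.abc import Callable
from datetime import datetime


def parse_start_time(
    reported: str | None,
    fetch_raw_start_time: Callable[[], str | None],
) -> datetime:
    """Hypothetical sketch of start_time's fallback logic."""
    try:
        # Raises TypeError for None and ValueError for an unparseable string.
        return datetime.fromisoformat(reported)
    except (ValueError, TypeError) as exc:
        # Only fall back on an unparseable timestamp, mirroring the message check above.
        if "Invalid isoformat string" in str(exc):
            raw = fetch_raw_start_time()  # stands in for the Config API "startTime" field
            if raw:
                return datetime.fromisoformat(raw)
        raise
```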
def get_attempts(self) -> list[SyncAttempt]:
Return a list of attempts for this sync job.
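`get_attempts()` wraps each entry of the Config API response's `attempts` list in a `SyncAttempt`, numbering attempts from 0 in response order. A minimal sketch of that mapping follows, using a hypothetical `AttemptSketch` class; the per-attempt payload contents in the example are illustrative only.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class AttemptSketch:
    """Hypothetical stand-in for SyncAttempt."""

    job_id: int
    attempt_number: int
    attempt_data: dict[str, Any] = field(default_factory=dict)


def build_attempts(job_id: int, job_with_attempts: dict[str, Any]) -> list[AttemptSketch]:
    # Attempts are numbered from 0 in the order the response lists them.
    return [
        AttemptSketch(job_id=job_id, attempt_number=i, attempt_data=data)
        for i, data in enumerate(job_with_attempts.get("attempts", []))
    ]
```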
def raise_failure_status(self, *, refresh_status: bool = False) -> None:
Raise an exception if the sync job failed.
By default, this method uses the latest status already available. To refresh the status before checking for failure, set refresh_status=True. If the job has failed, this method raises an AirbyteConnectionSyncError.
Otherwise, it does nothing.
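The difference between the default and `refresh_status=True` is whether a previously fetched status is reused. The sketch below shows both paths with a fake status source. `FailureChecker`, `SyncFailedSketch`, and the contents of `FAILED_STATUSES` are assumptions, not PyAirbyte names.

```python
from __future__ import annotations

from collections.abc import Callable

# Assumption: FAILED_STATUSES is not listed on this page; this set is illustrative.
FAILED_STATUSES = {"failed"}


class SyncFailedSketch(RuntimeError):
    """Hypothetical stand-in for AirbyteConnectionSyncError."""


class FailureChecker:
    def __init__(self, fetch_status: Callable[[], str]) -> None:
        self._fetch_status = fetch_status
        self._latest: str | None = None  # last status seen, if any

    def get_job_status(self) -> str:
        self._latest = self._fetch_status()
        return self._latest

    def raise_failure_status(self, *, refresh_status: bool = False) -> None:
        # Reuse the cached status unless a refresh is requested or nothing is cached yet.
        if not refresh_status and self._latest is not None:
            status = self._latest
        else:
            status = self.get_job_status()
        if status in FAILED_STATUSES:
            raise SyncFailedSketch(f"Sync job ended with status {status!r}")
```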
def wait_for_completion(self, *, wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS, raise_timeout: bool = True, raise_failure: bool = False) -> JobStatusEnum:
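Wait for the sync job to reach a final status, polling at a fixed interval, and return that status. If wait_timeout seconds pass first, raise AirbyteConnectionSyncTimeoutError, or return the latest (non-final) status when raise_timeout=False. When raise_failure=True, also raise AirbyteConnectionSyncError if the job ends in a failed status.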
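The polling loop in `wait_for_completion()` can be sketched with injectable `sleep` and `clock` functions so that it can be exercised without a live job. Everything here is hypothetical: the exception classes stand in for PyAirbyte's, the status sets are assumed, and the sketch uses `time.monotonic` where the source uses `time.time`.

```python
from __future__ import annotations

import time
from collections.abc import Callable

# Assumptions: illustrative status sets (the real constants are not listed on this page).
FINAL_STATUSES = {"succeeded", "failed", "cancelled"}
FAILED_STATUSES = {"failed"}


class SyncTimeoutSketch(TimeoutError):
    """Hypothetical stand-in for AirbyteConnectionSyncTimeoutError."""


class SyncFailedSketch(RuntimeError):
    """Hypothetical stand-in for AirbyteConnectionSyncError."""


def wait_for_completion(
    get_status: Callable[[], str],
    *,
    wait_timeout: float,
    poll_interval: float,
    raise_timeout: bool = True,
    raise_failure: bool = False,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> str:
    start = clock()
    while True:
        status = get_status()
        if status in FINAL_STATUSES:
            if raise_failure and status in FAILED_STATUSES:
                raise SyncFailedSketch(status)
            return status
        if clock() - start > wait_timeout:
            if raise_timeout:
                raise SyncTimeoutSketch(f"still {status!r} after {wait_timeout}s")
            return status  # a non-final status
        sleep(poll_interval)
```

The monotonic clock is a design choice of this sketch: it is unaffected by wall-clock adjustments, so a system time change cannot shorten or extend the timeout.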
def get_sql_cache(self) -> CacheBase:
Return a SQL Cache object for working with the data in a SQL-based destination.
def get_sql_engine(self) -> sqlalchemy.engine.Engine:
Return a SQL Engine for querying a SQL-based destination.
def get_sql_table_name(self, stream_name: str) -> str:
Return the SQL table name of the named stream.
def get_sql_table(self, stream_name: str) -> sqlalchemy.Table:
Return a SQLAlchemy table object for the named stream.
def get_dataset(self, stream_name: str) -> CachedDataset:
Retrieve an airbyte.datasets.CachedDataset object for a given stream name.
This can be used to read and analyze the data in a SQL-based destination.
TODO: In a future iteration, we can consider providing stream configuration information (catalog information) to the CachedDataset object via the "Get stream properties" API: https://reference.airbyte.com/reference/getstreamproperties
def get_sql_database_name(self) -> str:
Return the SQL database name.
def get_sql_schema_name(self) -> str:
Return the SQL schema name.
stream_names: list[str]
Return the list of stream names.
streams: _SyncResultStreams
Return a mapping of stream names to airbyte.CachedDataset objects.
This is a convenience wrapper around the stream_names property and the get_dataset() method.
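`streams` returns a read-only `Mapping` whose keys come from `stream_names` and whose values are built only when accessed, through `get_dataset()`. A minimal stdlib sketch of that lazy-mapping pattern follows, with a hypothetical `LazyStreams` class and a plain loader function in place of `CachedDataset`.

```python
from __future__ import annotations

from collections.abc import Callable, Iterator, Mapping


class LazyStreams(Mapping[str, object]):
    """Hypothetical sketch: names known up front, values loaded on access."""

    def __init__(self, stream_names: list[str], load_dataset: Callable[[str], object]) -> None:
        self._stream_names = stream_names
        self._load_dataset = load_dataset

    def __getitem__(self, key: str) -> object:
        # Reject unknown names so `in` and .get() follow the Mapping protocol.
        if key not in self._stream_names:
            raise KeyError(key)
        return self._load_dataset(key)

    def __iter__(self) -> Iterator[str]:
        return iter(self._stream_names)

    def __len__(self) -> int:
        return len(self._stream_names)
```

Raising `KeyError` for unknown names is a choice made in this sketch; the source `_SyncResultStreams.__getitem__` instead passes every key straight to `get_dataset()`.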
```python
class JobStatusEnum(str, Enum):
    """Status values for an Airbyte Cloud job."""

    PENDING = "pending"
    RUNNING = "running"
    INCOMPLETE = "incomplete"
    FAILED = "failed"
    SUCCEEDED = "succeeded"
    CANCELLED = "cancelled"
```
Status values for an Airbyte Cloud job.
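Because `JobStatusEnum` subclasses both `str` and `Enum`, its members compare equal to the raw status strings an API returns and serialize to JSON as plain strings, while lookup by value rejects unknown statuses. The sketch below demonstrates this standard Python behavior on a local copy of the enum; `JobStatusSketch` and `parse_status` are hypothetical names.

```python
from enum import Enum


class JobStatusSketch(str, Enum):
    """Local copy of the JobStatusEnum values shown above."""

    PENDING = "pending"
    RUNNING = "running"
    INCOMPLETE = "incomplete"
    FAILED = "failed"
    SUCCEEDED = "succeeded"
    CANCELLED = "cancelled"


def parse_status(raw: str) -> JobStatusSketch:
    # Lookup by value: raises ValueError for strings that are not a known status.
    return JobStatusSketch(raw)
```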
```python
class JobTypeEnum(str, Enum):
    """Job type values for Airbyte Cloud jobs."""

    SYNC = "sync"
    RESET = "reset"
    REFRESH = "refresh"
    CLEAR = "clear"
```
Job type values for Airbyte Cloud jobs.