airbyte.cloud

PyAirbyte classes and methods for interacting with the Airbyte Cloud API.

You can use this module to interact with Airbyte Cloud, OSS, and Enterprise.

Self-managed Airbyte instances

For self-managed Airbyte instances, set api_root to the Public API root for your deployment. With the default self-managed routing, this root usually ends in /api/public/v1. PyAirbyte uses the Public API for workspace and organization discovery.

Some methods in this module also call the Config API. For example, CloudConnection.dump_raw_catalog() reads the configured catalog directly from Airbyte. For documented self-managed deployments whose Public API root ends in /api/public/v1, PyAirbyte infers the Config API root by replacing that suffix with /api/v1.

If your deployment uses custom ingress or a nonstandard reverse proxy, pass config_api_root explicitly or set the AIRBYTE_CLOUD_CONFIG_API_URL environment variable.

from airbyte import cloud

workspace = cloud.CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
    api_root="https://airbyte.example.com/api/public/v1",
    config_api_root="https://airbyte.example.com/api/v1",
)

connection = workspace.get_connection(connection_id="...")
raw_catalog = connection.dump_raw_catalog()
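The root-resolution rules above can be summarized in a short sketch. It assumes the precedence is an explicit config_api_root first, then the AIRBYTE_CLOUD_CONFIG_API_URL environment variable, then suffix inference. resolve_config_api_root is a hypothetical helper, not a PyAirbyte function, and PyAirbyte's internal logic may differ in details such as trailing-slash handling.

```python
from __future__ import annotations

import os
from collections.abc import Mapping

# Suffixes from the documented self-managed routing.
PUBLIC_API_SUFFIX = "/api/public/v1"
CONFIG_API_SUFFIX = "/api/v1"
# Environment variable named in the docs above.
CONFIG_API_ENV_VAR = "AIRBYTE_CLOUD_CONFIG_API_URL"


def resolve_config_api_root(
    public_api_root: str,
    config_api_root: str | None = None,
    environ: Mapping[str, str] | None = None,
) -> str | None:
    """Hypothetical helper: choose a Config API root (assumed precedence)."""
    # 1. An explicit value always wins.
    if config_api_root:
        return config_api_root

    # 2. Fall back to the environment (os.environ unless a mapping is injected).
    env = os.environ if environ is None else environ
    env_value = env.get(CONFIG_API_ENV_VAR)
    if env_value:
        return env_value

    # 3. Infer from the documented /api/public/v1 suffix.
    root = public_api_root.rstrip("/")
    if root.endswith(PUBLIC_API_SUFFIX):
        return root[: -len(PUBLIC_API_SUFFIX)] + CONFIG_API_SUFFIX

    # Nonstandard route: nothing safe to infer, so the caller must configure it.
    return None


print(resolve_config_api_root("https://airbyte.example.com/api/public/v1", environ={}))
```

Returning None for an unrecognized route, rather than guessing, matches the guidance that custom ingress setups should configure the Config API root explicitly.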

Examples

Basic Sync Example:

import airbyte as ab
from airbyte import cloud

# Initialize an Airbyte Cloud workspace object
workspace = cloud.CloudWorkspace(
    workspace_id="123",
    client_id=ab.get_secret("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=ab.get_secret("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

# Run a sync job on Airbyte Cloud
connection = workspace.get_connection(connection_id="456")
sync_result = connection.run_sync()
print(sync_result.get_job_status())

Example Read From Cloud Destination:

If your destination is supported, you can read records directly from the SyncResult object. Currently, only Snowflake and BigQuery destinations are supported.

from airbyte.datasets import CachedDataset

# Assuming we've already created a `connection` object...

# Get the latest job result and print the stream names
sync_result = connection.get_sync_result()
print(sync_result.stream_names)

# Get a dataset from the sync result
dataset: CachedDataset = sync_result.get_dataset("users")

# Get a SQLAlchemy table to use in SQL queries...
users_table = dataset.to_sql_table()
print(f"Table name: {users_table.name}")

# Or iterate over the dataset directly
for record in dataset:
    print(record)
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.cloud.client import CloudClient
from airbyte.cloud.client_config import CloudClientConfig
from airbyte.cloud.connections import CloudConnection
from airbyte.cloud.models import CloudWorkspaceInfo, JobStatusEnum, JobTypeEnum
from airbyte.cloud.organizations import CloudOrganization
from airbyte.cloud.sync_results import SyncResult
from airbyte.cloud.workspaces import CloudWorkspace


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004
    from airbyte.cloud import (
        client,
        client_config,
        connections,
        constants,
        organizations,
        sync_results,
        workspaces,
    )


__all__ = [
    # Submodules
    "workspaces",
    "client",
    "organizations",
    "connections",
    "constants",
    "client_config",
    "sync_results",
    # Classes
    "CloudClient",
    "CloudOrganization",
    "CloudWorkspace",
    "CloudConnection",
    "CloudClientConfig",
    "CloudWorkspaceInfo",
    "SyncResult",
    # Enums
    "JobStatusEnum",
    "JobTypeEnum",
]
@dataclass(init=False, kw_only=True)
class CloudClient:
@dataclass(init=False, kw_only=True)
class CloudClient:
    """Authenticated client for Airbyte Cloud and self-managed Airbyte APIs."""

    _credentials: _AirbyteCredentials

    def __init__(
        self,
        *,
        client_id: str | SecretString | None = None,
        client_secret: str | SecretString | None = None,
        bearer_token: str | SecretString | None = None,
        public_api_root: str | None = None,
        config_api_root: str | None = None,
        workspace_id: str | None = None,
        organization_id: str | None = None,
    ) -> None:
        """Initialize a `CloudClient` from explicit auth values."""
        self._credentials = _AirbyteCredentials.from_auth(
            client_id=client_id,
            client_secret=client_secret,
            bearer_token=bearer_token,
            public_api_root=public_api_root,
            config_api_root=config_api_root,
            workspace_id=workspace_id,
            organization_id=organization_id,
            env_vars=False,
        )

    @property
    def client_id(self) -> SecretString | None:
        """OAuth client ID used for authentication."""
        return self._credentials.client_id

    @property
    def client_secret(self) -> SecretString | None:
        """OAuth client secret used for authentication."""
        return self._credentials.client_secret

    @property
    def bearer_token(self) -> SecretString | None:
        """Bearer token used for authentication."""
        return self._credentials.bearer_token

    @property
    def public_api_root(self) -> str:
        """Airbyte Public API root."""
        return self._credentials.public_api_root

    @property
    def config_api_root(self) -> str | None:
        """Airbyte Config API root."""
        return self._credentials.config_api_root

    @property
    def organization_id(self) -> str | None:
        """Default organization ID for organization-scoped operations."""
        return self._credentials.organization_id

    @classmethod
    def from_auth(
        cls,
        *,
        env_vars: bool = False,
        organization_id: str | None = None,
        client_id: str | SecretString | None = None,
        client_secret: str | SecretString | None = None,
        bearer_token: str | SecretString | None = None,
        public_api_root: str | None = None,
        config_api_root: str | None = None,
    ) -> CloudClient:
        """Create a client from explicit inputs and optionally environment variables.

        When `env_vars` is True, environment variables are checked as a fallback
        after any explicitly provided values.
        """
        credentials = _AirbyteCredentials.from_auth(
            organization_id=organization_id,
            client_id=client_id,
            client_secret=client_secret,
            bearer_token=bearer_token,
            public_api_root=public_api_root,
            config_api_root=config_api_root,
            env_vars=env_vars,
        )
        return cls._from_credentials(credentials)

    @classmethod
    def _from_credentials(cls, credentials: _AirbyteCredentials) -> CloudClient:
        """Create a client from resolved Cloud credentials."""
        return cls(
            client_id=credentials.client_id,
            client_secret=credentials.client_secret,
            bearer_token=credentials.bearer_token,
            public_api_root=credentials.public_api_root,
            config_api_root=credentials.config_api_root,
            workspace_id=credentials.workspace_id,
            organization_id=credentials.organization_id,
        )

    def get_workspace(self, workspace_id: str | None = None) -> CloudWorkspace:
        """Create a `CloudWorkspace` using this client's credentials."""
        resolved_workspace_id = workspace_id or self._credentials.workspace_id
        if not resolved_workspace_id:
            raise exc.PyAirbyteInputError(
                message="Workspace ID is required.",
                guidance="Provide a workspace ID.",
            )

        credentials = self._credentials.with_workspace_id(resolved_workspace_id)
        return CloudWorkspace(
            workspace_id=credentials.workspace_id,
            client_id=credentials.client_id,
            client_secret=credentials.client_secret,
            bearer_token=credentials.bearer_token,
            api_root=credentials.public_api_root,
            config_api_root=credentials.config_api_root,
        )

    def create_workspace(
        self,
        *,
        name: str,
        organization_id: str | None = None,
        region_id: str | None = None,
    ) -> CloudWorkspaceInfo:
        """Create an Airbyte workspace."""
        resolved_organization_id = organization_id or self.organization_id
        workspace = api_util.create_workspace(
            name=name,
            organization_id=resolved_organization_id,
            region_id=region_id,
            api_root=self.public_api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return CloudWorkspaceInfo.from_api_response(workspace)

    def rename_workspace(
        self,
        workspace_id: str,
        *,
        name: str,
    ) -> CloudWorkspaceInfo:
        """Rename an Airbyte workspace."""
        workspace = api_util.rename_workspace(
            workspace_id=workspace_id,
            name=name,
            api_root=self.public_api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return CloudWorkspaceInfo.from_api_response(workspace)

    def permanently_delete_workspace(
        self,
        workspace_id: str,
        *,
        workspace_name: str | None = None,
        safe_mode: bool = True,
    ) -> None:
        """Permanently delete an Airbyte workspace if it has no connections.

        When `safe_mode` is enabled, the workspace name must contain `delete-me`
        or `deleteme`. This also checks for existing connections before deleting
        and raises `AirbyteWorkspaceNotEmptyError` if the workspace is not empty.
        """
        api_util.permanently_delete_workspace(
            workspace_id=workspace_id,
            workspace_name=workspace_name,
            api_root=self.public_api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

    @overload
    def list_workspaces(
        self,
        name: str | None = None,
        *,
        organization_id: None = None,
        name_contains: str | None = None,
        name_filter: Callable[[str], bool] | None = None,
        limit: int | None = None,
    ) -> list[CloudWorkspaceInfo]:
        raise NotImplementedError

    @overload
    def list_workspaces(
        self,
        name: str | None = None,
        *,
        organization_id: str,
        name_contains: str | None = None,
        name_filter: Callable[[str], bool] | None = None,
        limit: int | None = None,
    ) -> list[CloudWorkspaceInfo]:
        raise NotImplementedError

    def list_workspaces(
        self,
        name: str | None = None,
        *,
        organization_id: str | None = None,
        name_contains: str | None = None,
        name_filter: Callable[[str], bool] | None = None,
        limit: int | None = None,
    ) -> list[CloudWorkspaceInfo]:
        """List workspaces available to this client."""
        if organization_id is not None or self.organization_id is not None:
            resolved_organization_id = organization_id or self.organization_id
            if not resolved_organization_id:
                raise exc.PyAirbyteInputError(
                    message="Organization ID is required.",
                    guidance="Provide an organization ID.",
                )
            workspaces = api_util.list_workspaces_in_organization(
                organization_id=resolved_organization_id,
                api_root=self.public_api_root,
                config_api_root=self.config_api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
                name_contains=name_contains or name,
                limit=None if name_filter is not None else limit,
            )
            workspace_infos = [
                CloudWorkspaceInfo.from_mapping(workspace) for workspace in workspaces
            ]
            if name_filter is not None:
                workspace_infos = [
                    workspace for workspace in workspace_infos if name_filter(workspace.name)
                ]
                if limit is not None:
                    workspace_infos = workspace_infos[:limit]
            return workspace_infos
        if name_contains is not None:
            if name_filter is not None:
                raise exc.PyAirbyteInputError(
                    message="You can provide name_contains or name_filter, but not both."
                )
            name_substring = name_contains

            def name_filter(workspace_name: str) -> bool:
                return name_substring in workspace_name

        return [
            CloudWorkspaceInfo.from_api_response(workspace)
            for workspace in api_util.list_workspaces(
                workspace_id="",
                api_root=self.public_api_root,
                name=name,
                name_filter=name_filter,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
                limit=limit,
            )
        ]

    def get_organization(
        self,
        organization_id: str | None = None,
        *,
        organization_name: str | None = None,
    ) -> CloudOrganization:
        """Resolve an organization by ID or exact name."""
        resolved_organization_id = organization_id or self.organization_id
        if resolved_organization_id and organization_name:
            raise exc.PyAirbyteInputError(
                message="Provide either organization ID or organization name."
            )
        if not resolved_organization_id and not organization_name:
            raise exc.PyAirbyteInputError(
                message="Organization ID or organization name is required."
            )

        organizations = api_util.list_organizations_for_user(
            api_root=self.public_api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        if resolved_organization_id:
            matching_organizations = [
                organization
                for organization in organizations
                if organization.organization_id == resolved_organization_id
            ]
        else:
            matching_organizations = [
                organization
                for organization in organizations
                if organization.organization_name == organization_name
            ]

        if not matching_organizations:
            raise AirbyteMissingResourceError(
                resource_type="organization",
                resource_name_or_id=resolved_organization_id or organization_name,
            )
        if len(matching_organizations) > 1:
            raise exc.PyAirbyteInputError(
                message="Organization name matches multiple organizations.",
                context={"organization_name": organization_name},
            )

        organization = matching_organizations[0]

        organization_credentials = self._credentials.with_organization_id(
            organization.organization_id
        )
        return CloudOrganization(
            organization_id=organization.organization_id,
            organization_name=organization.organization_name,
            email=organization.email,
            client_id=organization_credentials.client_id,
            client_secret=organization_credentials.client_secret,
            bearer_token=organization_credentials.bearer_token,
            public_api_root=organization_credentials.public_api_root,
            config_api_root=organization_credentials.config_api_root,
        )

Authenticated client for Airbyte Cloud and self-managed Airbyte APIs.

CloudClient( *, client_id: str | airbyte.secrets.SecretString | None = None, client_secret: str | airbyte.secrets.SecretString | None = None, bearer_token: str | airbyte.secrets.SecretString | None = None, public_api_root: str | None = None, config_api_root: str | None = None, workspace_id: str | None = None, organization_id: str | None = None)

Initialize a CloudClient from explicit auth values.

client_id: airbyte.secrets.SecretString | None

OAuth client ID used for authentication.

client_secret: airbyte.secrets.SecretString | None

OAuth client secret used for authentication.

bearer_token: airbyte.secrets.SecretString | None

Bearer token used for authentication.

public_api_root: str

Airbyte Public API root.

config_api_root: str | None

Airbyte Config API root.

organization_id: str | None

Default organization ID for organization-scoped operations.

@classmethod
def from_auth( cls, *, env_vars: bool = False, organization_id: str | None = None, client_id: str | airbyte.secrets.SecretString | None = None, client_secret: str | airbyte.secrets.SecretString | None = None, bearer_token: str | airbyte.secrets.SecretString | None = None, public_api_root: str | None = None, config_api_root: str | None = None) -> CloudClient:

Create a client from explicit inputs and optionally environment variables.

When env_vars is True, environment variables are checked as a fallback after any explicitly provided values.
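A minimal sketch of the explicit-then-environment fallback described above. resolve_auth_values is a hypothetical helper, and apart from AIRBYTE_CLOUD_CONFIG_API_URL the environment variable names are assumptions; PyAirbyte's real names may differ. Note that __init__ always passes env_vars=False, so only from_auth(env_vars=True) consults the environment.

```python
from __future__ import annotations

import os
from collections.abc import Mapping

# ASSUMED environment variable names; only AIRBYTE_CLOUD_CONFIG_API_URL appears in these docs.
ENV_VAR_NAMES: dict[str, str] = {
    "client_id": "AIRBYTE_CLOUD_CLIENT_ID",
    "client_secret": "AIRBYTE_CLOUD_CLIENT_SECRET",
    "bearer_token": "AIRBYTE_CLOUD_BEARER_TOKEN",
    "config_api_root": "AIRBYTE_CLOUD_CONFIG_API_URL",
}


def resolve_auth_values(
    *,
    env_vars: bool = False,
    environ: Mapping[str, str] | None = None,
    **explicit: str | None,
) -> dict[str, str | None]:
    """Hypothetical helper: explicit values first, environment only as a fallback."""
    unknown = set(explicit) - set(ENV_VAR_NAMES)
    if unknown:
        raise TypeError(f"Unexpected auth fields: {sorted(unknown)}")

    env = os.environ if environ is None else environ
    resolved: dict[str, str | None] = {}
    for field, env_name in ENV_VAR_NAMES.items():
        value = explicit.get(field)
        # The environment is consulted only when requested, and only for missing values.
        if value is None and env_vars:
            value = env.get(env_name)
        resolved[field] = value
    return resolved
```

Keeping the environment lookup opt-in means a client built from explicit values cannot silently pick up credentials from the process environment.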

def get_workspace( self, workspace_id: str | None = None) -> CloudWorkspace:

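Create a CloudWorkspace using this client's credentials. Uses workspace_id if provided, otherwise the client's default workspace ID, and raises PyAirbyteInputError if neither is set.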

def create_workspace( self, *, name: str, organization_id: str | None = None, region_id: str | None = None) -> CloudWorkspaceInfo:

Create an Airbyte workspace.

def rename_workspace( self, workspace_id: str, *, name: str) -> CloudWorkspaceInfo:

Rename an Airbyte workspace.

def permanently_delete_workspace( self, workspace_id: str, *, workspace_name: str | None = None, safe_mode: bool = True) -> None:

Permanently delete an Airbyte workspace if it has no connections.

When safe_mode is enabled, the workspace name must contain delete-me or deleteme. This also checks for existing connections before deleting and raises AirbyteWorkspaceNotEmptyError if the workspace is not empty.
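The safe-mode guard can be sketched as a pure check run before the delete call. This is an illustration, not PyAirbyte's implementation: check_workspace_deletable and WorkspaceNotEmptyError are hypothetical stand-ins (the real error is AirbyteWorkspaceNotEmptyError), the marker match is assumed to be a case-sensitive substring test, and the connection check is assumed to apply regardless of safe_mode, as the summary line suggests.

```python
from __future__ import annotations

from collections.abc import Sequence

# Name markers that opt a workspace into deletion under safe mode (from the docstring).
SAFE_MODE_MARKERS = ("delete-me", "deleteme")


class WorkspaceNotEmptyError(Exception):
    """Hypothetical stand-in for AirbyteWorkspaceNotEmptyError."""


def check_workspace_deletable(
    workspace_name: str,
    connection_ids: Sequence[str],
    *,
    safe_mode: bool = True,
) -> None:
    """Hypothetical pre-delete check; raises if deletion should be refused."""
    # Safe mode: refuse unless the name carries an explicit deletion marker.
    # Assumed: a plain, case-sensitive substring match.
    if safe_mode and not any(marker in workspace_name for marker in SAFE_MODE_MARKERS):
        raise ValueError(
            f"Refusing to delete {workspace_name!r}: name must contain "
            f"one of {SAFE_MODE_MARKERS} while safe_mode is enabled."
        )
    # Assumed to apply in every mode: never delete a workspace that has connections.
    if connection_ids:
        raise WorkspaceNotEmptyError(
            f"Workspace {workspace_name!r} still has {len(connection_ids)} connection(s)."
        )
```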

def list_workspaces( self, name: str | None = None, *, organization_id: str | None = None, name_contains: str | None = None, name_filter: Callable[[str], bool] | None = None, limit: int | None = None) -> list[CloudWorkspaceInfo]:

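List workspaces available to this client. If organization_id is passed, or the client has a default organization ID, only workspaces in that organization are listed. name_contains keeps workspaces whose names contain a substring, name_filter applies a custom predicate, and limit caps the number of results; without an organization in scope, passing both name_contains and name_filter raises PyAirbyteInputError.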
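The client-side filtering can be sketched on plain name strings. filter_workspace_names is a hypothetical helper that mirrors the branch running without an organization in scope, where name_contains becomes a substring predicate and combining it with name_filter is rejected. The predicate runs before the limit is applied, which is why list_workspaces passes limit=None to the organization listing whenever name_filter is set and slices the results afterward.

```python
from __future__ import annotations

from collections.abc import Callable, Iterable


def filter_workspace_names(
    names: Iterable[str],
    *,
    name_contains: str | None = None,
    name_filter: Callable[[str], bool] | None = None,
    limit: int | None = None,
) -> list[str]:
    """Hypothetical helper mirroring the client-side filtering in list_workspaces."""
    if name_contains is not None and name_filter is not None:
        raise ValueError("You can provide name_contains or name_filter, but not both.")

    def matches(workspace_name: str) -> bool:
        # name_contains becomes a plain substring predicate.
        if name_contains is not None:
            return name_contains in workspace_name
        if name_filter is not None:
            return name_filter(workspace_name)
        return True  # No filter: keep everything.

    # Filter first, then slice, so `limit` counts matching workspaces.
    matched = [name for name in names if matches(name)]
    return matched if limit is None else matched[:limit]
```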

def get_organization(self, organization_id: str | None = None, *, organization_name: str | None = None) -> CloudOrganization:
    def get_organization(
        self,
        organization_id: str | None = None,
        *,
        organization_name: str | None = None,
    ) -> CloudOrganization:
        """Resolve an organization by ID or exact name."""
        resolved_organization_id = organization_id or self.organization_id
        if resolved_organization_id and organization_name:
            raise exc.PyAirbyteInputError(
                message="Provide either organization ID or organization name."
            )
        if not resolved_organization_id and not organization_name:
            raise exc.PyAirbyteInputError(
                message="Organization ID or organization name is required."
            )

        organizations = api_util.list_organizations_for_user(
            api_root=self.public_api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        if resolved_organization_id:
            matching_organizations = [
                organization
                for organization in organizations
                if organization.organization_id == resolved_organization_id
            ]
        else:
            matching_organizations = [
                organization
                for organization in organizations
                if organization.organization_name == organization_name
            ]

        if not matching_organizations:
            raise AirbyteMissingResourceError(
                resource_type="organization",
                resource_name_or_id=resolved_organization_id or organization_name,
            )
        if len(matching_organizations) > 1:
            raise exc.PyAirbyteInputError(
                message="Organization name matches multiple organizations.",
                context={"organization_name": organization_name},
            )

        organization = matching_organizations[0]

        organization_credentials = self._credentials.with_organization_id(
            organization.organization_id
        )
        return CloudOrganization(
            organization_id=organization.organization_id,
            organization_name=organization.organization_name,
            email=organization.email,
            client_id=organization_credentials.client_id,
            client_secret=organization_credentials.client_secret,
            bearer_token=organization_credentials.bearer_token,
            public_api_root=organization_credentials.public_api_root,
            config_api_root=organization_credentials.config_api_root,
        )

Resolve an organization by ID or exact name.
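You can exercise the resolution rules in `get_organization` without network access. The sketch below is a hypothetical, self-contained restatement of those rules. `OrgSummary` stands in for the records returned by `api_util.list_organizations_for_user`, and `default_organization_id` plays the role of `self.organization_id`. Built-in exceptions replace `PyAirbyteInputError` and `AirbyteMissingResourceError`. The listing has one consequence worth noting. The requested ID falls back to the client's own `organization_id`, so if a client already has an organization ID and you pass only `organization_name`, the call raises an error.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class OrgSummary:
    """Hypothetical stand-in for an organization record from the API."""

    organization_id: str
    organization_name: str


def resolve_organization(
    organizations: list[OrgSummary],
    organization_id: str | None = None,
    *,
    organization_name: str | None = None,
    default_organization_id: str | None = None,
) -> OrgSummary:
    """Match by ID (explicit or default) or by exact name, mirroring the rules above."""
    resolved_id = organization_id or default_organization_id
    if resolved_id and organization_name:
        raise ValueError("Provide either organization ID or organization name.")
    if not resolved_id and not organization_name:
        raise ValueError("Organization ID or organization name is required.")

    if resolved_id:
        matches = [o for o in organizations if o.organization_id == resolved_id]
    else:
        # Exact, case-sensitive comparison: no substring match or case folding.
        matches = [o for o in organizations if o.organization_name == organization_name]

    if not matches:
        raise LookupError(f"Organization not found: {resolved_id or organization_name}")
    if len(matches) > 1:
        raise ValueError("Organization name matches multiple organizations.")
    return matches[0]


orgs = [OrgSummary("org-1", "Acme"), OrgSummary("org-2", "Beta"), OrgSummary("org-3", "Beta")]
print(resolve_organization(orgs, organization_name="Acme").organization_id)  # org-1
```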

class CloudOrganization:
class CloudOrganization:
    """Information about an organization in Airbyte Cloud.

    This class provides lazy loading of organization attributes including billing status.
    It is typically created via `CloudWorkspace.get_organization()`.
    """

    def __init__(
        self,
        organization_id: str,
        organization_name: str | None = None,
        email: str | None = None,
        *,
        client_id: str | SecretString | None = None,
        client_secret: str | SecretString | None = None,
        bearer_token: str | SecretString | None = None,
        public_api_root: str | None = None,
        config_api_root: str | None = None,
    ) -> None:
        """Initialize a `CloudOrganization`."""
        self.organization_id = organization_id
        """The organization ID."""

        self._organization_name = organization_name
        """Display name of the organization."""

        self._email = email
        """Email associated with the organization."""

        self._credentials = _AirbyteCredentials(
            client_id=SecretString(client_id) if client_id else None,
            client_secret=SecretString(client_secret) if client_secret else None,
            bearer_token=SecretString(bearer_token) if bearer_token else None,
            public_api_root=public_api_root or api_util.CLOUD_API_ROOT,
            config_api_root=config_api_root,
            organization_id=organization_id,
        )
        self._organization_info: dict[str, Any] | None = None
        self._organization_info_fetch_failed: bool = False

    def _fetch_organization_info(self, *, force_refresh: bool = False) -> dict[str, Any]:
        """Fetch and cache organization info including billing status."""
        if force_refresh:
            self._organization_info_fetch_failed = False

        if self._organization_info_fetch_failed and self._organization_info is None:
            return {}

        if not force_refresh and self._organization_info is not None:
            return self._organization_info

        try:
            self._organization_info = api_util.get_organization_info(
                organization_id=self.organization_id,
                api_root=self._credentials.public_api_root,
                config_api_root=self._credentials.config_api_root,
                client_id=self._credentials.client_id,
                client_secret=self._credentials.client_secret,
                bearer_token=self._credentials.bearer_token,
            )
        except Exception as ex:
            logger.debug("Failed to fetch organization info.", exc_info=ex)
            if self._organization_info is None:
                self._organization_info_fetch_failed = True
            return self._organization_info or {}
        else:
            return self._organization_info

    @property
    def organization_name(self) -> str | None:
        """Display name of the organization."""
        if self._organization_name is not None:
            return self._organization_name
        info = self._fetch_organization_info()
        return info.get("organizationName")

    @property
    def email(self) -> str | None:
        """Email associated with the organization."""
        if self._email is not None:
            return self._email
        info = self._fetch_organization_info()
        return info.get("email")

    @property
    def payment_status(self) -> str | None:
        """Payment status of the organization."""
        info = self._fetch_organization_info()
        return (info.get("billing") or {}).get("paymentStatus")

    @property
    def subscription_status(self) -> str | None:
        """Subscription status of the organization."""
        info = self._fetch_organization_info()
        return (info.get("billing") or {}).get("subscriptionStatus")

    @property
    def is_account_locked(self) -> bool:
        """Whether the account is locked due to billing issues."""
        return api_util.is_account_locked(self.payment_status, self.subscription_status)

Information about an organization in Airbyte Cloud.

This class provides lazy loading of organization attributes including billing status. It is typically created via CloudWorkspace.get_organization().
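The lazy loading described above uses a small caching pattern in `_fetch_organization_info`. The first access calls the API, and a successful result is reused after that. If the first fetch fails, that failure is remembered, so later reads return an empty mapping instead of retrying. Passing `force_refresh=True` clears the remembered failure. In the listing shown, none of the public properties passes `force_refresh=True`. As a result, a failed first fetch makes `payment_status` and `subscription_status` return `None` for the lifetime of that object. The same happens to `organization_name` and `email` unless they were supplied at construction. The sketch below isolates the pattern. `LazyInfo` and the injected `fetch` callable are hypothetical, and `fetch` replaces the call to `api_util.get_organization_info`.

```python
from __future__ import annotations

from collections.abc import Callable
from typing import Any


class LazyInfo:
    """Hypothetical cache reproducing the fetch-once, remember-failure logic above."""

    def __init__(self, fetch: Callable[[], dict[str, Any]]) -> None:
        self._fetch = fetch
        self._info: dict[str, Any] | None = None
        self._fetch_failed = False

    def get(self, *, force_refresh: bool = False) -> dict[str, Any]:
        if force_refresh:
            self._fetch_failed = False  # an explicit refresh clears the remembered failure
        if self._fetch_failed and self._info is None:
            return {}  # an earlier fetch failed and nothing is cached: do not retry
        if not force_refresh and self._info is not None:
            return self._info  # cache hit
        try:
            self._info = self._fetch()
        except Exception:
            if self._info is None:
                self._fetch_failed = True
            return self._info or {}  # fall back to previously cached data, if any
        return self._info


calls: list[int] = []


def flaky_fetch() -> dict[str, Any]:
    """Hypothetical fetch that fails on the first call and succeeds afterwards."""
    calls.append(1)
    if len(calls) == 1:
        raise RuntimeError("simulated API error")
    return {"organizationName": "Acme"}


cache = LazyInfo(flaky_fetch)
print(cache.get(), cache.get(), len(calls))  # {} {} 1
```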

CloudOrganization(organization_id: str, organization_name: str | None = None, email: str | None = None, *, client_id: str | airbyte.secrets.SecretString | None = None, client_secret: str | airbyte.secrets.SecretString | None = None, bearer_token: str | airbyte.secrets.SecretString | None = None, public_api_root: str | None = None, config_api_root: str | None = None)

Initialize a CloudOrganization.

organization_id

The organization ID.

organization_name: str | None

Display name of the organization.

email: str | None

Email associated with the organization.

payment_status: str | None

Payment status of the organization.

subscription_status: str | None

Subscription status of the organization.
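Both billing properties read a nested field with `(info.get("billing") or {}).get(...)` instead of `info.get("billing", {})`. The `or {}` form also handles a payload where the `billing` key is present but null. With the default-argument form, the first `.get` returns `None`, and the chained `.get` raises `AttributeError`. Here is a minimal illustration. `billing_field` is a hypothetical helper, and the payloads, including the `"subscribed"` value, are made-up placeholders.

```python
from __future__ import annotations

from typing import Any


def billing_field(info: dict[str, Any], key: str) -> str | None:
    """Hypothetical helper: read info["billing"][key], tolerating a missing or null block."""
    return (info.get("billing") or {}).get(key)


payloads: list[dict[str, Any]] = [
    {},  # no billing key, e.g. after a failed fetch returned {}
    {"billing": None},  # billing key present but null
    {"billing": {"subscriptionStatus": "subscribed"}},  # placeholder value
]
print([billing_field(p, "subscriptionStatus") for p in payloads])  # [None, None, 'subscribed']

# The default-argument form breaks on the null case:
try:
    {"billing": None}.get("billing", {}).get("subscriptionStatus")
except AttributeError:
    print("default-argument form fails on a null billing block")
```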

is_account_locked: bool

Whether the account is locked due to billing issues.

@dataclass(init=False, kw_only=True)
class CloudWorkspace:
@dataclass(init=False, kw_only=True)  # noqa: PLR0904  # Core cloud API facade.
class CloudWorkspace:
    """A remote workspace on the Airbyte Cloud.

    By overriding `api_root`, you can use this class to interact with self-managed Airbyte
    instances, both OSS and Enterprise.

    Two authentication methods are supported (mutually exclusive):
    1. OAuth2 client credentials (client_id + client_secret)
    2. Bearer token authentication

    Example with client credentials:
        ```python
        workspace = CloudWorkspace(
            workspace_id="...",
            client_id="...",
            client_secret="...",
        )
        ```

    Example with bearer token:
        ```python
        workspace = CloudWorkspace(
            workspace_id="...",
            bearer_token="...",
        )
        ```
    """

    workspace_id: str
    client_id: SecretString | None
    client_secret: SecretString | None
    api_root: str
    config_api_root: str | None
    """The Config API root URL."""
    bearer_token: SecretString | None

    # Internal credentials objects (set in __init__, excluded from repr)
    _credentials: _AirbyteCredentials = field(init=False, repr=False)
    _client_config: CloudClientConfig = field(init=False, repr=False)

    def __init__(
        self,
        *,
        workspace_id: str | None = None,
        client_id: str | SecretString | None = None,
        client_secret: str | SecretString | None = None,
        api_root: str | None = None,
        config_api_root: str | None = None,
        bearer_token: str | SecretString | None = None,
    ) -> None:
        """Validate and initialize credentials."""
        env_vars = not (client_id or client_secret or bearer_token)
        credentials = _AirbyteCredentials.from_auth(
            workspace_id=workspace_id,
            client_id=client_id,
            client_secret=client_secret,
            bearer_token=bearer_token,
            public_api_root=api_root,
            config_api_root=config_api_root,
            env_vars=env_vars,
        )
        if not credentials.workspace_id:
            raise exc.PyAirbyteInputError(
                message="Workspace ID is required.",
                guidance="Provide a workspace ID.",
            )

        self._credentials = credentials
        self.workspace_id = credentials.workspace_id or ""
        self.client_id = credentials.client_id
        self.client_secret = credentials.client_secret
        self.bearer_token = credentials.bearer_token
        self.api_root = credentials.public_api_root
        self.config_api_root = credentials.config_api_root

        # Create internal CloudClientConfig object (validates mutual exclusivity)
        self._client_config = CloudClientConfig(
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            api_root=self.api_root,
            config_api_root=self.config_api_root,
        )

    @classmethod
    def from_env(
        cls,
        workspace_id: str | None = None,
        *,
        api_root: str | None = None,
        config_api_root: str | None = None,
    ) -> CloudWorkspace:
        """Create a CloudWorkspace using credentials from environment variables.

        This factory method resolves credentials from environment variables,
        providing a convenient way to create a workspace without explicitly
        passing credentials.

        Two authentication methods are supported (mutually exclusive):
        1. Bearer token (checked first)
        2. OAuth2 client credentials (fallback)

        Environment variables used:
            - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
            - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
            - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
            - `AIRBYTE_CLOUD_WORKSPACE_ID`: The workspace ID (if not passed as argument).
            - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).
            - `AIRBYTE_CLOUD_CONFIG_API_URL`: Optional. The Config API root URL.

        Args:
            workspace_id: The workspace ID. If not provided, will be resolved from
                the `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable.
            api_root: The API root URL. If not provided, will be resolved from
                the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
                the Airbyte Cloud API.
            config_api_root: The Config API root URL. If not provided, will be resolved
                from the `AIRBYTE_CLOUD_CONFIG_API_URL` environment variable.

        Returns:
            A CloudWorkspace instance configured with credentials from the environment.

        Raises:
            PyAirbyteInputError: If required credentials are not found in
                the environment or are incomplete.

        Example:
            ```python
            # With workspace_id from environment
            workspace = CloudWorkspace.from_env()

            # With explicit workspace_id
            workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
            ```
        """
        return cls(
            workspace_id=workspace_id,
            api_root=api_root,
            config_api_root=config_api_root,
        )

    @property
    def workspace_url(self) -> str | None:
        """The web URL of the workspace."""
        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"

    @cached_property
    def _organization_info(self) -> dict[str, Any]:
        """Fetch and cache organization info for this workspace.

        Uses the Config API endpoint for an efficient O(1) lookup.
        This is an internal method; use get_organization() for public access.
        """
        return api_util.get_workspace_organization_info(
            workspace_id=self.workspace_id,
            api_root=self.api_root,
            config_api_root=self.config_api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )

    @overload
    def get_organization(self) -> CloudOrganization: ...

    @overload
    def get_organization(
        self,
        *,
        raise_on_error: Literal[True],
    ) -> CloudOrganization: ...

    @overload
    def get_organization(
        self,
        *,
        raise_on_error: Literal[False],
    ) -> CloudOrganization | None: ...

    def get_organization(
        self,
        *,
        raise_on_error: bool = True,
    ) -> CloudOrganization | None:
        """Get the organization this workspace belongs to.

        Fetching organization info requires ORGANIZATION_READER permissions on the organization,
        which may not be available with workspace-scoped credentials.

        Args:
            raise_on_error: If True (default), raises AirbyteError on permission or API errors.
                If False, returns None instead of raising.

        Returns:
            CloudOrganization object with organization_id and organization_name,
            or None if raise_on_error=False and an error occurred.

        Raises:
            AirbyteError: If raise_on_error=True and the organization info cannot be fetched
                (e.g., due to insufficient permissions or missing data).
        """
        try:
            info = self._organization_info
        except (AirbyteError, NotImplementedError):
            if raise_on_error:
                raise
            return None

        organization_id = info.get("organizationId")
        organization_name = info.get("organizationName")

        # Validate that both organization_id and organization_name are non-null and non-empty
        if not organization_id or not organization_name:
            if raise_on_error:
                raise AirbyteError(
                    message="Organization info is incomplete.",
                    context={
                        "organization_id": organization_id,
                        "organization_name": organization_name,
                    },
                )
            return None

        organization_credentials = self._credentials.with_organization_id(organization_id)
        return CloudOrganization(
            organization_id=organization_id,
            organization_name=organization_name,
            client_id=organization_credentials.client_id,
            client_secret=organization_credentials.client_secret,
            bearer_token=organization_credentials.bearer_token,
            public_api_root=organization_credentials.public_api_root,
            config_api_root=organization_credentials.config_api_root,
        )

    # Test connection and creds

    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
              serves primarily as a simple check to ensure that the workspace is reachable
              and credentials are correct.
        """
        _ = api_util.get_workspace(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")

    # Get sources, destinations, and connections

    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )

    def get_source(
        self,
        source_id: str,
    ) -> CloudSource:
        """Get a source by ID.

        This method does not fetch data from the API. It returns a `CloudSource` object,
        which will be loaded lazily as needed.
        """
        return CloudSource(
            workspace=self,
            connector_id=source_id,
        )

    def get_destination(
        self,
        destination_id: str,
    ) -> CloudDestination:
        """Get a destination by ID.

        This method does not fetch data from the API. It returns a `CloudDestination` object,
        which will be loaded lazily as needed.
        """
        return CloudDestination(
            workspace=self,
            connector_id=destination_id,
        )

    # Deploy sources and destinations

    def deploy_source(
        self,
        name: str,
        source: Source,
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudSource:
        """Deploy a source to the workspace.

        Returns the newly deployed source.

        Args:
            name: The name to use when deploying.
            source: The source object to deploy.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
        source_config_dict["sourceType"] = source.name.replace("source-", "")

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_sources(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="source",
                    resource_name=name,
                )

        deployed_source = api_util.create_source(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=source_config_dict,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return CloudSource(
            workspace=self,
            connector_id=deployed_source.source_id,
        )

    def deploy_destination(
        self,
        name: str,
        destination: Destination | dict[str, Any],
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudDestination:
        """Deploy a destination to the workspace.

        Returns the newly deployed destination.

        Args:
            name: The name to use when deploying.
            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
                dictionary of configuration values.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        if isinstance(destination, Destination):
            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
            # raise ValueError(destination_conf_dict)
        else:
            destination_conf_dict = destination.copy()
            if "destinationType" not in destination_conf_dict:
                raise exc.PyAirbyteInputError(
                    message="Missing `destinationType` in configuration dictionary.",
                )

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_destinations(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="destination",
                    resource_name=name,
                )

        deployed_destination = api_util.create_destination(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=destination_conf_dict,  # Wants a dataclass but accepts dict
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return CloudDestination(
            workspace=self,
            connector_id=deployed_destination.destination_id,
        )

    def permanently_delete_source(
        self,
        source: str | CloudSource,
        *,
        safe_mode: bool = True,
    ) -> None:
        """Delete a source from the workspace.

        You can pass either the source ID as a `str` or a deployed `CloudSource` object.

        Args:
            source: The source ID or CloudSource object to delete
            safe_mode: If True, requires the source name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True.
        """
        if not isinstance(source, (str, CloudSource)):
            raise exc.PyAirbyteInputError(
                message="Invalid source type.",
                input_value=type(source).__name__,
            )

        api_util.delete_source(
            source_id=source.connector_id if isinstance(source, CloudSource) else source,
            source_name=source.name if isinstance(source, CloudSource) else None,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

    # Deploy and delete destinations

    def permanently_delete_destination(
        self,
        destination: str | CloudDestination,
        *,
        safe_mode: bool = True,
    ) -> None:
        """Delete a deployed destination from the workspace.

        You can pass either the destination ID as a `str` or a deployed `CloudDestination` object.

        Args:
            destination: The destination ID or CloudDestination object to delete
            safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True.
        """
        if not isinstance(destination, (str, CloudDestination)):
            raise exc.PyAirbyteInputError(
                message="Invalid destination type.",
                input_value=type(destination).__name__,
            )

        api_util.delete_destination(
            destination_id=(
                destination if isinstance(destination, str) else destination.destination_id
            ),
            destination_name=(
                destination.name if isinstance(destination, CloudDestination) else None
            ),
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

    # Deploy and delete connections

    def deploy_connection(
        self,
        connection_name: str,
        *,
        source: CloudSource | str,
        selected_streams: list[str],
        destination: CloudDestination | str,
        table_prefix: str | None = None,
    ) -> CloudConnection:
        """Create a new connection between an already deployed source and destination.

        Returns the newly deployed connection object.

        Args:
            connection_name: The name of the connection.
            source: The deployed source. You can pass a source ID or a CloudSource object.
            destination: The deployed destination. You can pass a destination ID or a
                CloudDestination object.
            table_prefix: Optional. The table prefix to use when syncing to the destination.
            selected_streams: The selected stream names to sync within the connection.
        """
        if not selected_streams:
            raise exc.PyAirbyteInputError(
                guidance="You must provide `selected_streams` when creating a connection."
            )

        source_id: str = source if isinstance(source, str) else source.connector_id
        destination_id: str = (
            destination if isinstance(destination, str) else destination.connector_id
        )

        deployed_connection = api_util.create_connection(
            name=connection_name,
            source_id=source_id,
            destination_id=destination_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            selected_stream_names=selected_streams,
            prefix=table_prefix or "",
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )

        return CloudConnection(
            workspace=self,
            connection_id=deployed_connection.connection_id,
            source=deployed_connection.source_id,
            destination=deployed_connection.destination_id,
        )

    def permanently_delete_connection(
        self,
        connection: str | CloudConnection,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
        safe_mode: bool = True,
    ) -> None:
        """Delete a deployed connection from the workspace.

        Args:
            connection: The connection ID or CloudConnection object to delete
            cascade_delete_source: If True, also delete the source after deleting the connection
            cascade_delete_destination: If True, also delete the destination after deleting
                the connection
            safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True. Also applies
                to cascade deletes.
        """
        if connection is None:
            raise ValueError("No connection ID provided.")

        if isinstance(connection, str):
            connection = CloudConnection(
                workspace=self,
                connection_id=connection,
            )

        api_util.delete_connection(
            connection_id=connection.connection_id,
            connection_name=connection.name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

        if cascade_delete_source:
            self.permanently_delete_source(
                source=connection.source_id,
                safe_mode=safe_mode,
            )
        if cascade_delete_destination:
            self.permanently_delete_destination(
                destination=connection.destination_id,
                safe_mode=safe_mode,
            )

    # List workspaces, sources, destinations, and connections

    def list_workspaces(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
        limit: int | None = None,
    ) -> list[CloudWorkspaceInfo]:
        """List workspaces available to the current credentials, with an optional limit."""
        return [
            CloudWorkspaceInfo.from_api_response(workspace)
            for workspace in api_util.list_workspaces(
                workspace_id="",
                api_root=self.api_root,
                name=name,
                name_filter=name_filter,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
                limit=limit,
            )
        ]
666
667    def rename(
668        self,
669        name: str,
670    ) -> CloudWorkspace:
671        """Rename this workspace."""
672        api_util.rename_workspace(
673            workspace_id=self.workspace_id,
674            name=name,
675            api_root=self.api_root,
676            client_id=self.client_id,
677            client_secret=self.client_secret,
678            bearer_token=self.bearer_token,
679        )
680        return self
681
682    def permanently_delete(
683        self,
684        *,
685        workspace_name: str | None = None,
686        safe_mode: bool = True,
687    ) -> None:
688        """Permanently delete this workspace if it has no connections.
689
690        When `safe_mode` is enabled, the workspace name must contain `delete-me`
691        or `deleteme`. This also checks for existing connections before deleting
692        and raises `AirbyteWorkspaceNotEmptyError` if the workspace is not empty.
693        """
694        api_util.permanently_delete_workspace(
695            workspace_id=self.workspace_id,
696            workspace_name=workspace_name,
697            api_root=self.api_root,
698            client_id=self.client_id,
699            client_secret=self.client_secret,
700            bearer_token=self.bearer_token,
701            safe_mode=safe_mode,
702        )
703
704    def list_connections(
705        self,
706        name: str | None = None,
707        *,
708        name_filter: Callable | None = None,
709        limit: int | None = None,
710    ) -> list[CloudConnection]:
711        """List connections by name in the workspace, with an optional limit."""
712        connections = api_util.list_connections(
713            api_root=self.api_root,
714            workspace_id=self.workspace_id,
715            name=name,
716            name_filter=name_filter,
717            limit=limit,
718            client_id=self.client_id,
719            client_secret=self.client_secret,
720            bearer_token=self.bearer_token,
721        )
722        return [
723            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
724                workspace=self,
725                connection_response=connection,
726            )
727            for connection in connections
728        ]
729
730    def list_sources(
731        self,
732        name: str | None = None,
733        *,
734        name_filter: Callable | None = None,
735        limit: int | None = None,
736    ) -> list[CloudSource]:
737        """List all sources in the workspace, with an optional limit."""
738        sources = api_util.list_sources(
739            api_root=self.api_root,
740            workspace_id=self.workspace_id,
741            name=name,
742            name_filter=name_filter,
743            limit=limit,
744            client_id=self.client_id,
745            client_secret=self.client_secret,
746            bearer_token=self.bearer_token,
747        )
748        return [
749            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
750                workspace=self,
751                source_response=source,
752            )
753            for source in sources
754        ]
755
756    def list_destinations(
757        self,
758        name: str | None = None,
759        *,
760        name_filter: Callable | None = None,
761        limit: int | None = None,
762    ) -> list[CloudDestination]:
763        """List all destinations in the workspace, with an optional limit."""
764        destinations = api_util.list_destinations(
765            api_root=self.api_root,
766            workspace_id=self.workspace_id,
767            name=name,
768            name_filter=name_filter,
769            limit=limit,
770            client_id=self.client_id,
771            client_secret=self.client_secret,
772            bearer_token=self.bearer_token,
773        )
774        return [
775            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
776                workspace=self,
777                destination_response=destination,
778            )
779            for destination in destinations
780        ]
781
782    def publish_custom_source_definition(
783        self,
784        name: str,
785        *,
786        manifest_yaml: dict[str, Any] | Path | str | None = None,
787        docker_image: str | None = None,
788        docker_tag: str | None = None,
789        unique: bool = True,
790        pre_validate: bool = True,
791        testing_values: dict[str, Any] | None = None,
792    ) -> CustomCloudSourceDefinition:
793        """Publish a custom source connector definition.
794
795        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
796        and docker_tag (for Docker connectors), but not both.
797
798        Args:
799            name: Display name for the connector definition
800            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
801            docker_image: Docker repository (e.g., 'airbyte/source-custom')
802            docker_tag: Docker image tag (e.g., '1.0.0')
803            unique: Whether to enforce name uniqueness
804            pre_validate: Whether to validate manifest client-side (YAML only)
805            testing_values: Optional configuration values to use for testing in the
806                Connector Builder UI. If provided, these values are stored as the complete
807                testing values object for the connector builder project (replaces any existing
808                values), allowing immediate test read operations.
809
810        Returns:
811            CustomCloudSourceDefinition object representing the created definition
812
813        Raises:
814            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
815            AirbyteDuplicateResourcesError: If unique=True and name already exists
816        """
817        is_yaml = manifest_yaml is not None
818        is_docker = docker_image is not None
819
820        if is_yaml == is_docker:
821            raise exc.PyAirbyteInputError(
822                message=(
823                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
824                    "docker_image + docker_tag (for Docker connectors), but not both"
825                ),
826                context={
827                    "manifest_yaml_provided": is_yaml,
828                    "docker_image_provided": is_docker,
829                },
830            )
831
832        if is_docker and docker_tag is None:
833            raise exc.PyAirbyteInputError(
834                message="docker_tag is required when docker_image is specified",
835                context={"docker_image": docker_image},
836            )
837
838        if unique:
839            existing = self.list_custom_source_definitions(
840                definition_type="yaml" if is_yaml else "docker",
841            )
842            if any(d.name == name for d in existing):
843                raise exc.AirbyteDuplicateResourcesError(
844                    resource_type="custom_source_definition",
845                    resource_name=name,
846                )
847
848        if is_yaml:
849            manifest_dict: dict[str, Any]
850            if isinstance(manifest_yaml, Path):
851                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
852            elif isinstance(manifest_yaml, str):
853                manifest_dict = yaml.safe_load(manifest_yaml)
854            elif manifest_yaml is not None:
855                manifest_dict = manifest_yaml
856            else:
857                raise exc.PyAirbyteInputError(
858                    message="manifest_yaml is required for YAML connectors",
859                    context={"name": name},
860                )
861
862            if pre_validate:
863                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)
864
865            result = api_util.create_custom_yaml_source_definition(
866                name=name,
867                workspace_id=self.workspace_id,
868                manifest=manifest_dict,
869                api_root=self.api_root,
870                client_id=self.client_id,
871                client_secret=self.client_secret,
872                bearer_token=self.bearer_token,
873            )
874            custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
875                self, result
876            )
877
878            # Set testing values if provided
879            if testing_values is not None:
880                custom_definition.set_testing_values(testing_values)
881
882            return custom_definition
883
884        raise NotImplementedError(
885            "Docker custom source definitions are not yet supported. "
886            "Only YAML manifest-based custom sources are currently available."
887        )
888
889    def list_custom_source_definitions(
890        self,
891        *,
892        definition_type: Literal["yaml", "docker"],
893    ) -> list[CustomCloudSourceDefinition]:
894        """List custom source connector definitions.
895
896        Args:
897            definition_type: Connector type to list ("yaml" or "docker"). Required.
898
899        Returns:
900            List of CustomCloudSourceDefinition objects matching the specified type
901        """
902        if definition_type == "yaml":
903            yaml_definitions = api_util.list_custom_yaml_source_definitions(
904                workspace_id=self.workspace_id,
905                api_root=self.api_root,
906                client_id=self.client_id,
907                client_secret=self.client_secret,
908                bearer_token=self.bearer_token,
909            )
910            return [
911                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
912                for d in yaml_definitions
913            ]
914
915        raise NotImplementedError(
916            "Docker custom source definitions are not yet supported. "
917            "Only YAML manifest-based custom sources are currently available."
918        )
919
920    def get_custom_source_definition(
921        self,
922        definition_id: str,
923        *,
924        definition_type: Literal["yaml", "docker"],
925    ) -> CustomCloudSourceDefinition:
926        """Get a specific custom source definition by ID.
927
928        Args:
929            definition_id: The definition ID
930            definition_type: Connector type ("yaml" or "docker"). Required.
931
932        Returns:
933            CustomCloudSourceDefinition object
934        """
935        if definition_type == "yaml":
936            result = api_util.get_custom_yaml_source_definition(
937                workspace_id=self.workspace_id,
938                definition_id=definition_id,
939                api_root=self.api_root,
940                client_id=self.client_id,
941                client_secret=self.client_secret,
942                bearer_token=self.bearer_token,
943            )
944            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001
945
946        raise NotImplementedError(
947            "Docker custom source definitions are not yet supported. "
948            "Only YAML manifest-based custom sources are currently available."
949        )
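The argument check at the top of `publish_custom_source_definition` accepts exactly one connector flavor: a YAML manifest, or a Docker image plus tag. The sketch below restates that rule as a standalone function so it can be tried without an Airbyte workspace. `classify_custom_definition` is a hypothetical name, the manifest dict is a placeholder, and the sketch raises a plain `ValueError` where the real method raises `PyAirbyteInputError`.

```python
from __future__ import annotations

from typing import Any


def classify_custom_definition(
    manifest_yaml: dict[str, Any] | str | None = None,
    docker_image: str | None = None,
    docker_tag: str | None = None,
) -> str:
    """Hypothetical helper mirroring the EITHER/OR check in publish_custom_source_definition."""
    is_yaml = manifest_yaml is not None
    is_docker = docker_image is not None

    # Equal flags means both were supplied or neither was: reject both cases.
    if is_yaml == is_docker:
        raise ValueError(
            "Must specify EITHER manifest_yaml OR docker_image + docker_tag, but not both"
        )

    # A Docker definition also needs an image tag.
    if is_docker and docker_tag is None:
        raise ValueError("docker_tag is required when docker_image is specified")

    return "yaml" if is_yaml else "docker"


print(classify_custom_definition(manifest_yaml={"streams": []}))  # placeholder manifest
print(classify_custom_definition(docker_image="airbyte/source-custom", docker_tag="1.0.0"))
```

Note that the real method still ends in `NotImplementedError` on the Docker path, so a "docker" result here only means the arguments were well-formed.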

A remote workspace on Airbyte Cloud.

By overriding api_root, you can use this class to interact with self-managed Airbyte instances, both OSS and Enterprise.

Two authentication methods are supported (mutually exclusive):

  1. OAuth2 client credentials (client_id + client_secret)
  2. Bearer token authentication

Example with client credentials:

workspace = CloudWorkspace(
    workspace_id="...",
    client_id="...",
    client_secret="...",
)

Example with bearer token:

workspace = CloudWorkspace(
    workspace_id="...",
    bearer_token="...",
)
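The two authentication methods are mutually exclusive. As a rough illustration of that rule (not PyAirbyte's own validation, which happens inside `CloudClientConfig`), the hypothetical `pick_auth_method` helper below accepts either a complete client-credentials pair or a bearer token, and rejects mixed or incomplete input. It deliberately leaves out the environment-variable fallback that `__init__` applies when no credentials are passed at all.

```python
from __future__ import annotations


def pick_auth_method(
    client_id: str | None = None,
    client_secret: str | None = None,
    bearer_token: str | None = None,
) -> str:
    """Hypothetical sketch: report which auth method a set of credentials selects."""
    has_client_parts = bool(client_id) or bool(client_secret)

    # Mixing the two methods is ambiguous, so refuse it.
    if bearer_token and has_client_parts:
        raise ValueError("Use either client credentials or a bearer token, not both.")
    if bearer_token:
        return "bearer_token"
    # Client credentials only count when both halves are present.
    if client_id and client_secret:
        return "client_credentials"
    raise ValueError("Provide client_id and client_secret together, or a bearer_token.")


print(pick_auth_method(client_id="...", client_secret="..."))
print(pick_auth_method(bearer_token="..."))
```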
CloudWorkspace(*, workspace_id: str | None = None, client_id: str | airbyte.secrets.SecretString | None = None, client_secret: str | airbyte.secrets.SecretString | None = None, api_root: str | None = None, config_api_root: str | None = None, bearer_token: str | airbyte.secrets.SecretString | None = None)
    def __init__(
        self,
        *,
        workspace_id: str | None = None,
        client_id: str | SecretString | None = None,
        client_secret: str | SecretString | None = None,
        api_root: str | None = None,
        config_api_root: str | None = None,
        bearer_token: str | SecretString | None = None,
    ) -> None:
        """Validate and initialize credentials."""
        env_vars = not (client_id or client_secret or bearer_token)
        credentials = _AirbyteCredentials.from_auth(
            workspace_id=workspace_id,
            client_id=client_id,
            client_secret=client_secret,
            bearer_token=bearer_token,
            public_api_root=api_root,
            config_api_root=config_api_root,
            env_vars=env_vars,
        )
        if not credentials.workspace_id:
            raise exc.PyAirbyteInputError(
                message="Workspace ID is required.",
                guidance="Provide a workspace ID.",
            )

        self._credentials = credentials
        self.workspace_id = credentials.workspace_id or ""
        self.client_id = credentials.client_id
        self.client_secret = credentials.client_secret
        self.bearer_token = credentials.bearer_token
        self.api_root = credentials.public_api_root
        self.config_api_root = credentials.config_api_root

        # Create internal CloudClientConfig object (validates mutual exclusivity)
        self._client_config = CloudClientConfig(
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            api_root=self.api_root,
            config_api_root=self.config_api_root,
        )

Validate and initialize credentials.

workspace_id: str
client_id: airbyte.secrets.SecretString | None
client_secret: airbyte.secrets.SecretString | None
api_root: str
config_api_root: str | None

The Config API root URL.

bearer_token: airbyte.secrets.SecretString | None
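For self-managed deployments whose Public API root ends in `/api/public/v1`, the module overview says PyAirbyte infers `config_api_root` by replacing that suffix with `/api/v1`. A minimal sketch of that rule follows. `infer_config_api_root` is a hypothetical name, the trailing-slash tolerance is an assumption, and returning `None` for any other URL stands in for "pass `config_api_root` explicitly or set `AIRBYTE_CLOUD_CONFIG_API_URL`".

```python
from __future__ import annotations

PUBLIC_API_SUFFIX = "/api/public/v1"
CONFIG_API_SUFFIX = "/api/v1"


def infer_config_api_root(public_api_root: str) -> str | None:
    """Hypothetical sketch of the suffix swap described in the module overview."""
    root = public_api_root.rstrip("/")  # assumption: tolerate a trailing slash
    if root.endswith(PUBLIC_API_SUFFIX):
        return root[: -len(PUBLIC_API_SUFFIX)] + CONFIG_API_SUFFIX
    # Custom ingress or reverse proxy: no safe guess, so the caller must supply it.
    return None


print(infer_config_api_root("https://airbyte.example.com/api/public/v1"))
print(infer_config_api_root("https://proxy.example.com/airbyte-public"))
```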
@classmethod
def from_env(cls, workspace_id: str | None = None, *, api_root: str | None = None, config_api_root: str | None = None) -> CloudWorkspace:
    @classmethod
    def from_env(
        cls,
        workspace_id: str | None = None,
        *,
        api_root: str | None = None,
        config_api_root: str | None = None,
    ) -> CloudWorkspace:
        """Create a CloudWorkspace using credentials from environment variables.

        This factory method resolves credentials from environment variables,
        providing a convenient way to create a workspace without explicitly
        passing credentials.

        Two authentication methods are supported (mutually exclusive):
        1. Bearer token (checked first)
        2. OAuth2 client credentials (fallback)

        Environment variables used:
            - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
            - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
            - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
            - `AIRBYTE_CLOUD_WORKSPACE_ID`: The workspace ID (if not passed as argument).
            - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).
            - `AIRBYTE_CLOUD_CONFIG_API_URL`: Optional. The Config API root URL.

        Args:
            workspace_id: The workspace ID. If not provided, will be resolved from
                the `AIRBYTE_CLOUD_WORKSPACE_ID` environment variable.
            api_root: The API root URL. If not provided, will be resolved from
                the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
                the Airbyte Cloud API.
            config_api_root: The Config API root URL. If not provided, will be resolved
                from the `AIRBYTE_CLOUD_CONFIG_API_URL` environment variable.

        Returns:
            A CloudWorkspace instance configured with credentials from the environment.

        Raises:
            PyAirbyteInputError: If required credentials are not found in
                the environment or are incomplete.

        Example:
            ```python
            # With workspace_id from environment
            workspace = CloudWorkspace.from_env()

            # With explicit workspace_id
            workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
            ```
        """
        return cls(
            workspace_id=workspace_id,
            api_root=api_root,
            config_api_root=config_api_root,
        )

Create a CloudWorkspace using credentials from environment variables.

This factory method resolves credentials from environment variables, providing a convenient way to create a workspace without explicitly passing credentials.

Two authentication methods are supported (mutually exclusive):

  1. Bearer token (checked first)
  2. OAuth2 client credentials (fallback)
Environment variables used:
  • AIRBYTE_CLOUD_BEARER_TOKEN: Bearer token (alternative to client credentials).
  • AIRBYTE_CLOUD_CLIENT_ID: OAuth client ID (for client credentials flow).
  • AIRBYTE_CLOUD_CLIENT_SECRET: OAuth client secret (for client credentials flow).
  • AIRBYTE_CLOUD_WORKSPACE_ID: The workspace ID (if not passed as argument).
  • AIRBYTE_CLOUD_API_URL: Optional. The API root URL (defaults to Airbyte Cloud).
  • AIRBYTE_CLOUD_CONFIG_API_URL: Optional. The Config API root URL.
Arguments:
  • workspace_id: The workspace ID. If not provided, will be resolved from the AIRBYTE_CLOUD_WORKSPACE_ID environment variable.
  • api_root: The API root URL. If not provided, will be resolved from the AIRBYTE_CLOUD_API_URL environment variable, or default to the Airbyte Cloud API.
  • config_api_root: The Config API root URL. If not provided, will be resolved from the AIRBYTE_CLOUD_CONFIG_API_URL environment variable.
Returns:

A CloudWorkspace instance configured with credentials from the environment.

Raises:
  • PyAirbyteInputError: If required credentials are not found in the environment or are incomplete.
Example:
# With workspace_id from environment
workspace = CloudWorkspace.from_env()

# With explicit workspace_id
workspace = CloudWorkspace.from_env(workspace_id="your-workspace-id")
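`from_env` checks for a bearer token first and falls back to client credentials only when none is set. The sketch below models that precedence over a plain mapping, so it can be exercised with a dict instead of the real process environment. The variable names come from the list above; the function name, the return shape, and the `ValueError` (PyAirbyte raises `PyAirbyteInputError`) are assumptions, and workspace ID resolution is left out.

```python
from __future__ import annotations

import os
from collections.abc import Mapping


def resolve_auth_from_env(env: Mapping[str, str] | None = None) -> dict[str, str]:
    """Hypothetical sketch of from_env's credential precedence."""
    env = os.environ if env is None else env

    # 1. A bearer token wins if present.
    bearer_token = env.get("AIRBYTE_CLOUD_BEARER_TOKEN")
    if bearer_token:
        return {"bearer_token": bearer_token}

    # 2. Otherwise fall back to the OAuth2 client-credentials pair.
    client_id = env.get("AIRBYTE_CLOUD_CLIENT_ID")
    client_secret = env.get("AIRBYTE_CLOUD_CLIENT_SECRET")
    if client_id and client_secret:
        return {"client_id": client_id, "client_secret": client_secret}

    raise ValueError("No complete Airbyte Cloud credentials found in the environment.")


fake_env = {
    "AIRBYTE_CLOUD_CLIENT_ID": "my-client-id",
    "AIRBYTE_CLOUD_CLIENT_SECRET": "my-client-secret",
}
print(resolve_auth_from_env(fake_env))
```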
workspace_url: str | None
    @property
    def workspace_url(self) -> str | None:
        """The web URL of the workspace."""
        return f"{get_web_url_root(self.api_root)}/workspaces/{self.workspace_id}"

The web URL of the workspace.

def get_organization(self, *, raise_on_error: bool = True) -> CloudOrganization | None:
    def get_organization(
        self,
        *,
        raise_on_error: bool = True,
    ) -> CloudOrganization | None:
        """Get the organization this workspace belongs to.

        Fetching organization info requires ORGANIZATION_READER permissions on the organization,
        which may not be available with workspace-scoped credentials.

        Args:
            raise_on_error: If True (default), raises AirbyteError on permission or API errors.
                If False, returns None instead of raising.

        Returns:
            CloudOrganization object with organization_id and organization_name,
            or None if raise_on_error=False and an error occurred.

        Raises:
            AirbyteError: If raise_on_error=True and the organization info cannot be fetched
                (e.g., due to insufficient permissions or missing data).
        """
        try:
            info = self._organization_info
        except (AirbyteError, NotImplementedError):
            if raise_on_error:
                raise
            return None

        organization_id = info.get("organizationId")
        organization_name = info.get("organizationName")

        # Validate that both organization_id and organization_name are non-null and non-empty
        if not organization_id or not organization_name:
            if raise_on_error:
                raise AirbyteError(
                    message="Organization info is incomplete.",
                    context={
                        "organization_id": organization_id,
                        "organization_name": organization_name,
                    },
                )
            return None

        organization_credentials = self._credentials.with_organization_id(organization_id)
        return CloudOrganization(
            organization_id=organization_id,
            organization_name=organization_name,
            client_id=organization_credentials.client_id,
            client_secret=organization_credentials.client_secret,
            bearer_token=organization_credentials.bearer_token,
            public_api_root=organization_credentials.public_api_root,
            config_api_root=organization_credentials.config_api_root,
        )

Get the organization this workspace belongs to.

Fetching organization info requires ORGANIZATION_READER permissions on the organization, which may not be available with workspace-scoped credentials.

Arguments:
  • raise_on_error: If True (default), raises AirbyteError on permission or API errors. If False, returns None instead of raising.
Returns:

CloudOrganization object with organization_id and organization_name, or None if raise_on_error=False and an error occurred.

Raises:
  • AirbyteError: If raise_on_error=True and the organization info cannot be fetched (e.g., due to insufficient permissions or missing data).
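The last validation step in `get_organization` treats a missing or empty `organizationId` or `organizationName` as an error, and `raise_on_error` decides whether that error propagates or becomes `None`. Here is that branch in isolation. `parse_organization_info` and the use of `LookupError` are hypothetical stand-ins for PyAirbyte's internals and its `AirbyteError`; only the two response keys come from the source above.

```python
from __future__ import annotations

from typing import Any


def parse_organization_info(
    info: dict[str, Any],
    *,
    raise_on_error: bool = True,
) -> tuple[str, str] | None:
    """Hypothetical sketch of get_organization's completeness check."""
    organization_id = info.get("organizationId")
    organization_name = info.get("organizationName")

    # Both fields must be present and non-empty.
    if not organization_id or not organization_name:
        if raise_on_error:
            raise LookupError(
                f"Organization info is incomplete: id={organization_id!r}, "
                f"name={organization_name!r}"
            )
        return None

    return organization_id, organization_name


print(parse_organization_info({"organizationId": "org-123", "organizationName": "Acme"}))
print(parse_organization_info({"organizationId": "org-123"}, raise_on_error=False))
```

With workspace-scoped credentials, calling `get_organization(raise_on_error=False)` and checking for `None` is the pattern the flag is meant to support.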
def connect(self) -> None:
    def connect(self) -> None:
        """Check that the workspace is reachable and raise an exception otherwise.

        Note: It is not necessary to call this method before calling other operations. It
              serves primarily as a simple check to ensure that the workspace is reachable
              and credentials are correct.
        """
        _ = api_util.get_workspace(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        print(f"Successfully connected to workspace: {self.workspace_url}")

Check that the workspace is reachable and raise an exception otherwise.

Note: It is not necessary to call this method before calling other operations. It serves primarily as a simple check to ensure that the workspace is reachable and credentials are correct.

def get_connection(self, connection_id: str) -> CloudConnection:
    def get_connection(
        self,
        connection_id: str,
    ) -> CloudConnection:
        """Get a connection by ID.

        This method does not fetch data from the API. It returns a `CloudConnection` object,
        which will be loaded lazily as needed.
        """
        return CloudConnection(
            workspace=self,
            connection_id=connection_id,
        )

Get a connection by ID.

This method does not fetch data from the API. It returns a CloudConnection object, which will be loaded lazily as needed.
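`get_connection`, `get_source`, and `get_destination` return objects immediately and defer any API call until data is actually needed. One common way to get that behavior in Python is `functools.cached_property`. The class below is a hypothetical illustration of the pattern with a stub fetch function; it is not how `CloudConnection` is implemented internally.

```python
from __future__ import annotations

from functools import cached_property
from typing import Any, Callable


class LazyConnection:
    """Hypothetical lazily loaded handle: construction makes no API call."""

    def __init__(self, connection_id: str, fetch: Callable[[str], dict[str, Any]]) -> None:
        self.connection_id = connection_id
        self._fetch = fetch
        self.fetch_count = 0  # counts simulated API calls

    @cached_property
    def info(self) -> dict[str, Any]:
        # First access triggers the fetch; later accesses reuse the cached value.
        self.fetch_count += 1
        return self._fetch(self.connection_id)


def fake_fetch(connection_id: str) -> dict[str, Any]:
    # Stand-in for a real API request.
    return {"connectionId": connection_id, "name": "demo connection"}


conn = LazyConnection("456", fake_fetch)
print(conn.fetch_count)  # nothing fetched yet
print(conn.info["name"])
print(conn.info["name"])
print(conn.fetch_count)
```

A consequence of this design is that an invalid ID cannot be detected by the getter itself; the error presumably surfaces on the first operation that needs data.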

def get_source(self, source_id: str) -> airbyte.cloud.connectors.CloudSource:
    def get_source(
        self,
        source_id: str,
    ) -> CloudSource:
        """Get a source by ID.

        This method does not fetch data from the API. It returns a `CloudSource` object,
        which will be loaded lazily as needed.
        """
        return CloudSource(
            workspace=self,
            connector_id=source_id,
        )

Get a source by ID.

This method does not fetch data from the API. It returns a CloudSource object, which will be loaded lazily as needed.

def get_destination(self, destination_id: str) -> airbyte.cloud.connectors.CloudDestination:
    def get_destination(
        self,
        destination_id: str,
    ) -> CloudDestination:
        """Get a destination by ID.

        This method does not fetch data from the API. It returns a `CloudDestination` object,
        which will be loaded lazily as needed.
        """
        return CloudDestination(
            workspace=self,
            connector_id=destination_id,
        )

Get a destination by ID.

This method does not fetch data from the API. It returns a CloudDestination object, which will be loaded lazily as needed.

def deploy_source(self, name: str, source: airbyte.Source, *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudSource:
    def deploy_source(
        self,
        name: str,
        source: Source,
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudSource:
        """Deploy a source to the workspace.

        Returns the newly deployed source.

        Args:
            name: The name to use when deploying.
            source: The source object to deploy.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        source_config_dict = source._hydrated_config.copy()  # noqa: SLF001 (non-public API)
        source_config_dict["sourceType"] = source.name.replace("source-", "")

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_sources(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="source",
                    resource_name=name,
                )

        deployed_source = api_util.create_source(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=source_config_dict,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return CloudSource(
            workspace=self,
            connector_id=deployed_source.source_id,
        )

Deploy a source to the workspace.

Returns the newly deployed source.

Arguments:
  • name: The name to use when deploying.
  • source: The source object to deploy.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
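Before calling the API, `deploy_source` copies the local source's hydrated config, derives `sourceType` from the connector name, optionally appends a random suffix, and then enforces name uniqueness. The sketch below reproduces those steps on plain data. `prepare_source_payload`, the `existing_names` argument (standing in for the `list_sources(name=...)` lookup), and the caller-supplied `suffix` (standing in for `text_util.generate_random_suffix()`) are all hypothetical. It also uses `str.removeprefix`, which strips only a leading `source-`, whereas the source above uses `str.replace`.

```python
from __future__ import annotations

from collections.abc import Collection
from typing import Any


def prepare_source_payload(
    name: str,
    connector_name: str,
    config: dict[str, Any],
    *,
    existing_names: Collection[str] = (),
    unique: bool = True,
    suffix: str | None = None,
) -> tuple[str, dict[str, Any]]:
    """Hypothetical sketch of deploy_source's pre-deployment steps."""
    payload = config.copy()  # shallow copy; the caller's dict is left untouched
    payload["sourceType"] = connector_name.removeprefix("source-")

    # The suffix is applied before the uniqueness check, matching the order above.
    if suffix is not None:
        name += f" (ID: {suffix})"

    if unique and name in existing_names:
        raise ValueError(f"A source named {name!r} already exists.")

    return name, payload


print(prepare_source_payload("My Faker", "source-faker", {"count": 100}, suffix="abc123"))
```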
def deploy_destination(self, name: str, destination: airbyte.Destination | dict[str, typing.Any], *, unique: bool = True, random_name_suffix: bool = False) -> airbyte.cloud.connectors.CloudDestination:
    def deploy_destination(
        self,
        name: str,
        destination: Destination | dict[str, Any],
        *,
        unique: bool = True,
        random_name_suffix: bool = False,
    ) -> CloudDestination:
        """Deploy a destination to the workspace.

        Returns the newly deployed destination.

        Args:
            name: The name to use when deploying.
            destination: The destination to deploy. Can be a local Airbyte `Destination` object or a
                dictionary of configuration values.
            unique: Whether to require a unique name. If `True`, duplicate names
                are not allowed. Defaults to `True`.
            random_name_suffix: Whether to append a random suffix to the name.
        """
        if isinstance(destination, Destination):
            destination_conf_dict = destination._hydrated_config.copy()  # noqa: SLF001 (non-public API)
            destination_conf_dict["destinationType"] = destination.name.replace("destination-", "")
        else:
            destination_conf_dict = destination.copy()
            if "destinationType" not in destination_conf_dict:
                raise exc.PyAirbyteInputError(
                    message="Missing `destinationType` in configuration dictionary.",
                )

        if random_name_suffix:
            name += f" (ID: {text_util.generate_random_suffix()})"

        if unique:
            existing = self.list_destinations(name=name)
            if existing:
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="destination",
                    resource_name=name,
                )

        deployed_destination = api_util.create_destination(
            name=name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            config=destination_conf_dict,  # Wants a dataclass but accepts dict
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return CloudDestination(
            workspace=self,
            connector_id=deployed_destination.destination_id,
        )

Deploy a destination to the workspace.

Returns the newly deployed destination as a CloudDestination object.

Arguments:
  • name: The name to use when deploying.
  • destination: The destination to deploy. Can be a local Airbyte Destination object or a dictionary of configuration values. A dictionary must include a destinationType key.
  • unique: Whether to require a unique name. If True, duplicate names are not allowed. Defaults to True.
  • random_name_suffix: Whether to append a random suffix to the name.
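The configuration rules in the listing above can be exercised without network access. The helper below is hypothetical, not part of PyAirbyte, and mirrors the listing: a local connector named `destination-duckdb` yields a `destinationType` of `duckdb`, a plain dictionary must already carry `destinationType`, and the optional suffix is appended as ` (ID: ...)`. As assumptions, `connector_name` stands in for a local `Destination` object, the suffix is passed in explicitly instead of coming from `text_util.generate_random_suffix()`, and `ValueError` stands in for `PyAirbyteInputError`.

```python
from __future__ import annotations

from typing import Any


def prepare_destination_payload(
    name: str,
    config: dict[str, Any],
    *,
    connector_name: str | None = None,
    random_suffix: str | None = None,
) -> tuple[str, dict[str, Any]]:
    """Hypothetical sketch of the checks `deploy_destination()` performs.

    `connector_name` stands in for a local `Destination` object's `name`.
    When it is None, `config` is treated as a raw configuration dictionary.
    """
    conf = dict(config)  # Copy so the caller's dictionary is never mutated.
    if connector_name is not None:
        # Local connector: derive the type by stripping the "destination-" prefix.
        conf["destinationType"] = connector_name.replace("destination-", "")
    elif "destinationType" not in conf:
        # Stand-in for exc.PyAirbyteInputError in the real method.
        raise ValueError("Missing `destinationType` in configuration dictionary.")

    if random_suffix is not None:
        name += f" (ID: {random_suffix})"
    return name, conf


name, conf = prepare_destination_payload(
    "Warehouse",
    {"path": "/tmp/db"},
    connector_name="destination-duckdb",
    random_suffix="abc123",
)
# name == "Warehouse (ID: abc123)"
# conf == {"path": "/tmp/db", "destinationType": "duckdb"}
```

Copying the input mirrors the `.copy()` calls in the listing, so deploying never alters the caller's configuration object.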
def permanently_delete_source( self, source: str | airbyte.cloud.connectors.CloudSource, *, safe_mode: bool = True) -> None:
    def permanently_delete_source(
        self,
        source: str | CloudSource,
        *,
        safe_mode: bool = True,
    ) -> None:
        """Delete a source from the workspace.

        You can pass either the source ID `str` or a deployed `CloudSource` object.

        Args:
            source: The source ID or CloudSource object to delete
            safe_mode: If True, requires the source name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True.
        """
        if not isinstance(source, (str, CloudSource)):
            raise exc.PyAirbyteInputError(
                message="Invalid source type.",
                input_value=type(source).__name__,
            )

        api_util.delete_source(
            source_id=source.connector_id if isinstance(source, CloudSource) else source,
            source_name=source.name if isinstance(source, CloudSource) else None,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

Delete a source from the workspace.

You can pass either the source ID str or a deployed CloudSource object.

Arguments:
  • source: The source ID or CloudSource object to delete
  • safe_mode: If True, requires the source name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
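The `safe_mode` rule described above reduces to a case-insensitive substring test on the resource name. The function below is a hypothetical re-implementation of that documented rule, not the check PyAirbyte actually runs inside `api_util`; treating a missing name as a failure is an assumption of this sketch.

```python
from __future__ import annotations


def passes_safe_mode_check(resource_name: str | None) -> bool:
    """Hypothetical sketch of the documented safe-mode rule.

    Deletion is allowed only when the name contains "delete-me" or
    "deleteme", compared case-insensitively.
    """
    if not resource_name:
        return False  # Assumption: an unknown or empty name never passes.
    lowered = resource_name.lower()
    return "delete-me" in lowered or "deleteme" in lowered


print(passes_safe_mode_check("Temp source DELETE-ME"))  # True
print(passes_safe_mode_check("Production Postgres"))  # False
```

Renaming a throwaway source to include `delete-me` before deleting it keeps `safe_mode=True` useful as a guard against removing production connectors by ID.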
def permanently_delete_destination( self, destination: str | airbyte.cloud.connectors.CloudDestination, *, safe_mode: bool = True) -> None:
    def permanently_delete_destination(
        self,
        destination: str | CloudDestination,
        *,
        safe_mode: bool = True,
    ) -> None:
        """Delete a deployed destination from the workspace.

        You can pass either the deployed destination ID as a `str` or a `CloudDestination` object.

        Args:
            destination: The destination ID or CloudDestination object to delete
            safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True.
        """
        if not isinstance(destination, (str, CloudDestination)):
            raise exc.PyAirbyteInputError(
                message="Invalid destination type.",
                input_value=type(destination).__name__,
            )

        api_util.delete_destination(
            destination_id=(
                destination if isinstance(destination, str) else destination.destination_id
            ),
            destination_name=(
                destination.name if isinstance(destination, CloudDestination) else None
            ),
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

Delete a deployed destination from the workspace.

You can pass either the deployed destination ID as a str or a CloudDestination object.

Arguments:
  • destination: The destination ID or CloudDestination object to delete
  • safe_mode: If True, requires the destination name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True.
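Because the method accepts either an ID string or a deployed object, it first normalizes the argument into an ID plus an optional display name. A bare ID carries no name, so the listing passes `None`. The sketch below shows that normalization. `StubDestination` is a hypothetical stand-in for `CloudDestination` with only the two attributes used here, and `TypeError` replaces `PyAirbyteInputError`.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class StubDestination:
    """Hypothetical stand-in for a deployed `CloudDestination`."""

    destination_id: str
    name: str


def resolve_destination(destination: str | StubDestination) -> tuple[str, str | None]:
    """Return (destination_id, destination_name) the way the listing above does."""
    if isinstance(destination, str):
        return destination, None  # A bare ID has no name attached.
    if isinstance(destination, StubDestination):
        return destination.destination_id, destination.name
    # Stand-in for exc.PyAirbyteInputError("Invalid destination type.").
    raise TypeError(f"Invalid destination type: {type(destination).__name__}")


print(resolve_destination("dest-123"))  # ('dest-123', None)
```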
def deploy_connection( self, connection_name: str, *, source: airbyte.cloud.connectors.CloudSource | str, selected_streams: list[str], destination: airbyte.cloud.connectors.CloudDestination | str, table_prefix: str | None = None) -> CloudConnection:
    def deploy_connection(
        self,
        connection_name: str,
        *,
        source: CloudSource | str,
        selected_streams: list[str],
        destination: CloudDestination | str,
        table_prefix: str | None = None,
    ) -> CloudConnection:
        """Create a new connection between an already deployed source and destination.

        Returns the newly deployed connection object.

        Args:
            connection_name: The name of the connection.
            source: The deployed source. You can pass a source ID or a CloudSource object.
            destination: The deployed destination. You can pass a destination ID or a
                CloudDestination object.
            table_prefix: Optional. The table prefix to use when syncing to the destination.
            selected_streams: The selected stream names to sync within the connection.
        """
        if not selected_streams:
            raise exc.PyAirbyteInputError(
                guidance="You must provide `selected_streams` when creating a connection."
            )

        source_id: str = source if isinstance(source, str) else source.connector_id
        destination_id: str = (
            destination if isinstance(destination, str) else destination.connector_id
        )

        deployed_connection = api_util.create_connection(
            name=connection_name,
            source_id=source_id,
            destination_id=destination_id,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            selected_stream_names=selected_streams,
            prefix=table_prefix or "",
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )

        return CloudConnection(
            workspace=self,
            connection_id=deployed_connection.connection_id,
            source=deployed_connection.source_id,
            destination=deployed_connection.destination_id,
        )

Create a new connection between an already deployed source and destination.

Returns the newly deployed connection object.

Arguments:
  • connection_name: The name of the connection.
  • source: The deployed source. You can pass a source ID or a CloudSource object.
  • destination: The deployed destination. You can pass a destination ID or a CloudDestination object.
  • table_prefix: Optional. The table prefix to use when syncing to the destination.
  • selected_streams: The selected stream names to sync within the connection.
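Before calling the API, `deploy_connection()` rejects an empty `selected_streams` list and turns a missing `table_prefix` into an empty string. The sketch below reproduces those two rules on already-resolved IDs. `ConnectionRequest` and `build_connection_request` are hypothetical names used only for illustration, and `ValueError` stands in for `PyAirbyteInputError`.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class ConnectionRequest:
    """Hypothetical container for the arguments sent to the create-connection call."""

    name: str
    source_id: str
    destination_id: str
    selected_stream_names: list[str]
    prefix: str


def build_connection_request(
    connection_name: str,
    *,
    source_id: str,
    destination_id: str,
    selected_streams: list[str],
    table_prefix: str | None = None,
) -> ConnectionRequest:
    if not selected_streams:
        # Stand-in for exc.PyAirbyteInputError in the real method.
        raise ValueError("You must provide `selected_streams` when creating a connection.")
    return ConnectionRequest(
        name=connection_name,
        source_id=source_id,
        destination_id=destination_id,
        selected_stream_names=list(selected_streams),
        prefix=table_prefix or "",  # None (or "") becomes an empty prefix.
    )


request = build_connection_request(
    "Faker to DuckDB",
    source_id="src-1",
    destination_id="dst-1",
    selected_streams=["users", "products"],
)
print(request.prefix == "")  # True
```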
def permanently_delete_connection( self, connection: str | CloudConnection, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False, safe_mode: bool = True) -> None:
    def permanently_delete_connection(
        self,
        connection: str | CloudConnection,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
        safe_mode: bool = True,
    ) -> None:
        """Delete a deployed connection from the workspace.

        Args:
            connection: The connection ID or CloudConnection object to delete
            cascade_delete_source: If True, also delete the source after deleting the connection
            cascade_delete_destination: If True, also delete the destination after deleting
                the connection
            safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme"
                (case insensitive) to prevent accidental deletion. Defaults to True. Also applies
                to cascade deletes.
        """
        if connection is None:
            raise ValueError("No connection ID provided.")

        if isinstance(connection, str):
            connection = CloudConnection(
                workspace=self,
                connection_id=connection,
            )

        api_util.delete_connection(
            connection_id=connection.connection_id,
            connection_name=connection.name,
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

        if cascade_delete_source:
            self.permanently_delete_source(
                source=connection.source_id,
                safe_mode=safe_mode,
            )
        if cascade_delete_destination:
            self.permanently_delete_destination(
                destination=connection.destination_id,
                safe_mode=safe_mode,
            )

Delete a deployed connection from the workspace.

Arguments:
  • connection: The connection ID or CloudConnection object to delete
  • cascade_delete_source: If True, also delete the source after deleting the connection
  • cascade_delete_destination: If True, also delete the destination after deleting the connection
  • safe_mode: If True, requires the connection name to contain "delete-me" or "deleteme" (case insensitive) to prevent accidental deletion. Defaults to True. Also applies to cascade deletes.
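The cascade options run strictly after the connection itself is deleted, and the same `safe_mode` value is forwarded to each follow-up deletion. In the listing, reading `connection.name` before the delete call caches the connection info, which is where `source_id` and `destination_id` are later read from. The sketch below records the call order with a hypothetical stub (`RecordingWorkspace`) instead of calling the Airbyte API.

```python
from __future__ import annotations


class RecordingWorkspace:
    """Hypothetical stub that records delete calls instead of calling the API."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, str, bool]] = []

    def delete_connection(self, connection_id: str, safe_mode: bool) -> None:
        self.calls.append(("connection", connection_id, safe_mode))

    def delete_source(self, source_id: str, safe_mode: bool) -> None:
        self.calls.append(("source", source_id, safe_mode))

    def delete_destination(self, destination_id: str, safe_mode: bool) -> None:
        self.calls.append(("destination", destination_id, safe_mode))


def cascade_delete(
    ws: RecordingWorkspace,
    connection_id: str,
    source_id: str,
    destination_id: str,
    *,
    cascade_delete_source: bool = False,
    cascade_delete_destination: bool = False,
    safe_mode: bool = True,
) -> None:
    # The connection is removed first; connectors are deleted only afterwards.
    ws.delete_connection(connection_id, safe_mode)
    if cascade_delete_source:
        ws.delete_source(source_id, safe_mode)
    if cascade_delete_destination:
        ws.delete_destination(destination_id, safe_mode)


ws = RecordingWorkspace()
cascade_delete(ws, "c1", "s1", "d1", cascade_delete_source=True, cascade_delete_destination=True)
print(ws.calls)
```

Deleting the connection first means connectors are never removed while a connection still references them.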
def list_workspaces( self, name: str | None = None, *, name_filter: Callable | None = None, limit: int | None = None) -> list[CloudWorkspaceInfo]:
    def list_workspaces(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
        limit: int | None = None,
    ) -> list[CloudWorkspaceInfo]:
        """List workspaces available to the current credentials, with an optional limit."""
        return [
            CloudWorkspaceInfo.from_api_response(workspace)
            for workspace in api_util.list_workspaces(
                workspace_id="",
                api_root=self.api_root,
                name=name,
                name_filter=name_filter,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
                limit=limit,
            )
        ]

List workspaces available to the current credentials, with an optional limit.

def rename(self, name: str) -> CloudWorkspace:
    def rename(
        self,
        name: str,
    ) -> CloudWorkspace:
        """Rename this workspace."""
        api_util.rename_workspace(
            workspace_id=self.workspace_id,
            name=name,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return self

Rename this workspace.

def permanently_delete( self, *, workspace_name: str | None = None, safe_mode: bool = True) -> None:
    def permanently_delete(
        self,
        *,
        workspace_name: str | None = None,
        safe_mode: bool = True,
    ) -> None:
        """Permanently delete this workspace if it has no connections.

        When `safe_mode` is enabled, the workspace name must contain `delete-me`
        or `deleteme`. This also checks for existing connections before deleting
        and raises `AirbyteWorkspaceNotEmptyError` if the workspace is not empty.
        """
        api_util.permanently_delete_workspace(
            workspace_id=self.workspace_id,
            workspace_name=workspace_name,
            api_root=self.api_root,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
            safe_mode=safe_mode,
        )

Permanently delete this workspace if it has no connections.

When safe_mode is enabled, the workspace name must contain delete-me or deleteme. This also checks for existing connections before deleting and raises AirbyteWorkspaceNotEmptyError if the workspace is not empty.
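The two preconditions above can be sketched as a pure check on the workspace name and its connection count. Several details are assumptions of this sketch, not confirmed by the source: the name match is case-insensitive (as it is for sources and destinations), the emptiness check runs whether or not `safe_mode` is on, `ValueError` is a placeholder for whatever PyAirbyte raises on a safe-mode failure, and `WorkspaceNotEmptyError` is a hypothetical stand-in for `AirbyteWorkspaceNotEmptyError`.

```python
from __future__ import annotations


class WorkspaceNotEmptyError(Exception):
    """Hypothetical stand-in for `AirbyteWorkspaceNotEmptyError`."""


def check_workspace_deletable(
    workspace_name: str,
    connection_count: int,
    *,
    safe_mode: bool = True,
) -> None:
    """Raise if the documented preconditions for deleting a workspace are not met."""
    if safe_mode:
        lowered = workspace_name.lower()  # Assumption: case-insensitive match.
        if "delete-me" not in lowered and "deleteme" not in lowered:
            # Placeholder exception type for a safe-mode refusal.
            raise ValueError(
                f"Workspace {workspace_name!r} is not marked 'delete-me' or 'deleteme'."
            )
    # Assumption: the emptiness check applies regardless of safe_mode.
    if connection_count > 0:
        raise WorkspaceNotEmptyError(
            f"Workspace {workspace_name!r} still has {connection_count} connection(s)."
        )


check_workspace_deletable("sandbox-deleteme", 0)  # Passes silently.
```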

def list_connections( self, name: str | None = None, *, name_filter: Callable | None = None, limit: int | None = None) -> list[CloudConnection]:
    def list_connections(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
        limit: int | None = None,
    ) -> list[CloudConnection]:
        """List connections by name in the workspace, with an optional limit."""
        connections = api_util.list_connections(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            limit=limit,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return [
            CloudConnection._from_connection_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                connection_response=connection,
            )
            for connection in connections
        ]

List connections by name in the workspace, with an optional limit.
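The `list_connections()`, `list_sources()`, and `list_destinations()` methods all forward `name`, `name_filter`, and `limit` to `api_util`. The filtering there is not shown in this listing, so the sketch below is one plausible reading, stated as assumptions: `name` is an exact match, `name_filter` is a predicate on the display name, the two are mutually exclusive, and `limit` truncates the matches. `filter_by_name` is a hypothetical helper operating on plain strings.

```python
from __future__ import annotations

from typing import Callable


def filter_by_name(
    names: list[str],
    name: str | None = None,
    *,
    name_filter: Callable[[str], bool] | None = None,
    limit: int | None = None,
) -> list[str]:
    """Hypothetical sketch of name-based filtering with an optional limit."""
    if name and name_filter:
        # Assumption: supplying both is treated as an input error.
        raise ValueError("Provide `name` or `name_filter`, not both.")

    def predicate(candidate: str) -> bool:
        if name:
            return candidate == name  # Exact match on the display name.
        if name_filter is not None:
            return name_filter(candidate)
        return True  # No filter: keep everything.

    matches = [n for n in names if predicate(n)]
    return matches if limit is None else matches[:limit]


names = ["Faker to DuckDB", "Postgres to Snowflake", "Faker to BigQuery"]
print(filter_by_name(names, name_filter=lambda n: n.startswith("Faker"), limit=1))
# ['Faker to DuckDB']
```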

def list_sources( self, name: str | None = None, *, name_filter: Callable | None = None, limit: int | None = None) -> list[airbyte.cloud.connectors.CloudSource]:
    def list_sources(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
        limit: int | None = None,
    ) -> list[CloudSource]:
        """List all sources in the workspace, with an optional limit."""
        sources = api_util.list_sources(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            limit=limit,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return [
            CloudSource._from_source_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                source_response=source,
            )
            for source in sources
        ]

List all sources in the workspace, with an optional limit.

def list_destinations( self, name: str | None = None, *, name_filter: Callable | None = None, limit: int | None = None) -> list[airbyte.cloud.connectors.CloudDestination]:
    def list_destinations(
        self,
        name: str | None = None,
        *,
        name_filter: Callable | None = None,
        limit: int | None = None,
    ) -> list[CloudDestination]:
        """List all destinations in the workspace, with an optional limit."""
        destinations = api_util.list_destinations(
            api_root=self.api_root,
            workspace_id=self.workspace_id,
            name=name,
            name_filter=name_filter,
            limit=limit,
            client_id=self.client_id,
            client_secret=self.client_secret,
            bearer_token=self.bearer_token,
        )
        return [
            CloudDestination._from_destination_response(  # noqa: SLF001 (non-public API)
                workspace=self,
                destination_response=destination,
            )
            for destination in destinations
        ]

List all destinations in the workspace, with an optional limit.

def publish_custom_source_definition( self, name: str, *, manifest_yaml: dict[str, typing.Any] | pathlib.Path | str | None = None, docker_image: str | None = None, docker_tag: str | None = None, unique: bool = True, pre_validate: bool = True, testing_values: dict[str, typing.Any] | None = None) -> airbyte.cloud.connectors.CustomCloudSourceDefinition:
    def publish_custom_source_definition(
        self,
        name: str,
        *,
        manifest_yaml: dict[str, Any] | Path | str | None = None,
        docker_image: str | None = None,
        docker_tag: str | None = None,
        unique: bool = True,
        pre_validate: bool = True,
        testing_values: dict[str, Any] | None = None,
    ) -> CustomCloudSourceDefinition:
        """Publish a custom source connector definition.

        You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image
        and docker_tag (for Docker connectors), but not both.

        Args:
            name: Display name for the connector definition
            manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
            docker_image: Docker repository (e.g., 'airbyte/source-custom')
            docker_tag: Docker image tag (e.g., '1.0.0')
            unique: Whether to enforce name uniqueness
            pre_validate: Whether to validate manifest client-side (YAML only)
            testing_values: Optional configuration values to use for testing in the
                Connector Builder UI. If provided, these values are stored as the complete
                testing values object for the connector builder project (replaces any existing
                values), allowing immediate test read operations.

        Returns:
            CustomCloudSourceDefinition object representing the created definition

        Raises:
            PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
            AirbyteDuplicateResourcesError: If unique=True and name already exists
        """
        is_yaml = manifest_yaml is not None
        is_docker = docker_image is not None

        if is_yaml == is_docker:
            raise exc.PyAirbyteInputError(
                message=(
                    "Must specify EITHER manifest_yaml (for YAML connectors) OR "
                    "docker_image + docker_tag (for Docker connectors), but not both"
                ),
                context={
                    "manifest_yaml_provided": is_yaml,
                    "docker_image_provided": is_docker,
                },
            )

        if is_docker and docker_tag is None:
            raise exc.PyAirbyteInputError(
                message="docker_tag is required when docker_image is specified",
                context={"docker_image": docker_image},
            )

        if unique:
            existing = self.list_custom_source_definitions(
                definition_type="yaml" if is_yaml else "docker",
            )
            if any(d.name == name for d in existing):
                raise exc.AirbyteDuplicateResourcesError(
                    resource_type="custom_source_definition",
                    resource_name=name,
                )

        if is_yaml:
            manifest_dict: dict[str, Any]
            if isinstance(manifest_yaml, Path):
                manifest_dict = yaml.safe_load(manifest_yaml.read_text())
            elif isinstance(manifest_yaml, str):
                manifest_dict = yaml.safe_load(manifest_yaml)
            elif manifest_yaml is not None:
                manifest_dict = manifest_yaml
            else:
                raise exc.PyAirbyteInputError(
                    message="manifest_yaml is required for YAML connectors",
                    context={"name": name},
                )

            if pre_validate:
                api_util.validate_yaml_manifest(manifest_dict, raise_on_error=True)

            result = api_util.create_custom_yaml_source_definition(
                name=name,
                workspace_id=self.workspace_id,
                manifest=manifest_dict,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
            )
            custom_definition = CustomCloudSourceDefinition._from_yaml_response(  # noqa: SLF001
                self, result
            )

            # Set testing values if provided
            if testing_values is not None:
                custom_definition.set_testing_values(testing_values)

            return custom_definition

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

Publish a custom source connector definition.

You must specify EITHER manifest_yaml (for YAML connectors) OR both docker_image and docker_tag (for Docker connectors), but not both.

Arguments:
  • name: Display name for the connector definition
  • manifest_yaml: Low-code CDK manifest (dict, Path to YAML file, or YAML string)
  • docker_image: Docker repository (e.g., 'airbyte/source-custom')
  • docker_tag: Docker image tag (e.g., '1.0.0')
  • unique: Whether to enforce name uniqueness
  • pre_validate: Whether to validate manifest client-side (YAML only)
  • testing_values: Optional configuration values to use for testing in the Connector Builder UI. If provided, these values are stored as the complete testing values object for the connector builder project (replaces any existing values), allowing immediate test read operations.
Returns:

CustomCloudSourceDefinition object representing the created definition

Raises:
  • PyAirbyteInputError: If both or neither of manifest_yaml and docker_image provided
  • AirbyteDuplicateResourcesError: If unique=True and name already exists
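The argument rules above form an exclusive-or: exactly one of `manifest_yaml` and `docker_image` must be given, and `docker_image` additionally requires `docker_tag`. The sketch below isolates those checks. `classify_definition_inputs` is a hypothetical name and `ValueError` stands in for `PyAirbyteInputError`. Per the listing, only the `"yaml"` path currently publishes anything; a Docker definition that passes these checks still ends in `NotImplementedError`.

```python
from __future__ import annotations


def classify_definition_inputs(
    manifest_yaml: object | None = None,
    docker_image: str | None = None,
    docker_tag: str | None = None,
) -> str:
    """Return "yaml" or "docker", mirroring the argument checks in the listing."""
    is_yaml = manifest_yaml is not None
    is_docker = docker_image is not None
    if is_yaml == is_docker:  # True when both are given, or when neither is.
        raise ValueError("Specify EITHER manifest_yaml OR docker_image + docker_tag.")
    if is_docker and docker_tag is None:
        raise ValueError("docker_tag is required when docker_image is specified.")
    return "yaml" if is_yaml else "docker"


print(classify_definition_inputs(manifest_yaml={"streams": []}))  # yaml
print(classify_definition_inputs(docker_image="airbyte/source-custom", docker_tag="1.0.0"))  # docker
```

Comparing the two booleans with `==` covers both invalid cases (both set, neither set) in a single branch, which is the same trick the listing uses.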
def list_custom_source_definitions( self, *, definition_type: Literal['yaml', 'docker']) -> list[airbyte.cloud.connectors.CustomCloudSourceDefinition]:
    def list_custom_source_definitions(
        self,
        *,
        definition_type: Literal["yaml", "docker"],
    ) -> list[CustomCloudSourceDefinition]:
        """List custom source connector definitions.

        Args:
            definition_type: Connector type to list ("yaml" or "docker"). Required.

        Returns:
            List of CustomCloudSourceDefinition objects matching the specified type
        """
        if definition_type == "yaml":
            yaml_definitions = api_util.list_custom_yaml_source_definitions(
                workspace_id=self.workspace_id,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
            )
            return [
                CustomCloudSourceDefinition._from_yaml_response(self, d)  # noqa: SLF001
                for d in yaml_definitions
            ]

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

List custom source connector definitions.

Arguments:
  • definition_type: Connector type to list ("yaml" or "docker"). Required.
Returns:

List of CustomCloudSourceDefinition objects matching the specified type
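Listing dispatches on `definition_type`: `"yaml"` returns definitions, while `"docker"` raises `NotImplementedError`. The uniqueness check in `publish_custom_source_definition()` then compares names against that list. The sketch below reproduces both steps over an in-memory catalog. `StubDefinition`, `list_definitions`, and `name_is_taken` are hypothetical names; only the dispatch and the `any(d.name == name ...)` comparison come from the listings.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Literal


@dataclass
class StubDefinition:
    """Hypothetical stand-in for `CustomCloudSourceDefinition`."""

    definition_id: str
    name: str


def list_definitions(
    catalog: dict[str, list[StubDefinition]],
    *,
    definition_type: Literal["yaml", "docker"],
) -> list[StubDefinition]:
    """Mirror the dispatch in the listing: only "yaml" is implemented."""
    if definition_type == "yaml":
        return list(catalog.get("yaml", []))
    raise NotImplementedError("Docker custom source definitions are not yet supported.")


def name_is_taken(definitions: list[StubDefinition], name: str) -> bool:
    # Same comparison the publish path uses when `unique=True`.
    return any(d.name == name for d in definitions)


catalog = {"yaml": [StubDefinition("def-1", "My Custom API")]}
print(name_is_taken(list_definitions(catalog, definition_type="yaml"), "My Custom API"))  # True
```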

def get_custom_source_definition( self, definition_id: str, *, definition_type: Literal['yaml', 'docker']) -> airbyte.cloud.connectors.CustomCloudSourceDefinition:
    def get_custom_source_definition(
        self,
        definition_id: str,
        *,
        definition_type: Literal["yaml", "docker"],
    ) -> CustomCloudSourceDefinition:
        """Get a specific custom source definition by ID.

        Args:
            definition_id: The definition ID
            definition_type: Connector type ("yaml" or "docker"). Required.

        Returns:
            CustomCloudSourceDefinition object
        """
        if definition_type == "yaml":
            result = api_util.get_custom_yaml_source_definition(
                workspace_id=self.workspace_id,
                definition_id=definition_id,
                api_root=self.api_root,
                client_id=self.client_id,
                client_secret=self.client_secret,
                bearer_token=self.bearer_token,
            )
            return CustomCloudSourceDefinition._from_yaml_response(self, result)  # noqa: SLF001

        raise NotImplementedError(
            "Docker custom source definitions are not yet supported. "
            "Only YAML manifest-based custom sources are currently available."
        )

Get a specific custom source definition by ID.

Arguments:
  • definition_id: The definition ID
  • definition_type: Connector type ("yaml" or "docker"). Required.
Returns:

CustomCloudSourceDefinition object

class CloudConnection:
 44class CloudConnection:  # noqa: PLR0904  # Too many public methods
 45    """A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.
 46
 47    You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
 48    """
 49
 50    def __init__(
 51        self,
 52        workspace: CloudWorkspace,
 53        connection_id: str,
 54        source: str | None = None,
 55        destination: str | None = None,
 56    ) -> None:
 57        """It is not recommended to create a `CloudConnection` object directly.
 58
 59        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
 60        """
 61        self.connection_id = connection_id
 62        """The ID of the connection."""
 63
 64        self.workspace = workspace
 65        """The workspace that the connection belongs to."""
 66
 67        self._source_id = source
 68        """The ID of the source."""
 69
 70        self._destination_id = destination
 71        """The ID of the destination."""
 72
 73        self._connection_info: CloudConnectionInfo | None = None
 74        """The connection info object. (Cached.)"""
 75
 76        self._cloud_source_object: CloudSource | None = None
 77        """The source object. (Cached.)"""
 78
 79        self._cloud_destination_object: CloudDestination | None = None
 80        """The destination object. (Cached.)"""
 81
 82    def _fetch_connection_info(
 83        self,
 84        *,
 85        force_refresh: bool = False,
 86        verify: bool = True,
 87    ) -> CloudConnectionInfo:
 88        """Fetch and cache connection info from the API.
 89
 90        By default, this method will only fetch from the API if connection info is not
 91        already cached. It also verifies that the connection belongs to the expected
 92        workspace unless verification is explicitly disabled.
 93
 94        Args:
 95            force_refresh: If True, always fetch from the API even if cached.
 96                If False (default), only fetch if not already cached.
 97            verify: If True (default), verify that the connection is valid (e.g., that
 98                the workspace_id matches this object's workspace). Raises an error if
 99                validation fails.
100
101        Returns:
102            Information about the connection from the API.
103
104        Raises:
105            AirbyteWorkspaceMismatchError: If verify is True and the connection's
106                workspace_id doesn't match the expected workspace.
107            AirbyteMissingResourceError: If the connection doesn't exist.
108        """
109        if not force_refresh and self._connection_info is not None:
110            # Use cached info, but still verify if requested
111            if verify:
112                self._verify_workspace_match(self._connection_info)
113            return self._connection_info
114
115        # Fetch from API
116        connection_info = api_util.get_connection(
117            workspace_id=self.workspace.workspace_id,
118            connection_id=self.connection_id,
119            api_root=self.workspace.api_root,
120            client_id=self.workspace.client_id,
121            client_secret=self.workspace.client_secret,
122            bearer_token=self.workspace.bearer_token,
123        )
124        result = CloudConnectionInfo.from_api_response(connection_info)
125
126        self._connection_info = result
127
128        # Verify if requested
129        if verify:
130            self._verify_workspace_match(result)
131
132        return result
133
134    def _verify_workspace_match(self, connection_info: CloudConnectionInfo) -> None:
135        """Verify that the connection belongs to the expected workspace.
136
137        Raises:
138            AirbyteWorkspaceMismatchError: If the workspace IDs don't match.
139        """
140        if connection_info.workspace_id != self.workspace.workspace_id:
141            raise AirbyteWorkspaceMismatchError(
142                resource_type="connection",
143                resource_id=self.connection_id,
144                workspace=self.workspace,
145                expected_workspace_id=self.workspace.workspace_id,
146                actual_workspace_id=connection_info.workspace_id,
147                message=(
148                    f"Connection '{self.connection_id}' belongs to workspace "
149                    f"'{connection_info.workspace_id}', not '{self.workspace.workspace_id}'."
150                ),
151            )
152
153    def check_is_valid(self) -> bool:
154        """Check if this connection exists and belongs to the expected workspace.
155
156        This method fetches connection info from the API (if not already cached) and
157        verifies that the connection's workspace_id matches the workspace associated
158        with this CloudConnection object.
159
160        Returns:
161            True if the connection exists and belongs to the expected workspace.
162
163        Raises:
164            AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
165            AirbyteMissingResourceError: If the connection doesn't exist.
166        """
167        self._fetch_connection_info(force_refresh=False, verify=True)
168        return True
169
170    @classmethod
171    def _from_connection_response(
172        cls,
173        workspace: CloudWorkspace,
174        connection_response: _ConnectionResponseLike,
175    ) -> CloudConnection:
176        """Create a CloudConnection from an API connection response."""
177        connection_info = CloudConnectionInfo.from_api_response(connection_response)
178        result = cls(
179            workspace=workspace,
180            connection_id=connection_info.connection_id,
181            source=connection_info.source_id,
182            destination=connection_info.destination_id,
183        )
184        result._connection_info = connection_info  # noqa: SLF001 # Accessing Non-Public API
185        return result
186
187    # Properties
188
189    @property
190    def name(self) -> str | None:
191        """Get the display name of the connection, if available.
192
193        E.g. "My Postgres to Snowflake", not the connection ID.
194        """
195        if not self._connection_info:
196            self._connection_info = self._fetch_connection_info()
197
198        return self._connection_info.name
199
200    @property
201    def source_id(self) -> str:
202        """The ID of the source."""
203        if not self._source_id:
204            if not self._connection_info:
205                self._connection_info = self._fetch_connection_info()
206
207            self._source_id = self._connection_info.source_id
208
209        return self._source_id
210
211    @property
212    def source(self) -> CloudSource:
213        """Get the source object."""
214        if self._cloud_source_object:
215            return self._cloud_source_object
216
217        self._cloud_source_object = CloudSource(
218            workspace=self.workspace,
219            connector_id=self.source_id,
220        )
221        return self._cloud_source_object
222
223    @property
224    def destination_id(self) -> str:
225        """The ID of the destination."""
226        if not self._destination_id:
227            if not self._connection_info:
228                self._connection_info = self._fetch_connection_info()
229
230            self._destination_id = self._connection_info.destination_id
231
232        return self._destination_id
233
234    @property
235    def destination(self) -> CloudDestination:
236        """Get the destination object."""
237        if self._cloud_destination_object:
238            return self._cloud_destination_object
239
240        self._cloud_destination_object = CloudDestination(
241            workspace=self.workspace,
242            connector_id=self.destination_id,
243        )
244        return self._cloud_destination_object
245
246    @property
247    def stream_names(self) -> list[str]:
248        """The names of the streams configured on this connection."""
249        if not self._connection_info:
250            self._connection_info = self._fetch_connection_info()
251
252        return [stream.name for stream in self._connection_info.configurations.streams or []]
253
254    @property
255    def table_prefix(self) -> str:
256        """The table prefix for destination tables, or an empty string if none is set."""
257        if not self._connection_info:
258            self._connection_info = self._fetch_connection_info()
259
260        return self._connection_info.prefix or ""
261
262    @property
263    def connection_url(self) -> str | None:
264        """The web URL to the connection."""
265        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"
266
267    @property
268    def job_history_url(self) -> str | None:
269        """The URL to the job history for the connection."""
270        return f"{self.connection_url}/timeline"
271
272    # Run Sync
273
274    def run_sync(
275        self,
276        *,
277        wait: bool = True,
278        wait_timeout: int = 300,
279    ) -> SyncResult:
280        """Run a sync job; if `wait` is True, wait up to `wait_timeout` seconds (raises on failure or timeout)."""
281        connection_response = api_util.run_connection(
282            connection_id=self.connection_id,
283            api_root=self.workspace.api_root,
284            workspace_id=self.workspace.workspace_id,
285            client_id=self.workspace.client_id,
286            client_secret=self.workspace.client_secret,
287            bearer_token=self.workspace.bearer_token,
288        )
289        sync_result = SyncResult(
290            workspace=self.workspace,
291            connection=self,
292            job_id=connection_response.job_id,
293        )
294
295        if wait:
296            sync_result.wait_for_completion(
297                wait_timeout=wait_timeout,
298                raise_failure=True,
299                raise_timeout=True,
300            )
301
302        return sync_result
303
304    def __repr__(self) -> str:
305        """String representation of the connection."""
306        return (
307            f"CloudConnection(connection_id={self.connection_id}, source_id={self.source_id}, "
308            f"destination_id={self.destination_id}, connection_url={self.connection_url})"
309        )
310
311    # Logs
312
313    def get_previous_sync_logs(
314        self,
315        *,
316        limit: int = 20,
317        offset: int | None = None,
318        from_tail: bool = True,
319        job_type: str | JobTypeEnum | None = None,
320    ) -> list[SyncResult]:
321        """Get previous sync jobs for a connection with pagination support.
322
323        Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
324        rows_synced, start_time). Full log text can be fetched lazily via
325        `SyncResult.get_full_log_text()`.
326
327        Args:
328            limit: Maximum number of jobs to return. Defaults to 20.
329            offset: Number of jobs to skip from the beginning. Defaults to None (0).
330            from_tail: If True, returns jobs ordered newest-first (createdAt DESC).
331                If False, returns jobs ordered oldest-first (createdAt ASC).
332                Defaults to True.
333            job_type: Filter by job type (e.g., `sync`, `refresh`).
334                If not specified, defaults to sync and reset jobs only (API default behavior).
335
336        Returns:
337            A list of SyncResult objects representing the sync jobs.
338        """
339        order_by = (
340            api_util.JOB_ORDER_BY_CREATED_AT_DESC
341            if from_tail
342            else api_util.JOB_ORDER_BY_CREATED_AT_ASC
343        )
344        sync_logs = api_util.get_job_logs(
345            connection_id=self.connection_id,
346            api_root=self.workspace.api_root,
347            workspace_id=self.workspace.workspace_id,
348            limit=limit,
349            offset=offset,
350            order_by=order_by,
351            job_type=job_type,
352            client_id=self.workspace.client_id,
353            client_secret=self.workspace.client_secret,
354            bearer_token=self.workspace.bearer_token,
355        )
356        return [
357            SyncResult(
358                workspace=self.workspace,
359                connection=self,
360                job_id=sync_log.job_id,
361                _latest_job_info=CloudJobInfo.from_api_response(sync_log),
362            )
363            for sync_log in sync_logs
364        ]
365
366    def get_sync_result(
367        self,
368        job_id: int | None = None,
369    ) -> SyncResult | None:
370        """Get the sync result for the connection.
371
372        If `job_id` is not provided, the most recent sync job will be used.
373
374        Returns `None` if job_id is omitted and no previous jobs are found.
375        """
376        if job_id is None:
377            # Get the most recent sync job
378            results = self.get_previous_sync_logs(
379                limit=1,
380            )
381            if results:
382                return results[0]
383
384            return None
385
386        # Get the sync job by ID (lazy loaded)
387        return SyncResult(
388            workspace=self.workspace,
389            connection=self,
390            job_id=job_id,
391        )
392
393    # Artifacts
394
395    @deprecated("Use 'dump_raw_state()' instead.")
396    def get_state_artifacts(self) -> list[dict[str, Any]] | None:
397        """Deprecated. Use `dump_raw_state()` instead."""
398        state_response = api_util.get_connection_state(
399            connection_id=self.connection_id,
400            api_root=self.workspace.api_root,
401            client_id=self.workspace.client_id,
402            client_secret=self.workspace.client_secret,
403            bearer_token=self.workspace.bearer_token,
404            config_api_root=self.workspace.config_api_root,
405        )
406        if state_response.get("stateType") == "not_set":
407            return None
408        return state_response.get("streamState", [])
409
410    @overload
411    def dump_raw_state(self, *, normalize: Literal[True] = True) -> list[dict[str, Any]]: ...
412
413    @overload
414    def dump_raw_state(self, *, normalize: Literal[False]) -> dict[str, Any]: ...
415
416    def dump_raw_state(
417        self,
418        *,
419        normalize: bool = True,
420    ) -> dict[str, Any] | list[dict[str, Any]]:
421        """Dump the state for this connection.
422
423        By default, returns a list of Airbyte protocol `AirbyteStateMessage` dicts
424        with snake_case keys, suitable for passing to a connector's `--state` flag.
425
426        When `normalize` is `False`, returns the raw Config API dict (camelCase keys,
427        includes `stateType` and `connectionId`). This raw format can be passed
428        directly to `import_raw_state()` for backup/restore workflows.
429
430        Args:
431            normalize: If `True` (default), convert to Airbyte protocol format.
432                If `False`, return the raw Config API response.
433
434        Returns:
435            Normalized: list of protocol-format state message dicts (empty list if
436            no state). Raw: the full Config API state dict.
437        """
438        raw = api_util.get_connection_state(
439            connection_id=self.connection_id,
440            api_root=self.workspace.api_root,
441            client_id=self.workspace.client_id,
442            client_secret=self.workspace.client_secret,
443            bearer_token=self.workspace.bearer_token,
444            config_api_root=self.workspace.config_api_root,
445        )
446        if normalize:
447            return _normalize_state_to_protocol(raw)
448        return raw
449
450    def import_raw_state(
451        self,
452        connection_state: dict[str, Any] | list[dict[str, Any]],
453    ) -> dict[str, Any]:
454        """Import (restore) the full state for this connection.
455
456        > ⚠️ **WARNING:** Modifying the state directly is not recommended and
457        > could result in broken connections and/or incorrect sync behavior.
458
459        Replaces the entire connection state with the provided state blob.
460        Uses the safe variant that prevents updates while a sync is running (HTTP 423).
461
462        This is the counterpart to `dump_raw_state()` for backup/restore workflows.
463        The `connectionId` in the blob is always overridden with this connection's
464        ID, making state blobs portable across connections.
465
466        Accepts either format:
467
468        - **Config API format** (dict with `stateType`): passed through directly.
469        - **Airbyte protocol format** (list of `AirbyteStateMessage` dicts): automatically
470          converted to Config API format before sending.
471
472        Args:
473            connection_state: Connection state in either Config API or Airbyte protocol format.
474
475        Returns:
476            The updated connection state as a dictionary.
477
478        Raises:
479            AirbyteConnectionSyncActiveError: If a sync is currently running on this
480                connection (HTTP 423). Wait for the sync to complete before retrying.
481        """
482        api_state: dict[str, Any]
483        if isinstance(connection_state, list):
484            if not _is_protocol_state_format(connection_state):
485                msg = (
486                    "Expected connection_state list to contain Airbyte protocol state "
487                    "message dicts (each with a top-level `type` of STREAM, GLOBAL, "
488                    "or LEGACY). Got a list that does not match protocol format."
489                )
490                raise ValueError(msg)
491            api_state = _denormalize_protocol_state_to_api(
492                protocol_messages=connection_state,
493                connection_id=self.connection_id,
494            )
495        elif isinstance(connection_state, dict):
496            if _is_protocol_state_format(connection_state):
497                api_state = _denormalize_protocol_state_to_api(
498                    protocol_messages=[connection_state],
499                    connection_id=self.connection_id,
500                )
501            else:
502                api_state = connection_state
503        else:
504            msg = f"Expected a dict or list, got {type(connection_state)}"
505            raise TypeError(msg)
506
507        return api_util.replace_connection_state(
508            connection_id=self.connection_id,
509            connection_state_dict=api_state,
510            api_root=self.workspace.api_root,
511            client_id=self.workspace.client_id,
512            client_secret=self.workspace.client_secret,
513            bearer_token=self.workspace.bearer_token,
514            config_api_root=self.workspace.config_api_root,
515        )
516
517    def get_stream_state(
518        self,
519        stream_name: str,
520        stream_namespace: str | None = None,
521    ) -> dict[str, Any] | None:
522        """Get the state blob for a single stream within this connection.
523
524        Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}),
525        not the full connection state envelope.
526
527        This is compatible with `stream`-type state and stream-level entries
528        within a `global`-type state. It is not compatible with `legacy` state.
529        To get or set the entire connection-level state artifact, use
530        `dump_raw_state` and `import_raw_state` instead.
531
532        Args:
533            stream_name: The name of the stream to get state for.
534            stream_namespace: The source-side stream namespace. This refers to the
535                namespace from the source (e.g., database schema), not any destination
536                namespace override set in connection advanced settings.
537
538        Returns:
539            The stream's state blob as a dictionary, or None if the stream is not found.
540        """
541        state_data = self.dump_raw_state(normalize=False)
542        result = ConnectionStateResponse(**state_data)
543
544        streams = _get_stream_list(result)
545        matching = [s for s in streams if _match_stream(s, stream_name, stream_namespace)]
546
547        if not matching:
548            available = [s.stream_descriptor.name for s in streams]
549            logger.warning(
550                "Stream '%s' not found in connection state for connection '%s'. "
551                "Available streams: %s",
552                stream_name,
553                self.connection_id,
554                available,
555            )
556            return None
557
558        return matching[0].stream_state
559
560    def set_stream_state(
561        self,
562        stream_name: str,
563        state_blob_dict: dict[str, Any],
564        stream_namespace: str | None = None,
565    ) -> None:
566        """Set the state for a single stream within this connection.
567
568        Fetches the current full state, replaces only the specified stream's state,
569        then sends the full updated state back to the API. If the stream does not
570        exist in the current state, it is appended.
571
572        This is compatible with `stream`-type state and stream-level entries
573        within a `global`-type state. It is not compatible with `legacy` state.
574        To get or set the entire connection-level state artifact, use
575        `dump_raw_state` and `import_raw_state` instead.
576
577        Uses the safe variant that prevents updates while a sync is running (HTTP 423).
578
579        Args:
580            stream_name: The name of the stream to update state for.
581            state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}).
582            stream_namespace: The source-side stream namespace. This refers to the
583                namespace from the source (e.g., database schema), not any destination
584                namespace override set in connection advanced settings.
585
586        Raises:
587            PyAirbyteInputError: If the connection state type is not supported for
588                stream-level operations (not_set, legacy).
589            AirbyteConnectionSyncActiveError: If a sync is currently running on this
590                connection (HTTP 423). Wait for the sync to complete before retrying.
591        """
592        state_data = self.dump_raw_state(normalize=False)
593        current = ConnectionStateResponse(**state_data)
594
595        if current.state_type == "not_set":
596            raise PyAirbyteInputError(
597                message="Cannot set stream state: connection has no existing state.",
598                context={"connection_id": self.connection_id},
599            )
600
601        if current.state_type == "legacy":
602            raise PyAirbyteInputError(
603                message="Cannot set stream state on a legacy-type connection state.",
604                context={"connection_id": self.connection_id},
605            )
606
607        new_stream_entry = {
608            "streamDescriptor": {
609                "name": stream_name,
610                **(
611                    {
612                        "namespace": stream_namespace,
613                    }
614                    if stream_namespace
615                    else {}
616                ),
617            },
618            "streamState": state_blob_dict,
619        }
620
621        raw_streams: list[dict[str, Any]]
622        if current.state_type == "stream":
623            raw_streams = state_data.get("streamState", [])
624        elif current.state_type == "global":
625            raw_streams = state_data.get("globalState", {}).get("streamStates", [])
626        else:
627            raw_streams = []
628
629        streams = _get_stream_list(current)
630        found = False
631        updated_streams_raw: list[dict[str, Any]] = []
632        for raw_s, parsed_s in zip(raw_streams, streams, strict=False):
633            if _match_stream(parsed_s, stream_name, stream_namespace):
634                updated_streams_raw.append(new_stream_entry)
635                found = True
636            else:
637                updated_streams_raw.append(raw_s)
638
639        if not found:
640            updated_streams_raw.append(new_stream_entry)
641
642        full_state: dict[str, Any] = {
643            **state_data,
644        }
645
646        if current.state_type == "stream":
647            full_state["streamState"] = updated_streams_raw
648        elif current.state_type == "global":
649            original_global = state_data.get("globalState", {})
650            full_state["globalState"] = {
651                **original_global,
652                "streamStates": updated_streams_raw,
653            }
654
655        self.import_raw_state(full_state)
656
657    @deprecated("Use 'dump_raw_catalog()' instead.")
658    def get_catalog_artifact(self) -> dict[str, Any] | None:
659        """Get the configured catalog for this connection. Deprecated; use `dump_raw_catalog()`.
660
661        Returns the full configured catalog (syncCatalog) for this connection,
662        including stream schemas, sync modes, cursor fields, and primary keys.
663
664        Uses the Config API endpoint: POST /v1/web_backend/connections/get
665
666        Returns:
667            Dictionary containing the configured catalog, or `None` if not found.
668        """
669        return self.dump_raw_catalog(normalize=False)
670
671    def dump_raw_catalog(
672        self,
673        *,
674        normalize: bool = True,
675    ) -> dict[str, Any] | None:
676        """Dump the configured catalog for this connection.
677
678        By default, returns the catalog in Airbyte protocol format
679        (`ConfiguredAirbyteCatalog` with snake_case keys), suitable for passing
680        to a connector's `--catalog` flag.
681
682        When `normalize` is `False`, returns the raw `syncCatalog` dict from the
683        Config API (camelCase keys, nested `config` block). This raw format can be
684        passed directly to `import_raw_catalog()` for backup/restore workflows.
685
686        Args:
687            normalize: If `True` (default), convert to Airbyte protocol format.
688                If `False`, return the raw Config API catalog.
689
690        Returns:
691            The configured catalog dict, or `None` if not found.
692        """
693        connection_response = api_util.get_connection_catalog(
694            connection_id=self.connection_id,
695            api_root=self.workspace.api_root,
696            client_id=self.workspace.client_id,
697            client_secret=self.workspace.client_secret,
698            bearer_token=self.workspace.bearer_token,
699            config_api_root=self.workspace.config_api_root,
700        )
701        raw = connection_response.get("syncCatalog")
702        if raw is None:
703            return None
704        if normalize:
705            return _normalize_catalog_to_protocol(raw)
706        return raw
707
708    def import_raw_catalog(self, catalog: dict[str, Any]) -> None:
709        """Replace the configured catalog for this connection.
710
711        > ⚠️ **WARNING:** Modifying the catalog directly is not recommended and
712        > could result in broken connections and/or incorrect sync behavior.
713
714        Accepts a configured catalog dict and replaces the connection's entire
715        catalog with it. All other connection settings remain unchanged.
716
717        Accepts either format:
718
719        - **Config API format** (`syncCatalog` with camelCase keys and nested `config`):
720          passed through directly.
721        - **Airbyte protocol format** (`ConfiguredAirbyteCatalog` with snake_case keys):
722          automatically converted to Config API format before sending.
723
724        Args:
725            catalog: The configured catalog dict in either format.
726        """
727        if _is_protocol_catalog_format(catalog):
728            catalog = _denormalize_catalog_to_api(catalog)
729
730        api_util.replace_connection_catalog(
731            connection_id=self.connection_id,
732            configured_catalog_dict=catalog,
733            api_root=self.workspace.api_root,
734            client_id=self.workspace.client_id,
735            client_secret=self.workspace.client_secret,
736            bearer_token=self.workspace.bearer_token,
737            config_api_root=self.workspace.config_api_root,
738        )
739
740    def rename(self, name: str) -> CloudConnection:
741        """Rename the connection.
742
743        Args:
744            name: New name for the connection
745
746        Returns:
747            Updated CloudConnection object with refreshed info
748        """
749        updated_response = api_util.patch_connection(
750            connection_id=self.connection_id,
751            api_root=self.workspace.api_root,
752            client_id=self.workspace.client_id,
753            client_secret=self.workspace.client_secret,
754            bearer_token=self.workspace.bearer_token,
755            name=name,
756        )
757        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
758        return self
759
760    def set_table_prefix(self, prefix: str) -> CloudConnection:
761        """Set the table prefix for the connection.
762
763        Args:
764            prefix: New table prefix to use when syncing to the destination
765
766        Returns:
767            Updated CloudConnection object with refreshed info
768        """
769        updated_response = api_util.patch_connection(
770            connection_id=self.connection_id,
771            api_root=self.workspace.api_root,
772            client_id=self.workspace.client_id,
773            client_secret=self.workspace.client_secret,
774            bearer_token=self.workspace.bearer_token,
775            prefix=prefix,
776        )
777        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
778        return self
779
780    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
781        """Set the selected streams for the connection.
782
783        This is a destructive operation that can break existing connections if the
784        stream selection is changed incorrectly. Use with caution.
785
786        Args:
787            stream_names: List of stream names to sync
788
789        Returns:
790            Updated CloudConnection object with refreshed info
791        """
792        configurations = api_util.build_stream_configurations(stream_names)
793
794        updated_response = api_util.patch_connection(
795            connection_id=self.connection_id,
796            api_root=self.workspace.api_root,
797            client_id=self.workspace.client_id,
798            client_secret=self.workspace.client_secret,
799            bearer_token=self.workspace.bearer_token,
800            configurations=configurations,
801        )
802        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
803        return self
804
805    # Enable/Disable
806
807    @property
808    def enabled(self) -> bool:
809        """Get the current enabled status of the connection.
810
811        This property always fetches fresh data from the API to ensure accuracy,
812        as another process or user may have toggled the setting.
813
814        Returns:
815            True if the connection status is 'active', False otherwise.
816        """
817        connection_info = self._fetch_connection_info(force_refresh=True)
818        return connection_info.status == "active"
819
820    @enabled.setter
821    def enabled(self, value: bool) -> None:
822        """Set the enabled status of the connection.
823
824        Args:
825            value: True to enable (set status to 'active'), False to disable
826                (set status to 'inactive').
827        """
828        self.set_enabled(enabled=value)
829
830    def set_enabled(
831        self,
832        *,
833        enabled: bool,
834        ignore_noop: bool = True,
835    ) -> None:
836        """Set the enabled status of the connection.
837
838        Args:
839            enabled: True to enable (set status to 'active'), False to disable
840                (set status to 'inactive').
841            ignore_noop: If True (default), silently return if the connection is already
842                in the requested state. If False, raise ValueError when the requested
843                state matches the current state.
844
845        Raises:
846            ValueError: If ignore_noop is False and the connection is already in the
847                requested state.
848        """
849        # Always fetch fresh data to check current status
850        connection_info = self._fetch_connection_info(force_refresh=True)
851        current_status = connection_info.status
852        desired_status = "active" if enabled else "inactive"
853
854        if current_status == desired_status:
855            if ignore_noop:
856                return
857            raise ValueError(
858                f"Connection is already {'enabled' if enabled else 'disabled'}. "
859                f"Current status: {current_status}"
860            )
861
862        updated_response = api_util.patch_connection(
863            connection_id=self.connection_id,
864            api_root=self.workspace.api_root,
865            client_id=self.workspace.client_id,
866            client_secret=self.workspace.client_secret,
867            bearer_token=self.workspace.bearer_token,
868            status=desired_status,
869        )
870        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
871
872    # Scheduling
873
874    def set_schedule(
875        self,
876        cron_expression: str,
877    ) -> None:
878        """Set a cron schedule for the connection.
879
880        Args:
881            cron_expression: A cron expression defining when syncs should run.
882
883        Examples:
884            - "0 0 * * *" - Daily at midnight UTC
885            - "0 */6 * * *" - Every 6 hours
886            - "0 0 * * 0" - Weekly on Sunday at midnight UTC
887        """
888        updated_response = api_util.patch_connection(
889            connection_id=self.connection_id,
890            api_root=self.workspace.api_root,
891            client_id=self.workspace.client_id,
892            client_secret=self.workspace.client_secret,
893            bearer_token=self.workspace.bearer_token,
894            schedule=api_util.build_connection_schedule(
895                schedule_type="cron",
896                cron_expression=cron_expression,
897            ),
898        )
899        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
900
901    def set_manual_schedule(self) -> None:
902        """Set the connection to manual scheduling.
903
904        Disables automatic syncs. Syncs will only run when manually triggered.
905        """
906        updated_response = api_util.patch_connection(
907            connection_id=self.connection_id,
908            api_root=self.workspace.api_root,
909            client_id=self.workspace.client_id,
910            client_secret=self.workspace.client_secret,
911            bearer_token=self.workspace.bearer_token,
912            schedule=api_util.build_connection_schedule(schedule_type="manual"),
913        )
914        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
915
916    # Deletions
917
918    def permanently_delete(
919        self,
920        *,
921        cascade_delete_source: bool = False,
922        cascade_delete_destination: bool = False,
923    ) -> None:
924        """Permanently delete the connection.
925
926        Args:
927            cascade_delete_source: Whether to also delete the source.
928            cascade_delete_destination: Whether to also delete the destination.
929        """
930        source_id = self.source_id if cascade_delete_source else None  # resolve before deleting
931        destination_id = self.destination_id if cascade_delete_destination else None
932        self.workspace.permanently_delete_connection(self)
933        if source_id:
934            self.workspace.permanently_delete_source(source_id)
935        if destination_id:
936            self.workspace.permanently_delete_destination(destination_id)

A connection is an extract-load (EL) pairing of a source and destination in Airbyte Cloud.

You can use a connection object to run sync jobs, retrieve logs, and manage the connection.
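Managing a connection's state involves the two formats described in the `dump_raw_state()` docstring: the Config API dict (camelCase keys, `stateType`) and a list of Airbyte protocol `AirbyteStateMessage` dicts (snake_case keys). The sketch below converts the former into the latter for `stream` and `global` state. It is a hypothetical re-implementation written for illustration, not PyAirbyte's internal `_normalize_state_to_protocol()`; the protocol field names follow the Airbyte protocol's state message schema, and `legacy` state is deliberately left unhandled.

```python
from typing import Any


def _entry_to_protocol(entry: dict[str, Any]) -> dict[str, Any]:
    """Convert one camelCase Config API stream entry to snake_case protocol form."""
    return {
        "stream_descriptor": dict(entry.get("streamDescriptor") or {}),
        "stream_state": entry.get("streamState"),
    }


def config_state_to_protocol(api_state: dict[str, Any]) -> list[dict[str, Any]]:
    """Convert a Config API connection state dict to protocol state messages.

    Hypothetical helper: handles `stream`, `global`, and `not_set` only.
    """
    state_type = api_state.get("stateType")
    if state_type == "not_set":
        return []  # no state yet: an empty list of messages
    if state_type == "stream":
        # One STREAM message per stream entry.
        return [
            {"type": "STREAM", "stream": _entry_to_protocol(entry)}
            for entry in api_state.get("streamState") or []
        ]
    if state_type == "global":
        # A single GLOBAL message carrying shared state plus per-stream states.
        global_state = api_state.get("globalState") or {}
        return [
            {
                "type": "GLOBAL",
                "global": {
                    "shared_state": global_state.get("sharedState"),
                    "stream_states": [
                        _entry_to_protocol(e) for e in global_state.get("streamStates") or []
                    ],
                },
            }
        ]
    raise ValueError(f"Unsupported stateType in this sketch: {state_type!r}")
```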

CloudConnection(workspace: CloudWorkspace, connection_id: str, source: str | None = None, destination: str | None = None)
50    def __init__(
51        self,
52        workspace: CloudWorkspace,
53        connection_id: str,
54        source: str | None = None,
55        destination: str | None = None,
56    ) -> None:
57        """It is not recommended to create a `CloudConnection` object directly.
58
59        Instead, use `CloudWorkspace.get_connection()` to create a connection object.
60        """
61        self.connection_id = connection_id
62        """The ID of the connection."""
63
64        self.workspace = workspace
65        """The workspace that the connection belongs to."""
66
67        self._source_id = source
68        """The ID of the source."""
69
70        self._destination_id = destination
71        """The ID of the destination."""
72
73        self._connection_info: CloudConnectionInfo | None = None
74        """The connection info object. (Cached.)"""
75
76        self._cloud_source_object: CloudSource | None = None
77        """The source object. (Cached.)"""
78
79        self._cloud_destination_object: CloudDestination | None = None
80        """The destination object. (Cached.)"""

It is not recommended to create a CloudConnection object directly.

Instead, use CloudWorkspace.get_connection() to create a connection object.

connection_id

The ID of the connection.

workspace

The workspace that the connection belongs to.

def check_is_valid(self) -> bool:

Check if this connection exists and belongs to the expected workspace.

This method fetches connection info from the API (if not already cached) and verifies that the connection's workspace_id matches the workspace associated with this CloudConnection object.

Returns:

True if the connection exists and belongs to the expected workspace.

Raises:
  • AirbyteWorkspaceMismatchError: If the connection belongs to a different workspace.
  • AirbyteMissingResourceError: If the connection doesn't exist.

name: str | None
    @property
    def name(self) -> str | None:
        """Get the display name of the connection, if available.

        E.g. "My Postgres to Snowflake", not the connection ID.
        """
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.name

Get the display name of the connection, if available.

E.g. "My Postgres to Snowflake", not the connection ID.

source_id: str
    @property
    def source_id(self) -> str:
        """The ID of the source."""
        if not self._source_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._source_id = self._connection_info.source_id

        return self._source_id

The ID of the source.

source: airbyte.cloud.connectors.CloudSource
    @property
    def source(self) -> CloudSource:
        """Get the source object."""
        if self._cloud_source_object:
            return self._cloud_source_object

        self._cloud_source_object = CloudSource(
            workspace=self.workspace,
            connector_id=self.source_id,
        )
        return self._cloud_source_object

The CloudSource object for this connection's source. It is created on first access and then cached on this object.

destination_id: str
    @property
    def destination_id(self) -> str:
        """The ID of the destination."""
        if not self._destination_id:
            if not self._connection_info:
                self._connection_info = self._fetch_connection_info()

            self._destination_id = self._connection_info.destination_id

        return self._destination_id

The ID of the destination.
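The source_id and destination_id properties above only call the API when the ID was not passed in at construction time, and the fetched connection info is then cached on the object. The class below is a hypothetical stand-in (not part of PyAirbyte) that sketches the same fetch-once pattern, with a plain callable standing in for the API request so it runs without credentials.

```python
from __future__ import annotations

from collections.abc import Callable
from typing import Any


class LazyConnectionIds:
    """Hypothetical stand-in for CloudConnection's lazy ID properties."""

    def __init__(
        self,
        fetch_info: Callable[[], dict[str, Any]],
        source_id: str | None = None,
        destination_id: str | None = None,
    ) -> None:
        self._fetch_info = fetch_info  # Stands in for the API request.
        self._info: dict[str, Any] | None = None  # Cached connection info.
        self._source_id = source_id
        self._destination_id = destination_id

    def _connection_info(self) -> dict[str, Any]:
        if self._info is None:
            self._info = self._fetch_info()  # Fetched at most once.
        return self._info

    @property
    def source_id(self) -> str:
        if not self._source_id:
            self._source_id = self._connection_info()["source_id"]
        return self._source_id

    @property
    def destination_id(self) -> str:
        if not self._destination_id:
            self._destination_id = self._connection_info()["destination_id"]
        return self._destination_id
```

One consequence of this design: IDs supplied up front cost no API call to read, but cached values are not refreshed automatically if the connection is changed elsewhere.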

destination: airbyte.cloud.connectors.CloudDestination
    @property
    def destination(self) -> CloudDestination:
        """Get the destination object."""
        if self._cloud_destination_object:
            return self._cloud_destination_object

        self._cloud_destination_object = CloudDestination(
            workspace=self.workspace,
            connector_id=self.destination_id,
        )
        return self._cloud_destination_object

The CloudDestination object for this connection's destination. It is created on first access and then cached on this object.

stream_names: list[str]
    @property
    def stream_names(self) -> list[str]:
        """The stream names."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return [stream.name for stream in self._connection_info.configurations.streams or []]

The names of the streams configured on this connection.

table_prefix: str
    @property
    def table_prefix(self) -> str:
        """The table prefix."""
        if not self._connection_info:
            self._connection_info = self._fetch_connection_info()

        return self._connection_info.prefix or ""

The table prefix configured for this connection, or an empty string if none is set.

connection_url: str | None
    @property
    def connection_url(self) -> str | None:
        """The web URL to the connection."""
        return f"{self.workspace.workspace_url}/connections/{self.connection_id}"

The web URL to the connection.

job_history_url: str | None
    @property
    def job_history_url(self) -> str | None:
        """The URL to the job history for the connection."""
        return f"{self.connection_url}/timeline"

The URL to the job history for the connection.
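Both URL properties are built by plain string formatting from the workspace URL, with no API call. A minimal sketch of the same composition as a free function follows. The function name is hypothetical, and the workspace URL in the example is a made-up placeholder.

```python
def connection_urls(workspace_url: str, connection_id: str) -> tuple[str, str]:
    """Return (connection_url, job_history_url) using the same f-strings as the properties."""
    connection_url = f"{workspace_url}/connections/{connection_id}"
    job_history_url = f"{connection_url}/timeline"
    return connection_url, job_history_url


# Hypothetical workspace URL, for illustration only.
print(connection_urls("https://airbyte.example.com/workspaces/ws-123", "conn-456"))
```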

def run_sync(self, *, wait: bool = True, wait_timeout: int = 300) -> SyncResult:
    def run_sync(
        self,
        *,
        wait: bool = True,
        wait_timeout: int = 300,
    ) -> SyncResult:
        """Run a sync."""
        connection_response = api_util.run_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        sync_result = SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=connection_response.job_id,
        )

        if wait:
            sync_result.wait_for_completion(
                wait_timeout=wait_timeout,
                raise_failure=True,
                raise_timeout=True,
            )

        return sync_result

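Run a sync job on this connection and return its SyncResult. By default (wait=True), the call blocks until the job completes and raises an error if the job fails or does not finish within wait_timeout seconds. With wait=False, it returns as soon as the job has been started.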

def get_previous_sync_logs(self, *, limit: int = 20, offset: int | None = None, from_tail: bool = True, job_type: str | JobTypeEnum | None = None) -> list[SyncResult]:
    def get_previous_sync_logs(
        self,
        *,
        limit: int = 20,
        offset: int | None = None,
        from_tail: bool = True,
        job_type: str | JobTypeEnum | None = None,
    ) -> list[SyncResult]:
        """Get previous sync jobs for a connection with pagination support.

        Returns SyncResult objects containing job metadata (job_id, status, bytes_synced,
        rows_synced, start_time). Full log text can be fetched lazily via
        `SyncResult.get_full_log_text()`.

        Args:
            limit: Maximum number of jobs to return. Defaults to 20.
            offset: Number of jobs to skip from the beginning. Defaults to None (0).
            from_tail: If True, returns jobs ordered newest-first (createdAt DESC).
                If False, returns jobs ordered oldest-first (createdAt ASC).
                Defaults to True.
            job_type: Filter by job type (e.g., `sync`, `refresh`).
                If not specified, defaults to sync and reset jobs only (API default behavior).

        Returns:
            A list of SyncResult objects representing the sync jobs.
        """
        order_by = (
            api_util.JOB_ORDER_BY_CREATED_AT_DESC
            if from_tail
            else api_util.JOB_ORDER_BY_CREATED_AT_ASC
        )
        sync_logs = api_util.get_job_logs(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            workspace_id=self.workspace.workspace_id,
            limit=limit,
            offset=offset,
            order_by=order_by,
            job_type=job_type,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        return [
            SyncResult(
                workspace=self.workspace,
                connection=self,
                job_id=sync_log.job_id,
                _latest_job_info=CloudJobInfo.from_api_response(sync_log),
            )
            for sync_log in sync_logs
        ]

Get previous sync jobs for a connection with pagination support.

Returns SyncResult objects containing job metadata (job_id, status, bytes_synced, rows_synced, start_time). Full log text can be fetched lazily via SyncResult.get_full_log_text().

Arguments:
  • limit: Maximum number of jobs to return. Defaults to 20.
  • offset: Number of jobs to skip from the beginning. Defaults to None (0).
  • from_tail: If True, returns jobs ordered newest-first (createdAt DESC). If False, returns jobs ordered oldest-first (createdAt ASC). Defaults to True.
  • job_type: Filter by job type (e.g., sync, refresh). If not specified, defaults to sync and reset jobs only (API default behavior).

Returns:

A list of SyncResult objects representing the sync jobs.
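Because get_previous_sync_logs() returns at most limit jobs per call, collecting a connection's full job history means paging with offset. The generator below is a minimal sketch of that loop. It assumes that a full page may mean more jobs follow and that a short page means the history is exhausted. fetch_page is a hypothetical callable you supply, taking (limit, offset) and returning a list, so the paging logic can be tested without an Airbyte workspace.

```python
from __future__ import annotations

from collections.abc import Callable, Iterator
from typing import TypeVar

T = TypeVar("T")


def iter_all_pages(
    fetch_page: Callable[[int, int], list[T]],
    page_size: int = 20,
) -> Iterator[T]:
    """Yield every item, requesting `page_size` items at a time via (limit, offset)."""
    if page_size <= 0:
        raise ValueError("page_size must be positive")
    offset = 0
    while True:
        page = fetch_page(page_size, offset)
        yield from page
        if len(page) < page_size:
            return  # A short or empty page means there is nothing left to fetch.
        offset += page_size
```

With a real connection, fetch_page could be lambda limit, offset: connection.get_previous_sync_logs(limit=limit, offset=offset, from_tail=False). Oldest-first ordering is likely the safer choice for this loop. With newest-first ordering, a job that starts mid-iteration shifts every later offset by one, which can return the same job twice.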

def get_sync_result(self, job_id: int | None = None) -> SyncResult | None:
    def get_sync_result(
        self,
        job_id: int | None = None,
    ) -> SyncResult | None:
        """Get the sync result for the connection.

        If `job_id` is not provided, the most recent sync job will be used.

        Returns `None` if job_id is omitted and no previous jobs are found.
        """
        if job_id is None:
            # Get the most recent sync job
            results = self.get_previous_sync_logs(
                limit=1,
            )
            if results:
                return results[0]

            return None

        # Get the sync job by ID (lazy loaded)
        return SyncResult(
            workspace=self.workspace,
            connection=self,
            job_id=job_id,
        )

Get the sync result for the connection.

If job_id is not provided, the most recent sync job will be used.

Returns None if job_id is omitted and no previous jobs are found.

@deprecated("Use 'dump_raw_state()' instead.")
def get_state_artifacts(self) -> list[dict[str, typing.Any]] | None:
    @deprecated("Use 'dump_raw_state()' instead.")
    def get_state_artifacts(self) -> list[dict[str, Any]] | None:
        """Deprecated. Use `dump_raw_state()` instead."""
        state_response = api_util.get_connection_state(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            config_api_root=self.workspace.config_api_root,
        )
        if state_response.get("stateType") == "not_set":
            return None
        return state_response.get("streamState", [])

Deprecated. Use dump_raw_state() instead.

def dump_raw_state(self, *, normalize: bool = True) -> dict[str, typing.Any] | list[dict[str, typing.Any]]:
    def dump_raw_state(
        self,
        *,
        normalize: bool = True,
    ) -> dict[str, Any] | list[dict[str, Any]]:
        """Dump the state for this connection.

        By default, returns a list of Airbyte protocol `AirbyteStateMessage` dicts
        with snake_case keys, suitable for passing to a connector's `--state` flag.

        When `normalize` is `False`, returns the raw Config API dict (camelCase keys,
        includes `stateType` and `connectionId`). This raw format can be passed
        directly to `import_raw_state()` for backup/restore workflows.

        Args:
            normalize: If `True` (default), convert to Airbyte protocol format.
                If `False`, return the raw Config API response.

        Returns:
            Normalized: list of protocol-format state message dicts (empty list if
            no state). Raw: the full Config API state dict.
        """
        raw = api_util.get_connection_state(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            config_api_root=self.workspace.config_api_root,
        )
        if normalize:
            return _normalize_state_to_protocol(raw)
        return raw

Dump the state for this connection.

By default, returns a list of Airbyte protocol AirbyteStateMessage dicts with snake_case keys, suitable for passing to a connector's --state flag.

When normalize is False, returns the raw Config API dict (camelCase keys, includes stateType and connectionId). This raw format can be passed directly to import_raw_state() for backup/restore workflows.

Arguments:
  • normalize: If True (default), convert to Airbyte protocol format. If False, return the raw Config API response.

Returns:

Normalized: list of protocol-format state message dicts (empty list if no state). Raw: the full Config API state dict.
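To make the two shapes concrete, here is a simplified sketch of converting a raw (normalize=False) Config API state dict into protocol-style state messages. This is not PyAirbyte's internal converter. It assumes the camelCase layout used by set_stream_state(): streamState entries for stream-type state, and globalState.streamStates for global-type state. It also assumes a sharedState key on global state. The sketch maps these onto the Airbyte protocol's STREAM and GLOBAL message shapes and does not handle legacy state.

```python
from __future__ import annotations

from typing import Any


def _descriptor_to_protocol(descriptor: dict[str, Any]) -> dict[str, Any]:
    """Copy a camelCase streamDescriptor into a protocol stream_descriptor."""
    out: dict[str, Any] = {"name": descriptor["name"]}
    if descriptor.get("namespace") is not None:
        out["namespace"] = descriptor["namespace"]
    return out


def _stream_entry_to_protocol(entry: dict[str, Any]) -> dict[str, Any]:
    """Map one {streamDescriptor, streamState} entry to protocol keys."""
    return {
        "stream_descriptor": _descriptor_to_protocol(entry["streamDescriptor"]),
        "stream_state": entry.get("streamState"),
    }


def config_api_state_to_protocol(raw: dict[str, Any]) -> list[dict[str, Any]]:
    """Sketch: convert a raw Config API state dict into protocol state messages."""
    state_type = raw.get("stateType")
    if state_type in (None, "not_set"):
        return []  # No state saved yet: an empty list, as with normalize=True.
    if state_type == "stream":
        # One STREAM message per stream entry.
        return [
            {"type": "STREAM", "stream": _stream_entry_to_protocol(entry)}
            for entry in raw.get("streamState") or []
        ]
    if state_type == "global":
        # A single GLOBAL message carrying shared state plus per-stream states.
        global_state = raw.get("globalState") or {}
        return [
            {
                "type": "GLOBAL",
                "global": {
                    "shared_state": global_state.get("sharedState"),
                    "stream_states": [
                        _stream_entry_to_protocol(entry)
                        for entry in global_state.get("streamStates") or []
                    ],
                },
            }
        ]
    raise NotImplementedError(f"This sketch does not handle {state_type!r} state.")
```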

def import_raw_state(self, connection_state: dict[str, typing.Any] | list[dict[str, typing.Any]]) -> dict[str, typing.Any]:
    def import_raw_state(
        self,
        connection_state: dict[str, Any] | list[dict[str, Any]],
    ) -> dict[str, Any]:
        """Import (restore) the full state for this connection.

        > ⚠️ **WARNING:** Modifying the state directly is not recommended and
        > could result in broken connections, and/or incorrect sync behavior.

        Replaces the entire connection state with the provided state blob.
        Uses the safe variant that prevents updates while a sync is running (HTTP 423).

        This is the counterpart to `dump_raw_state()` for backup/restore workflows.
        The `connectionId` in the blob is always overridden with this connection's
        ID, making state blobs portable across connections.

        Accepts either format:

        - **Config API format** (dict with `stateType`): passed through directly.
        - **Airbyte protocol format** (list of `AirbyteStateMessage` dicts): automatically
          converted to Config API format before sending.

        Args:
            connection_state: Connection state in either Config API or Airbyte protocol format.

        Returns:
            The updated connection state as a dictionary.

        Raises:
            AirbyteConnectionSyncActiveError: If a sync is currently running on this
                connection (HTTP 423). Wait for the sync to complete before retrying.
        """
        api_state: dict[str, Any]
        if isinstance(connection_state, list):
            if not _is_protocol_state_format(connection_state):
                msg = (
                    "Expected connection_state list to contain Airbyte protocol state "
                    "message dicts (each with a top-level `type` of STREAM, GLOBAL, "
                    "or LEGACY). Got a list that does not match protocol format."
                )
                raise ValueError(msg)
            api_state = _denormalize_protocol_state_to_api(
                protocol_messages=connection_state,
                connection_id=self.connection_id,
            )
        elif isinstance(connection_state, dict):
            if _is_protocol_state_format(connection_state):
                api_state = _denormalize_protocol_state_to_api(
                    protocol_messages=[connection_state],
                    connection_id=self.connection_id,
                )
            else:
                api_state = connection_state
        else:
            msg = f"Expected a dict or list, got {type(connection_state)}"
            raise TypeError(msg)

        return api_util.replace_connection_state(
            connection_id=self.connection_id,
            connection_state_dict=api_state,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            config_api_root=self.workspace.config_api_root,
        )

Import (restore) the full state for this connection.

⚠️ WARNING: Modifying the state directly is not recommended and could result in broken connections or incorrect sync behavior.

Replaces the entire connection state with the provided state blob. Uses the API's safe update variant, which rejects the update with HTTP 423 while a sync is running on the connection.

This is the counterpart to dump_raw_state() for backup/restore workflows. The connectionId in the blob is always overridden with this connection's ID, making state blobs portable across connections.

Accepts either format:

  • Config API format (dict with stateType): passed through directly.
  • Airbyte protocol format (a list of AirbyteStateMessage dicts, or a single such dict): automatically converted to Config API format before sending.

Arguments:
  • connection_state: Connection state in either Config API or Airbyte protocol format.

Returns:

The updated connection state as a dictionary.

Raises:
  • AirbyteConnectionSyncActiveError: If a sync is currently running on this connection (HTTP 423). Wait for the sync to complete before retrying.
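The format check and conversion described above can be sketched as follows. This is an illustrative reimplementation, not PyAirbyte's code. The hypothetical looks_like_protocol_state helper follows the rule stated in the method's error message: each entry has a top-level type of STREAM, GLOBAL, or LEGACY. The converter handles only STREAM messages. As in import_raw_state(), the connectionId is always set to the target connection.

```python
from __future__ import annotations

from typing import Any

PROTOCOL_STATE_TYPES = {"STREAM", "GLOBAL", "LEGACY"}


def looks_like_protocol_state(messages: list[dict[str, Any]]) -> bool:
    """True if every entry is a dict with a protocol-style top-level `type`."""
    return all(isinstance(m, dict) and m.get("type") in PROTOCOL_STATE_TYPES for m in messages)


def protocol_stream_state_to_config_api(
    messages: list[dict[str, Any]], connection_id: str
) -> dict[str, Any]:
    """Sketch: build a stream-type Config API state dict from STREAM messages."""
    if not looks_like_protocol_state(messages):
        raise ValueError("Expected a list of Airbyte protocol state messages.")
    entries = []
    for message in messages:
        if message["type"] != "STREAM":
            raise NotImplementedError("This sketch only converts STREAM messages.")
        stream = message["stream"]
        entries.append(
            {
                # The name/namespace keys are identical in both formats.
                "streamDescriptor": dict(stream["stream_descriptor"]),
                "streamState": stream.get("stream_state"),
            }
        )
    return {
        "stateType": "stream",
        "connectionId": connection_id,  # Always the target connection's ID.
        "streamState": entries,
    }
```

In a real backup/restore workflow you would more likely skip conversion and pass the raw blob straight through. For example, with two CloudConnection objects named old_conn and new_conn: new_conn.import_raw_state(old_conn.dump_raw_state(normalize=False)).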
def get_stream_state(self, stream_name: str, stream_namespace: str | None = None) -> dict[str, typing.Any] | None:
    def get_stream_state(
        self,
        stream_name: str,
        stream_namespace: str | None = None,
    ) -> dict[str, Any] | None:
        """Get the state blob for a single stream within this connection.

        Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}),
        not the full connection state envelope.

        This is compatible with `stream`-type state and stream-level entries
        within a `global`-type state. It is not compatible with `legacy` state.
        To get or set the entire connection-level state artifact, use
        `dump_raw_state` and `import_raw_state` instead.

        Args:
            stream_name: The name of the stream to get state for.
            stream_namespace: The source-side stream namespace. This refers to the
                namespace from the source (e.g., database schema), not any destination
                namespace override set in connection advanced settings.

        Returns:
            The stream's state blob as a dictionary, or None if the stream is not found.
        """
        state_data = self.dump_raw_state(normalize=False)
        result = ConnectionStateResponse(**state_data)

        streams = _get_stream_list(result)
        matching = [s for s in streams if _match_stream(s, stream_name, stream_namespace)]

        if not matching:
            available = [s.stream_descriptor.name for s in streams]
            logger.warning(
                "Stream '%s' not found in connection state for connection '%s'. "
                "Available streams: %s",
                stream_name,
                self.connection_id,
                available,
            )
            return None

        return matching[0].stream_state

Get the state blob for a single stream within this connection.

Returns just the stream's state dictionary (e.g., {"cursor": "2024-01-01"}), not the full connection state envelope.

This is compatible with stream-type state and stream-level entries within a global-type state. It is not compatible with legacy state. To get or set the entire connection-level state artifact, use dump_raw_state and import_raw_state instead.

Arguments:
  • stream_name: The name of the stream to get state for.
  • stream_namespace: The source-side stream namespace. This refers to the namespace from the source (e.g., database schema), not any destination namespace override set in the connection's advanced settings.

Returns:

The stream's state blob as a dictionary, or None if the stream is not found.
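The lookup that get_stream_state() performs can be sketched against a raw state dict, as returned by dump_raw_state(normalize=False). This is a simplified, hypothetical helper and not the library's _match_stream logic. In particular, it matches namespaces strictly, so a None namespace only matches entries that have no namespace.

```python
from __future__ import annotations

from typing import Any


def _stream_entries(raw_state: dict[str, Any]) -> list[dict[str, Any]]:
    """Return the per-stream entries of a stream- or global-type raw state."""
    state_type = raw_state.get("stateType")
    if state_type == "stream":
        return raw_state.get("streamState") or []
    if state_type == "global":
        return (raw_state.get("globalState") or {}).get("streamStates") or []
    return []  # not_set and legacy states have no per-stream entries.


def find_stream_state(
    raw_state: dict[str, Any],
    stream_name: str,
    stream_namespace: str | None = None,
) -> dict[str, Any] | None:
    """Return one stream's state blob, or None if no entry matches."""
    for entry in _stream_entries(raw_state):
        descriptor = entry.get("streamDescriptor") or {}
        # Strict match: name and namespace (None included) must both be equal.
        if descriptor.get("name") == stream_name and descriptor.get("namespace") == stream_namespace:
            return entry.get("streamState")
    return None
```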

def set_stream_state(self, stream_name: str, state_blob_dict: dict[str, typing.Any], stream_namespace: str | None = None) -> None:
    def set_stream_state(
        self,
        stream_name: str,
        state_blob_dict: dict[str, Any],
        stream_namespace: str | None = None,
    ) -> None:
        """Set the state for a single stream within this connection.

        Fetches the current full state, replaces only the specified stream's state,
        then sends the full updated state back to the API. If the stream does not
        exist in the current state, it is appended.

        This is compatible with `stream`-type state and stream-level entries
        within a `global`-type state. It is not compatible with `legacy` state.
        To get or set the entire connection-level state artifact, use
        `dump_raw_state` and `import_raw_state` instead.

        Uses the safe variant that prevents updates while a sync is running (HTTP 423).

        Args:
            stream_name: The name of the stream to update state for.
            state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}).
            stream_namespace: The source-side stream namespace. This refers to the
                namespace from the source (e.g., database schema), not any destination
                namespace override set in connection advanced settings.

        Raises:
            PyAirbyteInputError: If the connection state type is not supported for
                stream-level operations (not_set, legacy).
            AirbyteConnectionSyncActiveError: If a sync is currently running on this
                connection (HTTP 423). Wait for the sync to complete before retrying.
        """
        state_data = self.dump_raw_state(normalize=False)
        current = ConnectionStateResponse(**state_data)

        if current.state_type == "not_set":
            raise PyAirbyteInputError(
                message="Cannot set stream state: connection has no existing state.",
                context={"connection_id": self.connection_id},
            )

        if current.state_type == "legacy":
            raise PyAirbyteInputError(
                message="Cannot set stream state on a legacy-type connection state.",
                context={"connection_id": self.connection_id},
            )

        new_stream_entry = {
            "streamDescriptor": {
                "name": stream_name,
                **(
                    {
                        "namespace": stream_namespace,
                    }
                    if stream_namespace
                    else {}
                ),
            },
            "streamState": state_blob_dict,
        }

        raw_streams: list[dict[str, Any]]
        if current.state_type == "stream":
            raw_streams = state_data.get("streamState", [])
        elif current.state_type == "global":
            raw_streams = state_data.get("globalState", {}).get("streamStates", [])
        else:
            raw_streams = []

        streams = _get_stream_list(current)
        found = False
        updated_streams_raw: list[dict[str, Any]] = []
        for raw_s, parsed_s in zip(raw_streams, streams, strict=False):
            if _match_stream(parsed_s, stream_name, stream_namespace):
                updated_streams_raw.append(new_stream_entry)
                found = True
            else:
                updated_streams_raw.append(raw_s)

        if not found:
            updated_streams_raw.append(new_stream_entry)

        full_state: dict[str, Any] = {
            **state_data,
        }

        if current.state_type == "stream":
            full_state["streamState"] = updated_streams_raw
        elif current.state_type == "global":
            original_global = state_data.get("globalState", {})
            full_state["globalState"] = {
                **original_global,
                "streamStates": updated_streams_raw,
            }

        self.import_raw_state(full_state)

Set the state for a single stream within this connection.

Fetches the current full state, replaces only the specified stream's state, then sends the full updated state back to the API. If the stream does not exist in the current state, it is appended.

This is compatible with stream-type state and stream-level entries within a global-type state. It is not compatible with legacy state. To get or set the entire connection-level state artifact, use dump_raw_state and import_raw_state instead.

Uses the API's safe update variant, which rejects the update with HTTP 423 while a sync is running on the connection.

Arguments:
  • stream_name: The name of the stream to update state for.
  • state_blob_dict: The state blob dict for this stream (e.g., {"cursor": "2024-01-01"}).
  • stream_namespace: The source-side stream namespace. This refers to the namespace from the source (e.g., database schema), not any destination namespace override set in the connection's advanced settings.

Raises:
  • PyAirbyteInputError: If the connection has no existing state (not_set) or uses legacy state, neither of which supports stream-level operations.
  • AirbyteConnectionSyncActiveError: If a sync is currently running on this connection (HTTP 423). Wait for the sync to complete before retrying.
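The read-modify-write cycle above comes down to replacing or appending one entry in the raw state dict. The function below is a minimal sketch of that step. It makes the same assumptions as the source: camelCase streamDescriptor/streamState entries, and stream- or global-type state only. Unlike the real method, it returns a new dict instead of calling the API. Its strict namespace matching is an assumption and may differ from PyAirbyte's.

```python
from __future__ import annotations

import copy
from typing import Any


def with_stream_state(
    raw_state: dict[str, Any],
    stream_name: str,
    state_blob: dict[str, Any],
    stream_namespace: str | None = None,
) -> dict[str, Any]:
    """Sketch: copy raw_state, replacing (or appending) one stream's entry."""
    state_type = raw_state.get("stateType")
    if state_type not in ("stream", "global"):
        # Mirrors the source: not_set and legacy states are rejected.
        raise ValueError(f"Unsupported state type for stream-level edits: {state_type!r}")

    descriptor: dict[str, Any] = {"name": stream_name}
    if stream_namespace:
        descriptor["namespace"] = stream_namespace
    new_entry = {"streamDescriptor": descriptor, "streamState": state_blob}

    updated = copy.deepcopy(raw_state)  # Leave the caller's dict untouched.
    if state_type == "stream":
        entries = updated.get("streamState") or []
    else:
        entries = (updated.get("globalState") or {}).get("streamStates") or []

    for index, entry in enumerate(entries):
        existing = entry.get("streamDescriptor") or {}
        # Strict match on name and namespace (an assumption; see above).
        if existing.get("name") == stream_name and existing.get("namespace") == stream_namespace:
            entries[index] = new_entry
            break
    else:
        entries.append(new_entry)  # Stream not present yet: append it.

    if state_type == "stream":
        updated["streamState"] = entries
    else:
        updated["globalState"] = {**(updated.get("globalState") or {}), "streamStates": entries}
    return updated
```

The result could then be passed to import_raw_state(), which is what set_stream_state() does internally after its own version of this step. Reading and then writing the state is not atomic. The HTTP 423 check appears to guard only against a sync that is already running at write time.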
@deprecated("Use 'dump_raw_catalog()' instead.")
def get_catalog_artifact(self) -> dict[str, typing.Any] | None:
    @deprecated("Use 'dump_raw_catalog()' instead.")
    def get_catalog_artifact(self) -> dict[str, Any] | None:
        """Get the configured catalog for this connection.

        Returns the full configured catalog (syncCatalog) for this connection,
        including stream schemas, sync modes, cursor fields, and primary keys.

        Uses the Config API endpoint: POST /v1/web_backend/connections/get

        Returns:
            Dictionary containing the configured catalog, or `None` if not found.
        """
        return self.dump_raw_catalog()

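Deprecated. Use dump_raw_catalog() instead.

Get the configured catalog for this connection, including stream schemas, sync modes, cursor fields, and primary keys.

This method delegates to dump_raw_catalog() with its default arguments, so the catalog is returned in normalized Airbyte protocol format rather than as the raw syncCatalog. The catalog is read through the Config API endpoint POST /v1/web_backend/connections/get.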

Returns:

Dictionary containing the configured catalog, or None if not found.

def dump_raw_catalog(self, *, normalize: bool = True) -> dict[str, typing.Any] | None:
    def dump_raw_catalog(
        self,
        *,
        normalize: bool = True,
    ) -> dict[str, Any] | None:
        """Dump the configured catalog for this connection.

        By default, returns the catalog in Airbyte protocol format
        (`ConfiguredAirbyteCatalog` with snake_case keys), suitable for passing
        to a connector's `--catalog` flag.

        When `normalize` is `False`, returns the raw `syncCatalog` dict from the
        Config API (camelCase keys, nested `config` block). This raw format can be
        passed directly to `import_raw_catalog()` for backup/restore workflows.

        Args:
            normalize: If `True` (default), convert to Airbyte protocol format.
                If `False`, return the raw Config API catalog.

        Returns:
            The configured catalog dict, or `None` if not found.
        """
        connection_response = api_util.get_connection_catalog(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            config_api_root=self.workspace.config_api_root,
        )
        raw = connection_response.get("syncCatalog")
        if raw is None:
            return None
        if normalize:
            return _normalize_catalog_to_protocol(raw)
        return raw

Dump the configured catalog for this connection.

By default, returns the catalog in Airbyte protocol format (ConfiguredAirbyteCatalog with snake_case keys), suitable for passing to a connector's --catalog flag.

When normalize is False, returns the raw syncCatalog dict from the Config API (camelCase keys, nested config block). This raw format can be passed directly to import_raw_catalog() for backup/restore workflows.

Arguments:
  • normalize: If True (default), convert to Airbyte protocol format. If False, return the raw Config API catalog.

Returns:

The configured catalog dict, or None if not found.
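A normalized catalog follows the Airbyte protocol's ConfiguredAirbyteCatalog shape: a streams list whose entries hold the stream definition plus per-stream sync settings. The helper below is a small hypothetical example of reading those settings. The field names (stream.name, sync_mode, destination_sync_mode, cursor_field, primary_key) come from the protocol; the sample catalog used to exercise it is invented.

```python
from __future__ import annotations

from typing import Any


def summarize_catalog(catalog: dict[str, Any]) -> dict[str, dict[str, Any]]:
    """Map each stream name to its sync settings in a protocol-format catalog."""
    summary: dict[str, dict[str, Any]] = {}
    for configured in catalog.get("streams") or []:
        summary[configured["stream"]["name"]] = {
            "sync_mode": configured.get("sync_mode"),
            "destination_sync_mode": configured.get("destination_sync_mode"),
            "cursor_field": configured.get("cursor_field") or [],
            "primary_key": configured.get("primary_key") or [],
        }
    return summary
```

With a real connection, summarize_catalog(connection.dump_raw_catalog()) would work on the default normalized output. dump_raw_catalog() can return None, which this helper does not accept, so check for that first.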

def import_raw_catalog(self, catalog: dict[str, typing.Any]) -> None:
    def import_raw_catalog(self, catalog: dict[str, Any]) -> None:
        """Replace the configured catalog for this connection.

        > ⚠️ **WARNING:** Modifying the catalog directly is not recommended and
        > could result in broken connections, and/or incorrect sync behavior.

        Accepts a configured catalog dict and replaces the connection's entire
        catalog with it. All other connection settings remain unchanged.

        Accepts either format:

        - **Config API format** (`syncCatalog` with camelCase keys and nested `config`):
          passed through directly.
        - **Airbyte protocol format** (`ConfiguredAirbyteCatalog` with snake_case keys):
          automatically converted to Config API format before sending.

        Args:
            catalog: The configured catalog dict in either format.
        """
        if _is_protocol_catalog_format(catalog):
            catalog = _denormalize_catalog_to_api(catalog)

        api_util.replace_connection_catalog(
            connection_id=self.connection_id,
            configured_catalog_dict=catalog,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            config_api_root=self.workspace.config_api_root,
        )

Replace the configured catalog for this connection.

⚠️ WARNING: Modifying the catalog directly is not recommended and could result in broken connections or incorrect sync behavior.

Accepts a configured catalog dict and replaces the connection's entire catalog with it. All other connection settings remain unchanged.

Accepts either format:

  • Config API format (syncCatalog with camelCase keys and nested config): passed through directly.
  • Airbyte protocol format (ConfiguredAirbyteCatalog with snake_case keys): automatically converted to Config API format before sending.

Arguments:
  • catalog: The configured catalog dict in either format.
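One way to tell the two accepted catalog formats apart is sketched below. This is a hypothetical heuristic based only on the format descriptions above, not PyAirbyte's _is_protocol_catalog_format. Protocol streams carry snake_case settings such as sync_mode at the top level of each entry. Config API syncCatalog entries nest their settings under a config block instead.

```python
from __future__ import annotations

from typing import Any


def looks_like_protocol_catalog(catalog: dict[str, Any]) -> bool:
    """Heuristic sketch: True if every stream entry looks protocol-shaped."""
    streams = catalog.get("streams") or []
    # Protocol entries: top-level `sync_mode`, no nested `config` block.
    return bool(streams) and all("config" not in s and "sync_mode" in s for s in streams)
```

Treating an empty catalog as not protocol-shaped is an arbitrary choice for this sketch; the library's own check may use different rules.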
def rename(self, name: str) -> CloudConnection:
    def rename(self, name: str) -> CloudConnection:
        """Rename the connection.

        Args:
            name: New name for the connection

        Returns:
            Updated CloudConnection object with refreshed info
        """
        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            name=name,
        )
        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
        return self

Rename the connection.

Arguments:
  • name: New name for the connection.

Returns:

This CloudConnection object (self), with its cached connection info refreshed from the API response, so calls can be chained.

def set_table_prefix(self, prefix: str) -> CloudConnection:
    def set_table_prefix(self, prefix: str) -> CloudConnection:
        """Set the table prefix for the connection.

        Args:
            prefix: New table prefix to use when syncing to the destination

        Returns:
            Updated CloudConnection object with refreshed info
        """
        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            prefix=prefix,
        )
        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
        return self

Set the table prefix for the connection.

Arguments:
  • prefix: New table prefix to use when syncing to the destination
Returns:

Updated CloudConnection object with refreshed info

def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
    def set_selected_streams(self, stream_names: list[str]) -> CloudConnection:
        """Set the selected streams for the connection.

        This is a destructive operation that can break existing connections if the
        stream selection is changed incorrectly. Use with caution.

        Args:
            stream_names: List of stream names to sync

        Returns:
            Updated CloudConnection object with refreshed info
        """
        configurations = api_util.build_stream_configurations(stream_names)

        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            configurations=configurations,
        )
        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)
        return self

Set the selected streams for the connection.

This is a destructive operation that can break existing connections if the stream selection is changed incorrectly. Use with caution.

Arguments:
  • stream_names: List of stream names to sync
Returns:

Updated CloudConnection object with refreshed info
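Because this call is destructive, it can help to preview the change locally before calling `set_selected_streams`. The helper below, `preview_stream_selection`, is hypothetical and not part of PyAirbyte. It assumes that `stream_names` replaces the whole selection rather than adding to it. How you obtain the currently selected stream names is not covered in this section.

```python
from __future__ import annotations


def preview_stream_selection(current: list[str], requested: list[str]) -> dict[str, list[str]]:
    """Compare the current selection with a requested one (hypothetical helper)."""
    current_set, requested_set = set(current), set(requested)
    return {
        "added": sorted(requested_set - current_set),
        "kept": sorted(requested_set & current_set),
        # Streams that would stop syncing if `requested` replaces the selection.
        "dropped": sorted(current_set - requested_set),
    }


diff = preview_stream_selection(["users", "orders"], ["users", "invoices"])
if diff["dropped"]:
    print(f"Warning: these streams would be deselected: {diff['dropped']}")
```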

enabled: bool
    @property
    def enabled(self) -> bool:
        """Get the current enabled status of the connection.

        This property always fetches fresh data from the API to ensure accuracy,
        as another process or user may have toggled the setting.

        Returns:
            True if the connection status is 'active', False otherwise.
        """
        connection_info = self._fetch_connection_info(force_refresh=True)
        return connection_info.status == "active"

Get the current enabled status of the connection.

This property always fetches fresh data from the API to ensure accuracy, as another process or user may have toggled the setting.

Returns:

True if the connection status is 'active', False otherwise.

def set_enabled(self, *, enabled: bool, ignore_noop: bool = True) -> None:
    def set_enabled(
        self,
        *,
        enabled: bool,
        ignore_noop: bool = True,
    ) -> None:
        """Set the enabled status of the connection.

        Args:
            enabled: True to enable (set status to 'active'), False to disable
                (set status to 'inactive').
            ignore_noop: If True (default), silently return if the connection is already
                in the requested state. If False, raise ValueError when the requested
                state matches the current state.

        Raises:
            ValueError: If ignore_noop is False and the connection is already in the
                requested state.
        """
        # Always fetch fresh data to check current status
        connection_info = self._fetch_connection_info(force_refresh=True)
        current_status = connection_info.status
        desired_status = "active" if enabled else "inactive"

        if current_status == desired_status:
            if ignore_noop:
                return
            raise ValueError(
                f"Connection is already {'enabled' if enabled else 'disabled'}. "
                f"Current status: {current_status}"
            )

        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            status=desired_status,
        )
        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)

Set the enabled status of the connection.

Arguments:
  • enabled: True to enable (set status to 'active'), False to disable (set status to 'inactive').
  • ignore_noop: If True (default), silently return if the connection is already in the requested state. If False, raise ValueError when the requested state matches the current state.
Raises:
  • ValueError: If ignore_noop is False and the connection is already in the requested state.
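The no-op check in `set_enabled` reduces to a small pure function. The sketch below mirrors the source listing above but is a standalone helper. `resolve_status_change` is hypothetical and not part of PyAirbyte. It returns the status that would be sent to the API, or `None` when the method would return early.

```python
from __future__ import annotations


def resolve_status_change(
    current_status: str, *, enabled: bool, ignore_noop: bool = True
) -> str | None:
    """Return the status to send, or None when no update is needed (sketch)."""
    desired_status = "active" if enabled else "inactive"
    if current_status == desired_status:
        if ignore_noop:
            return None  # Already in the requested state: nothing to send.
        raise ValueError(
            f"Connection is already {'enabled' if enabled else 'disabled'}. "
            f"Current status: {current_status}"
        )
    return desired_status
```

For example, `resolve_status_change("inactive", enabled=True)` yields `"active"`. `resolve_status_change("active", enabled=True, ignore_noop=False)` raises `ValueError`.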
def set_schedule(self, cron_expression: str) -> None:
    def set_schedule(
        self,
        cron_expression: str,
    ) -> None:
        """Set a cron schedule for the connection.

        Args:
            cron_expression: A cron expression defining when syncs should run.

        Examples:
                - "0 0 * * *" - Daily at midnight UTC
                - "0 */6 * * *" - Every 6 hours
                - "0 0 * * 0" - Weekly on Sunday at midnight UTC
        """
        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            schedule=api_util.build_connection_schedule(
                schedule_type="cron",
                cron_expression=cron_expression,
            ),
        )
        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)

Set a cron schedule for the connection.

Arguments:
  • cron_expression: A cron expression defining when syncs should run.
Examples:
  • "0 0 * * *" - Daily at midnight UTC
  • "0 */6 * * *" - Every 6 hours
  • "0 0 * * 0" - Weekly on Sunday at midnight UTC
def set_manual_schedule(self) -> None:
    def set_manual_schedule(self) -> None:
        """Set the connection to manual scheduling.

        Disables automatic syncs. Syncs will only run when manually triggered.
        """
        updated_response = api_util.patch_connection(
            connection_id=self.connection_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
            schedule=api_util.build_connection_schedule(schedule_type="manual"),
        )
        self._connection_info = CloudConnectionInfo.from_api_response(updated_response)

Set the connection to manual scheduling.

Disables automatic syncs. Syncs will only run when manually triggered.

def permanently_delete(self, *, cascade_delete_source: bool = False, cascade_delete_destination: bool = False) -> None:
    def permanently_delete(
        self,
        *,
        cascade_delete_source: bool = False,
        cascade_delete_destination: bool = False,
    ) -> None:
        """Delete the connection.

        Args:
            cascade_delete_source: Whether to also delete the source.
            cascade_delete_destination: Whether to also delete the destination.
        """
        self.workspace.permanently_delete_connection(self)

        if cascade_delete_source:
            self.workspace.permanently_delete_source(self.source_id)

        if cascade_delete_destination:
            self.workspace.permanently_delete_destination(self.destination_id)

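Permanently delete the connection, and optionally its source and destination.

The connection is deleted first. The source and destination are deleted afterward, each only if its cascade flag is set.

Arguments:
  • cascade_delete_source: Whether to also permanently delete the connection's source.
  • cascade_delete_destination: Whether to also permanently delete the connection's destination.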
@dataclass
class CloudClientConfig:
@dataclass
class CloudClientConfig:
    """Client configuration for Airbyte Cloud API.

    This class encapsulates the authentication and API configuration needed to connect
    to Airbyte Cloud, OSS, or Enterprise instances. It supports two mutually
    exclusive authentication methods:

    1. OAuth2 client credentials flow (client_id + client_secret)
    2. Bearer token authentication

    Exactly one authentication method must be provided. Providing both or neither
    will raise a validation error.

    Attributes:
        client_id: OAuth2 client ID for client credentials flow.
        client_secret: OAuth2 client secret for client credentials flow.
        bearer_token: Pre-generated bearer token for direct authentication.
        api_root: The API root URL. Defaults to Airbyte Cloud API.
        config_api_root: The Config API root URL.
    """

    client_id: SecretString | None = None
    """OAuth2 client ID for client credentials authentication."""

    client_secret: SecretString | None = None
    """OAuth2 client secret for client credentials authentication."""

    bearer_token: SecretString | None = None
    """Bearer token for direct authentication (alternative to client credentials)."""

    api_root: str = api_util.CLOUD_API_ROOT
    """The API root URL. Defaults to Airbyte Cloud API."""

    config_api_root: str | None = None
    """The Config API root URL."""

    def __post_init__(self) -> None:
        """Validate credentials and ensure secrets are properly wrapped."""
        # Wrap secrets in SecretString if they aren't already
        if self.client_id is not None:
            self.client_id = SecretString(self.client_id)
        if self.client_secret is not None:
            self.client_secret = SecretString(self.client_secret)
        if self.bearer_token is not None:
            self.bearer_token = SecretString(self.bearer_token)

        # Validate mutual exclusivity
        has_client_credentials = self.client_id is not None or self.client_secret is not None
        has_bearer_token = self.bearer_token is not None

        if has_client_credentials and has_bearer_token:
            raise PyAirbyteInputError(
                message="Cannot use both client credentials and bearer token authentication.",
                guidance=(
                    "Provide either client_id and client_secret together, "
                    "or bearer_token alone, but not both."
                ),
            )

        if has_client_credentials and (self.client_id is None or self.client_secret is None):
            # If using client credentials, both must be provided
            raise PyAirbyteInputError(
                message="Incomplete client credentials.",
                guidance=(
                    "When using client credentials authentication, "
                    "both client_id and client_secret must be provided."
                ),
            )

        if not has_client_credentials and not has_bearer_token:
            raise PyAirbyteInputError(
                message="No authentication credentials provided.",
                guidance=(
                    "Provide either client_id and client_secret together for OAuth2 "
                    "client credentials flow, or bearer_token for direct authentication."
                ),
            )

    @property
    def uses_bearer_token(self) -> bool:
        """Return True if using bearer token authentication."""
        return self.bearer_token is not None

    @property
    def uses_client_credentials(self) -> bool:
        """Return True if using client credentials authentication."""
        return self.client_id is not None and self.client_secret is not None

    @classmethod
    def from_env(
        cls,
        *,
        api_root: str | None = None,
        config_api_root: str | None = None,
    ) -> CloudClientConfig:
        """Create CloudClientConfig from environment variables.

        This factory method resolves credentials from environment variables,
        providing a convenient way to create credentials without explicitly
        passing secrets.

        Environment variables used:
            - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
            - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
            - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
            - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).
            - `AIRBYTE_CLOUD_CONFIG_API_URL`: Optional. The Config API root URL.

        The method will first check for a bearer token. If not found, it will
        attempt to use client credentials.

        Args:
            api_root: The API root URL. If not provided, will be resolved from
                the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
                the Airbyte Cloud API.
            config_api_root: The Config API root URL. If not provided, will be resolved
                from the `AIRBYTE_CLOUD_CONFIG_API_URL` environment variable.

        Returns:
            A CloudClientConfig instance configured with credentials from the environment.

        Raises:
            PyAirbyteSecretNotFoundError: If required credentials are not found in
                the environment.
        """
        resolved_api_root = resolve_cloud_api_url(api_root)
        resolved_config_api_root = resolve_cloud_config_api_url(config_api_root)

        # Try bearer token first
        bearer_token = resolve_cloud_bearer_token()
        if bearer_token:
            return cls(
                bearer_token=bearer_token,
                api_root=resolved_api_root,
                config_api_root=resolved_config_api_root,
            )

        # Fall back to client credentials
        return cls(
            client_id=resolve_cloud_client_id(),
            client_secret=resolve_cloud_client_secret(),
            api_root=resolved_api_root,
            config_api_root=resolved_config_api_root,
        )

Client configuration for Airbyte Cloud API.

This class encapsulates the authentication and API configuration needed to connect to Airbyte Cloud, OSS, or Enterprise instances. It supports two mutually exclusive authentication methods:

  1. OAuth2 client credentials flow (client_id + client_secret)
  2. Bearer token authentication

Exactly one authentication method must be provided. Providing both methods, neither, or only one of client_id and client_secret raises a PyAirbyteInputError.

Attributes:
  • client_id: OAuth2 client ID for client credentials flow.
  • client_secret: OAuth2 client secret for client credentials flow.
  • bearer_token: Pre-generated bearer token for direct authentication.
  • api_root: The API root URL. Defaults to the Airbyte Cloud API (https://api.airbyte.com/v1).
  • config_api_root: The Config API root URL.
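Here is a minimal, dependency-free sketch of the validation rules above. It mirrors `CloudClientConfig.__post_init__` from the source listing, with two differences: it raises a plain `ValueError` instead of `PyAirbyteInputError`, and it skips the `SecretString` wrapping. `AuthConfigSketch` is a hypothetical name used only for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class AuthConfigSketch:
    """Standalone stand-in for CloudClientConfig's credential checks (hypothetical)."""

    client_id: str | None = None
    client_secret: str | None = None
    bearer_token: str | None = None

    def __post_init__(self) -> None:
        # Either half of the client credentials counts as "using client credentials".
        has_client_credentials = self.client_id is not None or self.client_secret is not None
        has_bearer_token = self.bearer_token is not None

        if has_client_credentials and has_bearer_token:
            raise ValueError("Cannot use both client credentials and bearer token authentication.")
        if has_client_credentials and (self.client_id is None or self.client_secret is None):
            raise ValueError("Incomplete client credentials.")
        if not has_client_credentials and not has_bearer_token:
            raise ValueError("No authentication credentials provided.")


token_config = AuthConfigSketch(bearer_token="example-token")
oauth_config = AuthConfigSketch(client_id="example-id", client_secret="example-secret")
```

Any other combination, such as only `client_id`, or all three values, fails at construction time.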
CloudClientConfig(client_id: airbyte.secrets.SecretString | None = None, client_secret: airbyte.secrets.SecretString | None = None, bearer_token: airbyte.secrets.SecretString | None = None, api_root: str = 'https://api.airbyte.com/v1', config_api_root: str | None = None)
client_id: airbyte.secrets.SecretString | None = None

OAuth2 client ID for client credentials authentication.

client_secret: airbyte.secrets.SecretString | None = None

OAuth2 client secret for client credentials authentication.

bearer_token: airbyte.secrets.SecretString | None = None

Bearer token for direct authentication (alternative to client credentials).

api_root: str = 'https://api.airbyte.com/v1'

The API root URL. Defaults to the Airbyte Cloud API.

config_api_root: str | None = None

The Config API root URL.

uses_bearer_token: bool
    @property
    def uses_bearer_token(self) -> bool:
        """Return True if using bearer token authentication."""
        return self.bearer_token is not None

Return True if using bearer token authentication.

uses_client_credentials: bool
    @property
    def uses_client_credentials(self) -> bool:
        """Return True if using client credentials authentication."""
        return self.client_id is not None and self.client_secret is not None

Return True if using client credentials authentication.

@classmethod
def from_env(cls, *, api_root: str | None = None, config_api_root: str | None = None) -> CloudClientConfig:
    @classmethod
    def from_env(
        cls,
        *,
        api_root: str | None = None,
        config_api_root: str | None = None,
    ) -> CloudClientConfig:
        """Create CloudClientConfig from environment variables.

        This factory method resolves credentials from environment variables,
        providing a convenient way to create credentials without explicitly
        passing secrets.

        Environment variables used:
            - `AIRBYTE_CLOUD_CLIENT_ID`: OAuth client ID (for client credentials flow).
            - `AIRBYTE_CLOUD_CLIENT_SECRET`: OAuth client secret (for client credentials flow).
            - `AIRBYTE_CLOUD_BEARER_TOKEN`: Bearer token (alternative to client credentials).
            - `AIRBYTE_CLOUD_API_URL`: Optional. The API root URL (defaults to Airbyte Cloud).
            - `AIRBYTE_CLOUD_CONFIG_API_URL`: Optional. The Config API root URL.

        The method will first check for a bearer token. If not found, it will
        attempt to use client credentials.

        Args:
            api_root: The API root URL. If not provided, will be resolved from
                the `AIRBYTE_CLOUD_API_URL` environment variable, or default to
                the Airbyte Cloud API.
            config_api_root: The Config API root URL. If not provided, will be resolved
                from the `AIRBYTE_CLOUD_CONFIG_API_URL` environment variable.

        Returns:
            A CloudClientConfig instance configured with credentials from the environment.

        Raises:
            PyAirbyteSecretNotFoundError: If required credentials are not found in
                the environment.
        """
        resolved_api_root = resolve_cloud_api_url(api_root)
        resolved_config_api_root = resolve_cloud_config_api_url(config_api_root)

        # Try bearer token first
        bearer_token = resolve_cloud_bearer_token()
        if bearer_token:
            return cls(
                bearer_token=bearer_token,
                api_root=resolved_api_root,
                config_api_root=resolved_config_api_root,
            )

        # Fall back to client credentials
        return cls(
            client_id=resolve_cloud_client_id(),
            client_secret=resolve_cloud_client_secret(),
            api_root=resolved_api_root,
            config_api_root=resolved_config_api_root,
        )

Create CloudClientConfig from environment variables.

This factory method resolves credentials from environment variables, providing a convenient way to create credentials without explicitly passing secrets.

Environment variables used:
  • AIRBYTE_CLOUD_CLIENT_ID: OAuth client ID (for client credentials flow).
  • AIRBYTE_CLOUD_CLIENT_SECRET: OAuth client secret (for client credentials flow).
  • AIRBYTE_CLOUD_BEARER_TOKEN: Bearer token (alternative to client credentials).
  • AIRBYTE_CLOUD_API_URL: Optional. The API root URL (defaults to Airbyte Cloud).
  • AIRBYTE_CLOUD_CONFIG_API_URL: Optional. The Config API root URL.

The method will first check for a bearer token. If not found, it will attempt to use client credentials.

Arguments:
  • api_root: The API root URL. If not provided, will be resolved from the AIRBYTE_CLOUD_API_URL environment variable, or default to the Airbyte Cloud API.
  • config_api_root: The Config API root URL. If not provided, will be resolved from the AIRBYTE_CLOUD_CONFIG_API_URL environment variable.
Returns:

A CloudClientConfig instance configured with credentials from the environment.

Raises:
  • PyAirbyteSecretNotFoundError: If required credentials are not found in the environment.
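The lookup order described above can be sketched without PyAirbyte. `resolve_auth_from_env` is a hypothetical helper that reads from a plain mapping, such as `os.environ`, and raises `LookupError` where PyAirbyte raises `PyAirbyteSecretNotFoundError`. It does not model the explicit `api_root`/`config_api_root` arguments, and it does not model any Config API root inference PyAirbyte may perform.

```python
from __future__ import annotations

from collections.abc import Mapping

# Default API root, as shown in the CloudClientConfig signature.
CLOUD_API_ROOT = "https://api.airbyte.com/v1"


def resolve_auth_from_env(env: Mapping[str, str]) -> dict[str, str | None]:
    """Resolve settings in the documented order: bearer token first (sketch)."""
    api_root = env.get("AIRBYTE_CLOUD_API_URL") or CLOUD_API_ROOT
    config_api_root = env.get("AIRBYTE_CLOUD_CONFIG_API_URL")  # Optional; may be None.

    bearer_token = env.get("AIRBYTE_CLOUD_BEARER_TOKEN")
    if bearer_token:
        return {"bearer_token": bearer_token, "api_root": api_root, "config_api_root": config_api_root}

    # Fall back to client credentials; both values are required on this path.
    missing = [
        name
        for name in ("AIRBYTE_CLOUD_CLIENT_ID", "AIRBYTE_CLOUD_CLIENT_SECRET")
        if not env.get(name)
    ]
    if missing:
        # Stand-in for PyAirbyteSecretNotFoundError in this standalone sketch.
        raise LookupError(f"Missing credentials in environment: {missing}")
    return {
        "client_id": env["AIRBYTE_CLOUD_CLIENT_ID"],
        "client_secret": env["AIRBYTE_CLOUD_CLIENT_SECRET"],
        "api_root": api_root,
        "config_api_root": config_api_root,
    }


demo_env = {
    "AIRBYTE_CLOUD_CLIENT_ID": "my-client-id",
    "AIRBYTE_CLOUD_CLIENT_SECRET": "my-client-secret",
}
settings = resolve_auth_from_env(demo_env)
```

If `AIRBYTE_CLOUD_BEARER_TOKEN` is also set, it takes precedence and the client credentials are ignored.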
class CloudWorkspaceInfo(pydantic.main.BaseModel):
class CloudWorkspaceInfo(BaseModel):
    """Information about an Airbyte workspace."""

    model_config = ConfigDict(populate_by_name=True)

    workspace_id: str = Field(alias="workspaceId")
    """The workspace ID."""

    name: str
    """The workspace name."""

    data_residency: str | None = Field(default=None, alias="dataResidency")
    """The data residency setting for the workspace, if available."""

    organization_id: str | None = Field(default=None, alias="organizationId")
    """The organization ID for the workspace, if available."""

    notifications: dict[str, object | None] = Field(default_factory=dict)
    """Workspace notification settings."""

    @classmethod
    def from_api_response(cls, workspace: _WorkspaceResponseLike) -> CloudWorkspaceInfo:
        """Create a public model from an internal API workspace response."""
        return cls(
            workspace_id=workspace.workspace_id,
            name=workspace.name,
            data_residency=workspace.data_residency,
            organization_id=getattr(workspace, "organization_id", None),
            notifications=_notifications_to_dict(workspace.notifications),
        )

    @classmethod
    def from_mapping(cls, workspace: Mapping[str, object]) -> CloudWorkspaceInfo:
        """Create a public model from a workspace mapping."""
        return cls.model_validate(workspace)

    def to_dict(self) -> dict[str, object]:
        """Return a JSON-serializable dictionary."""
        return self.model_dump(mode="json")

Information about an Airbyte workspace.

workspace_id: str

The workspace ID (required).

name: str

The workspace name (required).

data_residency: str | None = None

The data residency setting for the workspace, if available.

organization_id: str | None = None

The organization ID for the workspace, if available.

notifications: dict[str, object | None]

Workspace notification settings. Defaults to an empty dict.

@classmethod
def from_api_response(cls, workspace: airbyte.cloud.models._WorkspaceResponseLike) -> CloudWorkspaceInfo:
    @classmethod
    def from_api_response(cls, workspace: _WorkspaceResponseLike) -> CloudWorkspaceInfo:
        """Create a public model from an internal API workspace response."""
        return cls(
            workspace_id=workspace.workspace_id,
            name=workspace.name,
            data_residency=workspace.data_residency,
            organization_id=getattr(workspace, "organization_id", None),
            notifications=_notifications_to_dict(workspace.notifications),
        )

Create a public model from an internal API workspace response.

@classmethod
def from_mapping(cls, workspace: Mapping[str, object]) -> CloudWorkspaceInfo:
    @classmethod
    def from_mapping(cls, workspace: Mapping[str, object]) -> CloudWorkspaceInfo:
        """Create a public model from a workspace mapping."""
        return cls.model_validate(workspace)

Create a public model from a workspace mapping.

def to_dict(self) -> dict[str, object]:
    def to_dict(self) -> dict[str, object]:
        """Return a JSON-serializable dictionary."""
        return self.model_dump(mode="json")

Return a JSON-serializable dictionary.

@dataclass
class SyncResult:
@dataclass
class SyncResult:
    """The result of a sync operation.

    **This class is not meant to be instantiated directly.** Instead, obtain a `SyncResult` by
    interacting with the `.CloudWorkspace` and `.CloudConnection` objects.
    """

    workspace: CloudWorkspace
    connection: CloudConnection
    job_id: int
    table_name_prefix: str = ""
    table_name_suffix: str = ""
    _latest_job_info: CloudJobInfo | None = None
    _connection_response: CloudConnectionInfo | None = None
    _cache: CacheBase | None = None
    _job_with_attempts_info: dict[str, Any] | None = None

    @property
    def job_url(self) -> str:
        """Return the URL of the sync job.

        Note: This currently returns the connection's job history URL, as there is no direct URL
        to a specific job in the Airbyte Cloud web app.

        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
              E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
        """
        return f"{self.connection.job_history_url}"

    def _get_connection_info(self, *, force_refresh: bool = False) -> CloudConnectionInfo:
        """Return connection info for the sync job."""
        if self._connection_response and not force_refresh:
            return self._connection_response

        self._connection_response = CloudConnectionInfo.from_api_response(
            api_util.get_connection(
                workspace_id=self.workspace.workspace_id,
                api_root=self.workspace.api_root,
                connection_id=self.connection.connection_id,
                client_id=self.workspace.client_id,
                client_secret=self.workspace.client_secret,
                bearer_token=self.workspace.bearer_token,
            )
        )
        return self._connection_response

    def _get_destination_configuration(self, *, force_refresh: bool = False) -> dict[str, Any]:
        """Return the destination configuration for the sync job."""
        connection_info = self._get_connection_info(force_refresh=force_refresh)
        destination_response = api_util.get_destination(
            destination_id=connection_info.destination_id,
            api_root=self.workspace.api_root,
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        return asdict(destination_response.configuration)

    def is_job_complete(self) -> bool:
        """Check if the sync job is complete."""
        return self.get_job_status() in FINAL_STATUSES

    def get_job_status(self) -> JobStatusEnum:
        """Return the latest status of the sync job."""
        return self._fetch_latest_job_info().status

    def _fetch_latest_job_info(self) -> CloudJobInfo:
        """Return the job info for the sync job."""
        if self._latest_job_info and self._latest_job_info.status in FINAL_STATUSES:
            return self._latest_job_info

        self._latest_job_info = CloudJobInfo.from_api_response(
            api_util.get_job_info(
                job_id=self.job_id,
                api_root=self.workspace.api_root,
                client_id=self.workspace.client_id,
                client_secret=self.workspace.client_secret,
                bearer_token=self.workspace.bearer_token,
            )
        )
        return self._latest_job_info

    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced."""
        return self._fetch_latest_job_info().bytes_synced or 0

    @property
    def records_synced(self) -> int:
        """Return the number of records processed."""
        return self._fetch_latest_job_info().rows_synced or 0

    @property
    def start_time(self) -> datetime:
        """Return the start time of the sync job in UTC."""
        try:
            return ab_datetime_parse(self._fetch_latest_job_info().start_time)
        except (ValueError, TypeError) as e:
            if "Invalid isoformat string" in str(e):
                job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
                    api_root=self.workspace.api_root,
                    config_api_root=self.workspace.config_api_root,
                    path="/jobs/get",
                    json={"id": self.job_id},
                    client_id=self.workspace.client_id,
                    client_secret=self.workspace.client_secret,
                    bearer_token=self.workspace.bearer_token,
                )
                raw_start_time = job_info_raw.get("startTime")
                if raw_start_time:
                    return ab_datetime_parse(raw_start_time)
            raise

    def _fetch_job_with_attempts(self) -> dict[str, Any]:
        """Fetch job info with attempts from Config API using lazy loading pattern."""
        if self._job_with_attempts_info is not None:
            return self._job_with_attempts_info

        self._job_with_attempts_info = api_util._make_config_api_request(  # noqa: SLF001  # Config API helper
            api_root=self.workspace.api_root,
            config_api_root=self.workspace.config_api_root,
            path="/jobs/get",
            json={
                "id": self.job_id,
            },
            client_id=self.workspace.client_id,
            client_secret=self.workspace.client_secret,
            bearer_token=self.workspace.bearer_token,
        )
        return self._job_with_attempts_info

    def get_attempts(self) -> list[SyncAttempt]:
        """Return a list of attempts for this sync job."""
        job_with_attempts = self._fetch_job_with_attempts()
        attempts_data = job_with_attempts.get("attempts", [])

        return [
            SyncAttempt(
                workspace=self.workspace,
                connection=self.connection,
                job_id=self.job_id,
                attempt_number=i,
                _attempt_data=attempt_data,
            )
            for i, attempt_data in enumerate(attempts_data, start=0)
        ]

    def raise_failure_status(
        self,
        *,
        refresh_status: bool = False,
    ) -> None:
        """Raise an exception if the sync job failed.

        By default, this method will use the latest status available. If you want to refresh the
        status before checking for failure, set `refresh_status=True`. If the job has failed, this
        method will raise an `AirbyteConnectionSyncError`.

        Otherwise, do nothing.
        """
        if not refresh_status and self._latest_job_info:
            latest_status = self._latest_job_info.status
        else:
            latest_status = self.get_job_status()

        if latest_status in FAILED_STATUSES:
            raise AirbyteConnectionSyncError(
                workspace=self.workspace,
                connection_id=self.connection.connection_id,
                job_id=self.job_id,
                job_status=self.get_job_status(),
            )

    def wait_for_completion(
        self,
        *,
        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
        raise_timeout: bool = True,
        raise_failure: bool = False,
    ) -> JobStatusEnum:
        """Wait for a job to finish running."""
        start_time = time.time()
        while True:
            latest_status = self.get_job_status()
            if latest_status in FINAL_STATUSES:
                if raise_failure:
                    # No-op if the job succeeded or is still running:
                    self.raise_failure_status()

                return latest_status

            if time.time() - start_time > wait_timeout:
                if raise_timeout:
                    raise AirbyteConnectionSyncTimeoutError(
                        workspace=self.workspace,
                        connection_id=self.connection.connection_id,
                        job_id=self.job_id,
                        job_status=latest_status,
                        timeout=wait_timeout,
                    )

                return latest_status  # This will be a non-final status

            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

    def get_sql_cache(self) -> CacheBase:
        """Return a SQL Cache object for working with the data in a SQL-based destination."""
        if self._cache:
            return self._cache

        destination_configuration = self._get_destination_configuration()
        self._cache = destination_to_cache(destination_configuration=destination_configuration)
        return self._cache

    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
        """Return a SQL Engine for querying a SQL-based destination."""
        return self.get_sql_cache().get_sql_engine()

    def get_sql_table_name(self, stream_name: str) -> str:
438        """Return the SQL table name of the named stream."""
439        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)
440
441    def get_sql_table(
442        self,
443        stream_name: str,
444    ) -> sqlalchemy.Table:
445        """Return a SQLAlchemy table object for the named stream."""
446        return self.get_sql_cache().processor.get_sql_table(stream_name)
447
448    def get_dataset(self, stream_name: str) -> CachedDataset:
449        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.
450
451        This can be used to read and analyze the data in a SQL-based destination.
452
453        TODO: In a future iteration, we can consider providing stream configuration information
454              (catalog information) to the `CachedDataset` object via the "Get stream properties"
455              API: https://reference.airbyte.com/reference/getstreamproperties
456        """
457        return CachedDataset(
458            self.get_sql_cache(),
459            stream_name=stream_name,
460            stream_configuration=False,  # Don't look for stream configuration in cache.
461        )
462
463    def get_sql_database_name(self) -> str:
464        """Return the SQL database name."""
465        cache = self.get_sql_cache()
466        return cache.get_database_name()
467
468    def get_sql_schema_name(self) -> str:
469        """Return the SQL schema name."""
470        cache = self.get_sql_cache()
471        return cache.schema_name
472
473    @property
474    def stream_names(self) -> list[str]:
475        """Return the set of stream names."""
476        return self.connection.stream_names
477
478    @final
479    @property
480    def streams(
481        self,
482    ) -> _SyncResultStreams:  # pyrefly: ignore[unknown-name]
483        """Return a mapping of stream names to `airbyte.CachedDataset` objects.
484
485        This is a convenience wrapper around the `stream_names`
486        property and `get_dataset()` method.
487        """
488        return self._SyncResultStreams(self)
489
490    class _SyncResultStreams(Mapping[str, CachedDataset]):
491        """A mapping of stream names to cached datasets."""
492
493        def __init__(
494            self,
495            parent: SyncResult,
496            /,
497        ) -> None:
498            self.parent: SyncResult = parent
499
500        def __getitem__(self, key: str) -> CachedDataset:
501            return self.parent.get_dataset(stream_name=key)
502
503        def __iter__(self) -> Iterator[str]:
504            return iter(self.parent.stream_names)
505
506        def __len__(self) -> int:
507            return len(self.parent.stream_names)

The result of a sync operation.

This class is not meant to be instantiated directly. Instead, obtain a SyncResult by interacting with the CloudWorkspace and CloudConnection objects.

SyncResult( workspace: CloudWorkspace, connection: CloudConnection, job_id: int, table_name_prefix: str = '', table_name_suffix: str = '', _latest_job_info: airbyte.cloud.models.CloudJobInfo | None = None, _connection_response: airbyte.cloud.models.CloudConnectionInfo | None = None, _cache: airbyte.caches.CacheBase | None = None, _job_with_attempts_info: dict[str, typing.Any] | None = None)
workspace: CloudWorkspace
connection: CloudConnection
job_id: int
table_name_prefix: str = ''
table_name_suffix: str = ''
job_url: str
    @property
    def job_url(self) -> str:
        """Return the URL of the sync job.

        Note: This currently returns the connection's job history URL, as there is no direct URL
        to a specific job in the Airbyte Cloud web app.

        TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number.
              E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
        """
        return f"{self.connection.job_history_url}"

Return the URL of the sync job.

Note: This currently returns the connection's job history URL, as there is no direct URL to a specific job in the Airbyte Cloud web app.

TODO: Implement a direct job logs URL on top of the event-id of the specific attempt number. E.g. {self.connection.job_history_url}?eventId={event-guid}&openLogs=true
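The TODO above proposes appending an event ID and an openLogs flag to the job history URL. The following is a minimal sketch of how such a URL could be built with the standard library. It assumes the eventId and openLogs query parameter names from the TODO and a history URL with no existing query string. build_job_logs_url is a hypothetical helper, not part of PyAirbyte, and the base URL and event ID are placeholders.

```python
from urllib.parse import urlencode


def build_job_logs_url(job_history_url: str, event_id: str) -> str:
    """Hypothetical helper: append the query string proposed in the TODO.

    Assumes job_history_url does not already contain a query string.
    """
    # urlencode percent-escapes values and joins pairs with "&".
    query = urlencode({"eventId": event_id, "openLogs": "true"})
    return f"{job_history_url}?{query}"


# Placeholder values; a real event GUID would come from the specific attempt.
url = build_job_logs_url("https://airbyte.example.com/job-history", "0a1b2c3d")
print(url)
```

Using urlencode rather than string formatting keeps the sketch safe if an event ID ever contains characters that need escaping.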

def is_job_complete(self) -> bool:
    def is_job_complete(self) -> bool:
        """Check if the sync job is complete."""
        return self.get_job_status() in FINAL_STATUSES

Check if the sync job is complete.

def get_job_status(self) -> JobStatusEnum:
    def get_job_status(self) -> JobStatusEnum:
        """Return the latest status of the sync job."""
        return self._fetch_latest_job_info().status

Return the latest status of the sync job.

bytes_synced: int
    @property
    def bytes_synced(self) -> int:
        """Return the number of bytes synced."""
        return self._fetch_latest_job_info().bytes_synced or 0

Return the number of bytes synced.

records_synced: int
    @property
    def records_synced(self) -> int:
        """Return the number of records synced."""
        return self._fetch_latest_job_info().rows_synced or 0

Return the number of records synced.

start_time: datetime.datetime
    @property
    def start_time(self) -> datetime:
        """Return the start time of the sync job in UTC."""
        try:
            return ab_datetime_parse(self._fetch_latest_job_info().start_time)
        except (ValueError, TypeError) as e:
            if "Invalid isoformat string" in str(e):
                job_info_raw = api_util._make_config_api_request(  # noqa: SLF001
                    api_root=self.workspace.api_root,
                    config_api_root=self.workspace.config_api_root,
                    path="/jobs/get",
                    json={"id": self.job_id},
                    client_id=self.workspace.client_id,
                    client_secret=self.workspace.client_secret,
                    bearer_token=self.workspace.bearer_token,
                )
                raw_start_time = job_info_raw.get("startTime")
                if raw_start_time:
                    return ab_datetime_parse(raw_start_time)
            raise

Return the start time of the sync job in UTC.
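The start_time property first parses the start time from the latest job info. If that value is not a valid ISO 8601 string, it falls back to the raw startTime returned by the Config API /jobs/get endpoint, and re-raises the original error if no fallback value exists. Below is a minimal sketch of that try-then-fallback pattern. It makes two assumptions: datetime.fromisoformat stands in for PyAirbyte's ab_datetime_parse, and a plain callable stands in for the Config API request. Unlike the original, the sketch falls back on any ValueError or TypeError rather than only on the "Invalid isoformat string" message.

```python
from __future__ import annotations

from collections.abc import Callable
from datetime import datetime


def parse_start_time(
    primary_value: str | None,
    fetch_raw_start_time: Callable[[], str | None],
) -> datetime:
    """Parse primary_value, falling back to a second source on failure.

    fetch_raw_start_time is a hypothetical stand-in for the Config API
    /jobs/get lookup; datetime.fromisoformat stands in for ab_datetime_parse.
    """
    try:
        return datetime.fromisoformat(primary_value)
    except (ValueError, TypeError):
        raw_start_time = fetch_raw_start_time()
        if raw_start_time:
            return datetime.fromisoformat(raw_start_time)
        raise  # No usable fallback: re-raise the original parsing error.


# The primary value is unparseable, so the fallback value is used.
print(parse_start_time("not-a-date", lambda: "2024-05-01T12:00:00+00:00").isoformat())
```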

def get_attempts(self) -> list[airbyte.cloud.sync_results.SyncAttempt]:
    def get_attempts(self) -> list[SyncAttempt]:
        """Return a list of attempts for this sync job."""
        job_with_attempts = self._fetch_job_with_attempts()
        attempts_data = job_with_attempts.get("attempts", [])

        return [
            SyncAttempt(
                workspace=self.workspace,
                connection=self.connection,
                job_id=self.job_id,
                attempt_number=i,
                _attempt_data=attempt_data,
            )
            for i, attempt_data in enumerate(attempts_data, start=0)
        ]

Return a list of attempts for this sync job.
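get_attempts reads the "attempts" list from the Config API /jobs/get response and wraps each entry. Attempts are numbered by list position, starting at 0. Below is a minimal sketch of that mapping. AttemptSketch is a hypothetical stand-in for SyncAttempt. The response dict is hand-written, and the placeholder "status" key in each attempt entry does not reflect the real response shape.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any


@dataclass
class AttemptSketch:
    """Hypothetical stand-in for SyncAttempt."""

    job_id: int
    attempt_number: int
    attempt_data: dict[str, Any]


def wrap_attempts(job_id: int, job_with_attempts: dict[str, Any]) -> list[AttemptSketch]:
    # A missing "attempts" key yields an empty list, as in get_attempts().
    attempts_data = job_with_attempts.get("attempts", [])
    # Attempt numbers follow list position, starting at 0.
    return [
        AttemptSketch(job_id=job_id, attempt_number=i, attempt_data=data)
        for i, data in enumerate(attempts_data)
    ]


# Hand-written placeholder response; the attempt entries are not the real shape.
response = {"attempts": [{"status": "failed"}, {"status": "succeeded"}]}
for attempt in wrap_attempts(42, response):
    print(attempt.attempt_number, attempt.attempt_data["status"])
```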

def raise_failure_status(self, *, refresh_status: bool = False) -> None:
    def raise_failure_status(
        self,
        *,
        refresh_status: bool = False,
    ) -> None:
        """Raise an exception if the sync job failed.

        By default, this method uses the cached job status if one is available. To fetch a fresh
        status before checking for failure, set `refresh_status=True`. If the job has failed, this
        method raises an `AirbyteConnectionSyncError`.

        Otherwise, do nothing.
        """
        if not refresh_status and self._latest_job_info:
            latest_status = self._latest_job_info.status
        else:
            latest_status = self.get_job_status()

        if latest_status in FAILED_STATUSES:
            raise AirbyteConnectionSyncError(
                workspace=self.workspace,
                connection_id=self.connection.connection_id,
                job_id=self.job_id,
                job_status=latest_status,
            )

Raise an exception if the sync job failed.

By default, this method uses the cached job status if one is available. To fetch a fresh status before checking for failure, set refresh_status=True. If the job has failed, this method raises an AirbyteConnectionSyncError.

Otherwise, do nothing.
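raise_failure_status reuses a cached status unless refresh_status=True is passed or nothing is cached yet. Below is a minimal sketch of that cache-or-refresh check. Every name in it is hypothetical. The sketch assumes that fetching a status also updates the cached value. The real FAILED_STATUSES constant is not shown in this section, so the sketch treats only "failed" as a failure.

```python
from __future__ import annotations

from collections.abc import Callable

# Assumption: the real FAILED_STATUSES may contain more values than this.
FAILED_STATUSES_SKETCH = {"failed"}


class JobFailedSketchError(Exception):
    """Hypothetical stand-in for AirbyteConnectionSyncError."""


class StatusCheckerSketch:
    def __init__(self, fetch_status: Callable[[], str]) -> None:
        self._fetch_status = fetch_status
        self._latest_status: str | None = None

    def get_job_status(self) -> str:
        # Each call hits the (fake) API and refreshes the cached value.
        self._latest_status = self._fetch_status()
        return self._latest_status

    def raise_failure_status(self, *, refresh_status: bool = False) -> None:
        # Reuse the cached status unless a refresh is requested or none is cached.
        if not refresh_status and self._latest_status:
            latest_status = self._latest_status
        else:
            latest_status = self.get_job_status()
        if latest_status in FAILED_STATUSES_SKETCH:
            raise JobFailedSketchError(f"job status: {latest_status}")
```

With a fake API that reports "running" and then "failed", a plain call after the first fetch does nothing, because it sees the cached "running" status. Passing refresh_status=True fetches "failed" and raises.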

def wait_for_completion( self, *, wait_timeout: int = 1800, raise_timeout: bool = True, raise_failure: bool = False) -> JobStatusEnum:
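    def wait_for_completion(
        self,
        *,
        wait_timeout: int = DEFAULT_SYNC_TIMEOUT_SECONDS,
        raise_timeout: bool = True,
        raise_failure: bool = False,
    ) -> JobStatusEnum:
        """Wait for the sync job to reach a final status.

        Polls the job status every `api_util.JOB_WAIT_INTERVAL_SECS` seconds and returns the
        final status once the job finishes. If `raise_failure=True` and the job failed, raises
        `AirbyteConnectionSyncError` instead. If `wait_timeout` seconds elapse first, raises
        `AirbyteConnectionSyncTimeoutError`, or returns the latest non-final status when
        `raise_timeout=False`.
        """
        start_time = time.time()
        while True:
            latest_status = self.get_job_status()
            if latest_status in FINAL_STATUSES:
                if raise_failure:
                    # No-op unless the final status is a failure:
                    self.raise_failure_status()

                return latest_status

            if time.time() - start_time > wait_timeout:
                if raise_timeout:
                    raise AirbyteConnectionSyncTimeoutError(
                        workspace=self.workspace,
                        connection_id=self.connection.connection_id,
                        job_id=self.job_id,
                        job_status=latest_status,
                        timeout=wait_timeout,
                    )

                return latest_status  # This will be a non-final status

            time.sleep(api_util.JOB_WAIT_INTERVAL_SECS)

Wait for the sync job to reach a final status.

Polls the job status every api_util.JOB_WAIT_INTERVAL_SECS seconds and returns the final status once the job finishes. If raise_failure=True and the job failed, raises AirbyteConnectionSyncError instead. If wait_timeout seconds (default 1800) elapse first, raises AirbyteConnectionSyncTimeoutError, or returns the latest non-final status when raise_timeout=False.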
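The polling loop in wait_for_completion works in four steps. It checks the status, returns it if the status is final, gives up once the timeout has passed, and otherwise sleeps before checking again. Below is a minimal, self-contained sketch of the same loop. All names are hypothetical. The set of final statuses is hand-picked here because the real FINAL_STATUSES constant is not shown, and the raise_failure branch is omitted. Two other design choices differ from the original. The clock and sleep functions are injectable, so the loop can be tested without waiting. The sketch also uses time.monotonic, which is unaffected by system clock changes, where the original uses time.time().

```python
from __future__ import annotations

import time
from collections.abc import Callable

# Assumption: a hand-picked stand-in for PyAirbyte's FINAL_STATUSES.
FINAL_STATUSES_SKETCH = {"succeeded", "failed", "cancelled"}


class WaitTimeoutSketchError(Exception):
    """Hypothetical stand-in for AirbyteConnectionSyncTimeoutError."""


def wait_for_status(
    get_status: Callable[[], str],
    *,
    wait_timeout: float,
    poll_interval: float,
    raise_timeout: bool = True,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status until it returns a final status or the timeout passes."""
    start = clock()
    while True:
        status = get_status()
        if status in FINAL_STATUSES_SKETCH:
            return status
        if clock() - start > wait_timeout:
            if raise_timeout:
                raise WaitTimeoutSketchError(f"still {status!r} after {wait_timeout}s")
            return status  # Non-final status, as in wait_for_completion().
        sleep(poll_interval)


# Demo with canned statuses; poll_interval=0 keeps it instant.
canned = iter(["pending", "running", "succeeded"])
print(wait_for_status(lambda: next(canned), wait_timeout=60, poll_interval=0))
```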

def get_sql_cache(self) -> airbyte.caches.CacheBase:
    def get_sql_cache(self) -> CacheBase:
        """Return a SQL Cache object for working with the data in a SQL-based destination."""
        if self._cache:
            return self._cache

        destination_configuration = self._get_destination_configuration()
        self._cache = destination_to_cache(destination_configuration=destination_configuration)
        return self._cache

Return a SQL Cache object for working with the data in a SQL-based destination.
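get_sql_cache builds the cache from the destination configuration on the first call and stores it. Later calls reuse the stored cache, and so do get_sql_engine, get_sql_table, and get_dataset, which all go through get_sql_cache. Below is a minimal sketch of that build-once pattern with hypothetical names. fake_build stands in for destination_to_cache, and its return value is a placeholder dict. The sketch checks "is None" where the original tests truthiness, so a falsy but valid cache object would still be reused.

```python
from __future__ import annotations

from collections.abc import Callable
from typing import Any


class LazyCacheSketch:
    """Hypothetical sketch of the build-once pattern used by get_sql_cache()."""

    def __init__(self, build_cache: Callable[[], Any]) -> None:
        self._build_cache = build_cache
        self._cache: Any | None = None

    def get_sql_cache(self) -> Any:
        # Build on first use only; later calls return the same object.
        if self._cache is None:
            self._cache = self._build_cache()
        return self._cache


build_calls: list[int] = []


def fake_build() -> dict[str, str]:
    # Stand-in for destination_to_cache(); records each invocation.
    build_calls.append(1)
    return {"schema_name": "placeholder_schema"}


lazy = LazyCacheSketch(fake_build)
first = lazy.get_sql_cache()
second = lazy.get_sql_cache()
print(first is second, len(build_calls))  # True 1
```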

def get_sql_engine(self) -> sqlalchemy.engine.base.Engine:
    def get_sql_engine(self) -> sqlalchemy.engine.Engine:
        """Return a SQL Engine for querying a SQL-based destination."""
        return self.get_sql_cache().get_sql_engine()

Return a SQL Engine for querying a SQL-based destination.

def get_sql_table_name(self, stream_name: str) -> str:
    def get_sql_table_name(self, stream_name: str) -> str:
        """Return the SQL table name of the named stream."""
        return self.get_sql_cache().processor.get_sql_table_name(stream_name=stream_name)

Return the SQL table name of the named stream.

def get_sql_table(self, stream_name: str) -> sqlalchemy.sql.schema.Table:
    def get_sql_table(
        self,
        stream_name: str,
    ) -> sqlalchemy.Table:
        """Return a SQLAlchemy table object for the named stream."""
        return self.get_sql_cache().processor.get_sql_table(stream_name)

Return a SQLAlchemy table object for the named stream.

def get_dataset(self, stream_name: str) -> airbyte.CachedDataset:
    def get_dataset(self, stream_name: str) -> CachedDataset:
        """Retrieve an `airbyte.datasets.CachedDataset` object for a given stream name.

        This can be used to read and analyze the data in a SQL-based destination.

        TODO: In a future iteration, we can consider providing stream configuration information
              (catalog information) to the `CachedDataset` object via the "Get stream properties"
              API: https://reference.airbyte.com/reference/getstreamproperties
        """
        return CachedDataset(
            self.get_sql_cache(),
            stream_name=stream_name,
            stream_configuration=False,  # Don't look for stream configuration in cache.
        )

Retrieve an airbyte.datasets.CachedDataset object for a given stream name.

This can be used to read and analyze the data in a SQL-based destination.

TODO: In a future iteration, we can consider providing stream configuration information (catalog information) to the CachedDataset object via the "Get stream properties" API: https://reference.airbyte.com/reference/getstreamproperties

def get_sql_database_name(self) -> str:
    def get_sql_database_name(self) -> str:
        """Return the SQL database name."""
        cache = self.get_sql_cache()
        return cache.get_database_name()

Return the SQL database name.

def get_sql_schema_name(self) -> str:
    def get_sql_schema_name(self) -> str:
        """Return the SQL schema name."""
        cache = self.get_sql_cache()
        return cache.schema_name

Return the SQL schema name.

stream_names: list[str]
    @property
    def stream_names(self) -> list[str]:
        """Return the list of stream names for this connection."""
        return self.connection.stream_names

Return the list of stream names for this connection.

streams: airbyte.cloud.sync_results.SyncResult._SyncResultStreams
    @final
    @property
    def streams(
        self,
    ) -> _SyncResultStreams:  # pyrefly: ignore[unknown-name]
        """Return a mapping of stream names to `airbyte.CachedDataset` objects.

        This is a convenience wrapper around the `stream_names`
        property and `get_dataset()` method.
        """
        return self._SyncResultStreams(self)

Return a mapping of stream names to airbyte.CachedDataset objects.

This is a convenience wrapper around the stream_names property and get_dataset() method.
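The streams property returns a read-only mapping built on collections.abc.Mapping. Only __getitem__, __iter__, and __len__ are defined, and Mapping supplies keys(), items(), get(), and "in" on top of them. Below is a minimal sketch of the same wrapper pattern. StreamsSketch and its string "datasets" are hypothetical stand-ins for _SyncResultStreams and CachedDataset. One deliberate difference: the sketch raises KeyError for unknown stream names. Mapping's default "in" and get() rely on that KeyError to detect missing keys.

```python
from __future__ import annotations

from collections.abc import Iterator, Mapping


class StreamsSketch(Mapping[str, str]):
    """Hypothetical sketch of a read-only, lazily evaluated stream mapping."""

    def __init__(self, stream_names: list[str]) -> None:
        self._stream_names = stream_names

    def _get_dataset(self, stream_name: str) -> str:
        # Stand-in for SyncResult.get_dataset(); returns a label, not a dataset.
        return f"dataset for {stream_name}"

    def __getitem__(self, key: str) -> str:
        # Raising KeyError for unknown names keeps Mapping's default
        # `in` and .get() behavior correct.
        if key not in self._stream_names:
            raise KeyError(key)
        return self._get_dataset(key)

    def __iter__(self) -> Iterator[str]:
        return iter(self._stream_names)

    def __len__(self) -> int:
        return len(self._stream_names)


streams = StreamsSketch(["users", "orders"])
print(list(streams))  # ['users', 'orders']
print(streams["orders"])  # dataset for orders
print("invoices" in streams)  # False
```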

class JobStatusEnum(builtins.str, enum.Enum):
class JobStatusEnum(str, Enum):
    """Status values for an Airbyte Cloud job."""

    PENDING = "pending"
    RUNNING = "running"
    INCOMPLETE = "incomplete"
    FAILED = "failed"
    SUCCEEDED = "succeeded"
    CANCELLED = "cancelled"

Status values for an Airbyte Cloud job.

PENDING = <JobStatusEnum.PENDING: 'pending'>
RUNNING = <JobStatusEnum.RUNNING: 'running'>
INCOMPLETE = <JobStatusEnum.INCOMPLETE: 'incomplete'>
FAILED = <JobStatusEnum.FAILED: 'failed'>
SUCCEEDED = <JobStatusEnum.SUCCEEDED: 'succeeded'>
CANCELLED = <JobStatusEnum.CANCELLED: 'cancelled'>
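JobStatusEnum mixes in str, so each member is also a plain string. Members compare equal to the raw values the API returns, and calling the enum with a raw value looks up the matching member. The example below redefines the enum from above so that it runs on its own.

```python
from enum import Enum


class JobStatusEnum(str, Enum):
    """Copy of the enum above, so the example runs on its own."""

    PENDING = "pending"
    RUNNING = "running"
    INCOMPLETE = "incomplete"
    FAILED = "failed"
    SUCCEEDED = "succeeded"
    CANCELLED = "cancelled"


status = JobStatusEnum("failed")  # Look up a member from a raw API value.
print(status is JobStatusEnum.FAILED)  # True
print(status == "failed")  # True: the str mixin compares by value
print(isinstance(status, str))  # True
print(status.value)  # failed
```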

class JobTypeEnum(builtins.str, enum.Enum):
class JobTypeEnum(str, Enum):
    """Job type values for Airbyte Cloud jobs."""

    SYNC = "sync"
    RESET = "reset"
    REFRESH = "refresh"
    CLEAR = "clear"

Job type values for Airbyte Cloud jobs.

SYNC = <JobTypeEnum.SYNC: 'sync'>
RESET = <JobTypeEnum.RESET: 'reset'>
REFRESH = <JobTypeEnum.REFRESH: 'refresh'>
CLEAR = <JobTypeEnum.CLEAR: 'clear'>
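Calling JobTypeEnum with a string that matches no member raises ValueError. Below is a minimal sketch of a tolerant parser for raw job-type strings. parse_job_type is a hypothetical helper, not part of PyAirbyte, and returning None for unknown types is a design choice of this sketch. The enum is redefined from above so the example runs on its own.

```python
from __future__ import annotations

from enum import Enum


class JobTypeEnum(str, Enum):
    """Copy of the enum above, so the example runs on its own."""

    SYNC = "sync"
    RESET = "reset"
    REFRESH = "refresh"
    CLEAR = "clear"


def parse_job_type(raw: str) -> JobTypeEnum | None:
    """Hypothetical helper: map a raw job-type string to the enum, or None."""
    try:
        return JobTypeEnum(raw)
    except ValueError:
        # Unknown job types are reported as None rather than raising.
        return None


print(parse_job_type("refresh") is JobTypeEnum.REFRESH)  # True
print(parse_job_type("unknown"))  # None
```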