airbyte.secrets

Secrets management for PyAirbyte.

PyAirbyte provides a secrets management system that allows you to securely store and retrieve sensitive information. This module provides the secrets functionality.

Secrets Management

PyAirbyte can auto-import secrets from the following sources:

  1. Environment variables.
  2. Variables defined in a local .env ("Dotenv") file.
  3. Google Colab secrets.
  4. Manual entry via getpass.

Note: You can also build your own secret manager by subclassing the CustomSecretManager implementation. For more information, see the airbyte.secrets.CustomSecretManager reference docs.

Retrieving Secrets

To retrieve a secret, use the get_secret() function. For example:

import airbyte as ab

source = ab.get_source("source-github")
source.set_config(
    {
        "credentials": {
            "personal_access_token": ab.get_secret("GITHUB_PERSONAL_ACCESS_TOKEN"),
        }
    }
)

By default, PyAirbyte will search all available secrets sources. The get_secret() function also accepts an optional sources argument: a list of source names (SecretSourceEnum) and/or secret manager objects to check.

By default, PyAirbyte will prompt the user for any requested secrets that are not provided via other secret managers. You can disable this prompt by passing allow_prompt=False to get_secret().

Secrets Auto-Discovery

If you have a secret matching an expected name, PyAirbyte will automatically use it. For example, if you have a secret named GITHUB_PERSONAL_ACCESS_TOKEN, PyAirbyte will automatically use it when configuring the GitHub source.

The naming convention for secrets is {CONNECTOR_NAME}_{PROPERTY_NAME}, for instance SNOWFLAKE_PASSWORD and BIGQUERY_CREDENTIALS_PATH.
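
As a rough illustration of the convention, the helper below derives a candidate secret name from a connector name and a config property. `expected_secret_name` is a hypothetical helper, not part of PyAirbyte, and it assumes that the `source-` or `destination-` prefix is dropped and the rest is upper-cased; PyAirbyte's actual matching logic may differ.

```python
def expected_secret_name(connector_name: str, property_name: str) -> str:
    """Build a `{CONNECTOR_NAME}_{PROPERTY_NAME}` style name (illustrative only)."""
    short_name = connector_name
    for prefix in ("source-", "destination-"):
        if short_name.startswith(prefix):
            short_name = short_name[len(prefix):]
            break
    # Assumption: dashes and dots become underscores, then everything is upper-cased.
    combined = f"{short_name}_{property_name}".replace("-", "_").replace(".", "_")
    return combined.upper()


print(expected_secret_name("source-snowflake", "password"))
print(expected_secret_name("destination-bigquery", "credentials_path"))
```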

PyAirbyte will also auto-discover secrets for interop with hosted Airbyte: AIRBYTE_CLOUD_API_URL, AIRBYTE_CLOUD_API_KEY, etc.

Custom Secret Managers

If you need to build your own secret manager, you can subclass the airbyte.secrets.CustomSecretManager class. This allows you to build a custom secret manager that can be used with the get_secret() function, securely storing and retrieving secrets as needed.
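
The subclassing pattern can be sketched without PyAirbyte installed. In the example below, `SecretManagerSketch` and `InMemorySecretManager` are hypothetical stand-ins that only mirror the single `get_secret()` method a secret manager has to implement; they are not the library classes.

```python
from __future__ import annotations

from abc import ABC, abstractmethod


class SecretManagerSketch(ABC):
    """Stand-in for the secret manager interface: one `get_secret()` method."""

    @abstractmethod
    def get_secret(self, secret_name: str) -> str | None:
        ...


class InMemorySecretManager(SecretManagerSketch):
    """Hypothetical manager backed by a plain dictionary."""

    def __init__(self, secrets: dict[str, str]) -> None:
        self._secrets = dict(secrets)

    def get_secret(self, secret_name: str) -> str | None:
        # Return None (rather than raising) when the secret is unknown, so that
        # the caller can move on to the next source.
        return self._secrets.get(secret_name)


manager = InMemorySecretManager({"SNOWFLAKE_PASSWORD": "example-value"})
print(manager.get_secret("SNOWFLAKE_PASSWORD") is not None)
print(manager.get_secret("MISSING_SECRET"))
```

With PyAirbyte installed, the equivalent class would subclass `airbyte.secrets.CustomSecretManager` and wrap returned values in `SecretString`.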

API Reference

_Below are the classes and functions available in the airbyte.secrets module._

# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Secrets management for PyAirbyte.

PyAirbyte provides a secrets management system that allows you to securely store and retrieve
sensitive information. This module provides the secrets functionality.

## Secrets Management

PyAirbyte can auto-import secrets from the following sources:

1. Environment variables.
2. Variables defined in a local `.env` ("Dotenv") file.
3. [Google Colab secrets](https://medium.com/@parthdasawant/how-to-use-secrets-in-google-colab-450c38e3ec75).
4. Manual entry via [`getpass`](https://docs.python.org/3.10/library/getpass.html).

**Note:** You can also build your own secret manager by subclassing the `CustomSecretManager`
implementation. For more information, see the `airbyte.secrets.CustomSecretManager` reference docs.

### Retrieving Secrets

To retrieve a secret, use the `get_secret()` function. For example:

```python
import airbyte as ab

source = ab.get_source("source-github")
source.set_config(
    {
        "credentials": {
            "personal_access_token": ab.get_secret("GITHUB_PERSONAL_ACCESS_TOKEN"),
        }
    }
)
```

By default, PyAirbyte will search all available secrets sources. The `get_secret()` function also
accepts an optional `sources` argument: a list of source names (`SecretSourceEnum`) and/or secret
manager objects to check.

By default, PyAirbyte will prompt the user for any requested secrets that are not provided via other
secret managers. You can disable this prompt by passing `allow_prompt=False` to `get_secret()`.

### Secrets Auto-Discovery

If you have a secret matching an expected name, PyAirbyte will automatically use it. For example, if
you have a secret named `GITHUB_PERSONAL_ACCESS_TOKEN`, PyAirbyte will automatically use it when
configuring the GitHub source.

The naming convention for secrets is `{CONNECTOR_NAME}_{PROPERTY_NAME}`, for instance
`SNOWFLAKE_PASSWORD` and `BIGQUERY_CREDENTIALS_PATH`.

PyAirbyte will also auto-discover secrets for interop with hosted Airbyte: `AIRBYTE_CLOUD_API_URL`,
`AIRBYTE_CLOUD_API_KEY`, etc.

## Custom Secret Managers

If you need to build your own secret manager, you can subclass the
`airbyte.secrets.CustomSecretManager` class. This allows you to build a custom secret manager that
can be used with the `get_secret()` function, securely storing and retrieving secrets as needed.

## API Reference

_Below are the classes and functions available in the `airbyte.secrets` module._

"""

from __future__ import annotations

from airbyte.secrets import (
    base,
    config,
    custom,
    env_vars,
    google_colab,
    google_gsm,
    prompt,
    util,
)
from airbyte.secrets.base import SecretHandle, SecretManager, SecretSourceEnum, SecretString
from airbyte.secrets.config import disable_secret_source, register_secret_manager
from airbyte.secrets.custom import CustomSecretManager
from airbyte.secrets.env_vars import DotenvSecretManager, EnvVarSecretManager
from airbyte.secrets.google_colab import ColabSecretManager
from airbyte.secrets.google_gsm import GoogleGSMSecretManager
from airbyte.secrets.prompt import SecretsPrompt
from airbyte.secrets.util import get_secret


__all__ = [
    # Submodules
    "base",
    "config",
    "custom",
    "env_vars",
    "google_colab",
    "google_gsm",
    "prompt",
    "util",
    # Secret Access
    "get_secret",
    # Secret Classes
    "SecretSourceEnum",
    "SecretString",
    "SecretHandle",
    # Secret Managers
    "SecretManager",
    "EnvVarSecretManager",
    "DotenvSecretManager",
    "ColabSecretManager",
    "SecretsPrompt",
    "CustomSecretManager",
    "GoogleGSMSecretManager",
    # Registration Functions
    "register_secret_manager",
    "disable_secret_source",
]

def get_secret(secret_name: str, /, *, sources: list[SecretManager | SecretSourceEnum] | None = None, allow_prompt: bool = True, **kwargs: dict[str, typing.Any]) -> SecretString:
def get_secret(
    secret_name: str,
    /,
    *,
    sources: list[SecretManager | SecretSourceEnum] | None = None,
    allow_prompt: bool = True,
    **kwargs: dict[str, Any],
) -> SecretString:
    """Get a secret from the environment.

    The optional `sources` argument accepts a list of `SecretSourceEnum` values and/or
    `SecretManager` objects. If omitted, all available sources will be checked. If a list is
    passed, then the sources will be checked using the provided ordering.

    If `allow_prompt` is `True` and the prompt source is among the sources being checked, then
    the user will be prompted to enter the secret if it is not found in any of the other sources.
    """
    if "source" in kwargs:
        warnings.warn(
            message="The `source` argument is deprecated. Use the `sources` argument instead.",
            category=DeprecationWarning,
            stacklevel=2,
        )
        sources = kwargs.pop("source")  # type: ignore [assignment]

    available_sources: dict[str, SecretManager] = {}
    for available_source in _get_secret_sources():
        # Add available sources to the dict. Order matters.
        available_sources[available_source.name] = available_source

    if sources is None:
        # No sources were specified, so check all available sources.
        # This is the default behavior.
        sources = list(available_sources.values())

    elif not isinstance(sources, list):
        sources = [sources]  # type: ignore [unreachable]  # This is a 'just in case' catch.

    # Replace any SecretSourceEnum strings with the matching SecretManager object
    for source in list(sources):
        if isinstance(source, SecretSourceEnum):
            if source not in available_sources:
                raise exc.PyAirbyteInputError(
                    guidance="Invalid secret source name.",
                    input_value=source,
                    context={
                        "Available Sources": list(available_sources.keys()),
                    },
                )

            sources[sources.index(source)] = available_sources[source]

    secret_managers = cast(list[SecretManager], sources)

    if SecretSourceEnum.PROMPT in secret_managers:
        prompt_source = secret_managers.pop(
            # Mis-typed, but okay here since we have equality logic for the enum comparison:
            secret_managers.index(SecretSourceEnum.PROMPT),  # type: ignore [arg-type]
        )

        if allow_prompt:
            # Always check prompt last. Add it to the end of the list.
            secret_managers.append(prompt_source)

    for secret_mgr in secret_managers:
        val = secret_mgr.get_secret(secret_name)
        if val:
            return SecretString(val)

    raise exc.PyAirbyteSecretNotFoundError(
        secret_name=secret_name,
        sources=[str(s) for s in available_sources],
    )

Get a secret from the environment.

The optional sources argument accepts a list of SecretSourceEnum values and/or SecretManager objects. If omitted, all available sources will be checked. If a list is passed, then the sources will be checked using the provided ordering.

If allow_prompt is True and the prompt source is among the sources being checked, then the user will be prompted to enter the secret if it is not found in any of the other sources. The prompt is always checked last, and passing allow_prompt=False disables it even if SecretSourceEnum.PROMPT is listed in sources.
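
The ordering rules can be sketched with plain callables in place of secret managers. `resolve_secret` and the `(name, lookup)` pairs below are hypothetical stand-ins that model the loop in the source listing; this is not the library implementation.

```python
from __future__ import annotations

from typing import Callable, Optional

Lookup = Callable[[str], Optional[str]]


def resolve_secret(
    secret_name: str,
    sources: list[tuple[str, Lookup]],
    *,
    allow_prompt: bool = True,
) -> str:
    """Check each (name, lookup) pair in order; the "prompt" source always goes last."""
    prompt_sources = [s for s in sources if s[0] == "prompt"]
    ordered = [s for s in sources if s[0] != "prompt"]
    if allow_prompt:
        ordered.extend(prompt_sources)

    for _name, lookup in ordered:
        value = lookup(secret_name)
        if value:
            return value

    raise LookupError(f"Secret '{secret_name}' not found.")


env = {"API_KEY": "from-env"}
sources = [
    ("prompt", lambda name: "typed-by-user"),  # Listed first, but still checked last.
    ("env", env.get),
]
print(resolve_secret("API_KEY", sources))
print(resolve_secret("OTHER", sources))
```

Calling `resolve_secret("OTHER", sources, allow_prompt=False)` raises `LookupError`, because the prompt source is dropped and no other source knows the name.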

class SecretSourceEnum(builtins.str, enum.Enum):
class SecretSourceEnum(str, Enum):
    """Enumeration of secret sources supported by PyAirbyte."""

    ENV = "env"
    DOTENV = "dotenv"
    GOOGLE_COLAB = "google_colab"
    GOOGLE_GSM = "google_gsm"  # Not enabled by default

    PROMPT = "prompt"

Enumeration of secret sources supported by PyAirbyte.

ENV = <SecretSourceEnum.ENV: 'env'>
DOTENV = <SecretSourceEnum.DOTENV: 'dotenv'>
GOOGLE_COLAB = <SecretSourceEnum.GOOGLE_COLAB: 'google_colab'>
GOOGLE_GSM = <SecretSourceEnum.GOOGLE_GSM: 'google_gsm'>
PROMPT = <SecretSourceEnum.PROMPT: 'prompt'>
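
Because the enumeration mixes in `str`, its members compare and hash like their string values, which is what lets `get_secret()` look an enum member up in a dictionary keyed by source name. The sketch below uses a hypothetical `SourceName` enum with the same base classes to show that behavior.

```python
from enum import Enum


class SourceName(str, Enum):
    """Stand-in with the same `str` + `Enum` base classes as `SecretSourceEnum`."""

    ENV = "env"
    PROMPT = "prompt"


managers_by_name = {"env": "env-manager", "prompt": "prompt-manager"}

# Members compare and hash like their string values...
print(SourceName.ENV == "env")
print(SourceName.ENV in managers_by_name)
print(managers_by_name[SourceName.PROMPT])
# ...and a plain string can be converted back into a member.
print(SourceName("env") is SourceName.ENV)
```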
class SecretString(builtins.str):
class SecretString(str):
    """A string that represents a secret.

    This class is used to mark a string as a secret. When a secret's representation is
    displayed, for example when debugging or when printing a containing object such as a
    dictionary, the value is masked to prevent accidental exposure of sensitive information.

    To create a secret string, simply instantiate the class with any string value:

        ```python
        secret = SecretString("my_secret_password")
        ```

    """

    __slots__ = ()

    def __repr__(self) -> str:
        """Override the representation of the secret string to return a masked value.

        The secret string is always masked with `****` to prevent accidental exposure, unless
        explicitly converted to a string. For instance, printing a config dictionary that contains
        a secret will automatically mask the secret value instead of printing it in plain text.

        However, if you explicitly cast the secret as a string, such as when used
        in an f-string, the secret will be exposed. This is the desired behavior to allow
        secrets to be used in a controlled manner.
        """
        return "<SecretString: ****>"

    def is_empty(self) -> bool:
        """Check if the secret is an empty string."""
        return len(self) == 0

    def is_json(self) -> bool:
        """Check if the secret string is a valid JSON string."""
        try:
            json.loads(self)
        except (json.JSONDecodeError, Exception):
            return False

        return True

    def __bool__(self) -> bool:
        """Override the boolean value of the secret string.

        Always returns `True` without inspecting contents.
        """
        return True

    def parse_json(self) -> dict:
        """Parse the secret string as JSON."""
        try:
            return json.loads(self)
        except json.JSONDecodeError as ex:
            raise exc.PyAirbyteInputError(
                message="Failed to parse secret as JSON.",
                context={
                    "Message": ex.msg,
                    "Position": ex.pos,
                    "SecretString_Length": len(self),  # Debug secret blank or an unexpected format.
                },
            ) from None

    # Pydantic compatibility

    @classmethod
    def validate(
        cls,
        v: Any,  # noqa: ANN401  # Must allow `Any` to match Pydantic signature
        info: ValidationInfo,
    ) -> SecretString:
        """Validate the input value is valid as a secret string."""
        _ = info  # Unused
        if not isinstance(v, str):
            raise exc.PyAirbyteInputError(
                message="A valid `str` or `SecretString` object is required.",
            )
        return cls(v)

    @classmethod
    def __get_pydantic_core_schema__(  # noqa: PLW3201  # Pydantic dunder
        cls,
        source_type: Any,  # noqa: ANN401  # Must allow `Any` to match Pydantic signature
        handler: GetCoreSchemaHandler,
    ) -> CoreSchema:
        """Return a modified core schema for the secret string."""
        return core_schema.with_info_after_validator_function(
            function=cls.validate, schema=handler(str), field_name=handler.field_name
        )

    @classmethod
    def __get_pydantic_json_schema__(  # noqa: PLW3201  # Pydantic dunder method
        cls, _core_schema: core_schema.CoreSchema, handler: GetJsonSchemaHandler
    ) -> JsonSchemaValue:
        """Return a modified JSON schema for the secret string.

        - `writeOnly=True` is the official way to prevent secrets from being exposed inadvertently.
        - `format=password` is a popular and readable convention to indicate the field is sensitive.
        """
        _ = _core_schema, handler  # Unused
        return {
            "type": "string",
            "format": "password",
            "writeOnly": True,
        }

A string that represents a secret.

This class is used to mark a string as a secret. When a secret's representation is displayed, for example when debugging or when printing a containing object such as a dictionary, the value is masked to prevent accidental exposure of sensitive information.

To create a secret string, simply instantiate the class with any string value:

secret = SecretString("my_secret_password")
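
The masking relies only on overriding `__repr__` in a `str` subclass. The sketch below uses a hypothetical `MaskedStr` class (a stand-in, not the library class) to show where the value is masked and where it is still exposed.

```python
class MaskedStr(str):
    """Stand-in for `SecretString`: only `__repr__` is overridden."""

    __slots__ = ()

    def __repr__(self) -> str:
        return "<MaskedStr: ****>"


password = MaskedStr("hunter2")
config = {"user": "alice", "password": password}

# The dict repr calls repr() on each value, so the secret is masked here.
print(config)
# String formatting converts to a plain string, so the secret is exposed here.
print(f"value={password}")
```

The practical consequence is that logging a whole config object is safe, while interpolating the secret itself into a message is not.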
def is_empty(self) -> bool:

Check if the secret is an empty string.

def is_json(self) -> bool:

Check if the secret string is a valid JSON string.

def parse_json(self) -> dict:

Parse the secret string as JSON.
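
The error handling in `parse_json()` reports where parsing failed and how long the secret is, but never the secret text itself. A minimal sketch of that idea follows; `parse_secret_json` is a hypothetical helper and raises `ValueError` in place of `PyAirbyteInputError`.

```python
import json


def parse_secret_json(secret_text: str) -> dict:
    """Parse JSON, reporting only position and length on failure (never the content)."""
    try:
        return json.loads(secret_text)
    except json.JSONDecodeError as ex:
        raise ValueError(
            f"Failed to parse secret as JSON: {ex.msg} "
            f"(position {ex.pos}, secret length {len(secret_text)})"
        ) from None


print(parse_secret_json('{"client_id": "abc"}')["client_id"])

try:
    parse_secret_json("not-json")
except ValueError as err:
    print(err)
```

Using `from None` drops the original traceback context, which keeps the raw input out of chained exception output.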

@classmethod
def validate(cls, v: Any, info: pydantic_core.core_schema.ValidationInfo) -> SecretString:

Validate the input value is valid as a secret string.

class SecretHandle:
class SecretHandle:
    """A handle for a secret in a secret manager.

    This class is used to store a reference to a secret in a secret manager.
    The secret is not retrieved until the `get_value()` or `parse_json()` methods are
    called.
    """

    def __init__(
        self,
        parent: SecretManager,
        secret_name: str,
    ) -> None:
        """Instantiate a new secret handle."""
        self.parent = parent
        self.secret_name = secret_name

    def get_value(self) -> SecretString:
        """Get the secret from the secret manager.

        Subclasses can optionally override this method to provide a more optimized code path.
        """
        return cast(SecretString, self.parent.get_secret(self.secret_name))

    def parse_json(self) -> dict:
        """Parse the secret as JSON.

        This method is a convenience method to parse the secret as JSON without
        needing to call `get_value()` first. If the secret is not a valid JSON
        string, a `PyAirbyteInputError` will be raised.
        """
        return self.get_value().parse_json()

    def write_to_file(
        self,
        file_path: Path,
        /,
        *,
        silent: bool = False,
    ) -> None:
        """Write the secret to a file.

        If `silent` is True, the method will not print any output to the console. Otherwise,
        the method will print a message to the console indicating the file path to which the secret
        is being written.

        This method is a convenience method that writes the secret to a file as text.
        """
        if not silent:
            print(
                f"Writing secret '{self.secret_name.split('/')[-1]}' to '{file_path.absolute()!s}'"
            )

        file_path.write_text(
            str(self.get_value()),
            encoding="utf-8",
        )

A handle for a secret in a secret manager.

This class is used to store a reference to a secret in a secret manager. The secret is not retrieved until the get_value() or parse_json() methods are called.

SecretHandle(parent: SecretManager, secret_name: str)

Instantiate a new secret handle.

parent
secret_name
def get_value(self) -> SecretString:

Get the secret from the secret manager.

Subclasses can optionally override this method to provide a more optimized code path.

def parse_json(self) -> dict:

Parse the secret as JSON.

This method is a convenience method to parse the secret as JSON without needing to call get_value() first. If the secret is not a valid JSON string, a PyAirbyteInputError will be raised.

def write_to_file(self, file_path: pathlib.Path, /, *, silent: bool = False) -> None:

Write the secret to a file.

If silent is True, the method will not print any output to the console. Otherwise, the method will print a message to the console indicating the file path to which the secret is being written.

This method is a convenience method that writes the secret to a file as text.
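
The deferred-retrieval behavior of a handle can be sketched with two small stand-in classes. `CountingStore` and `LazyHandle` are hypothetical names used for illustration only; the point is that the store is not queried until the value is actually needed.

```python
import tempfile
from pathlib import Path


class CountingStore:
    """Hypothetical secret store that counts how often it is queried."""

    def __init__(self) -> None:
        self.calls = 0

    def get_secret(self, secret_name: str) -> str:
        self.calls += 1
        return f"value-of-{secret_name}"


class LazyHandle:
    """Stand-in for `SecretHandle`: keeps a reference, fetches on demand."""

    def __init__(self, parent: CountingStore, secret_name: str) -> None:
        self.parent = parent
        self.secret_name = secret_name

    def get_value(self) -> str:
        return self.parent.get_secret(self.secret_name)

    def write_to_file(self, file_path: Path) -> None:
        file_path.write_text(self.get_value(), encoding="utf-8")


store = CountingStore()
handle = LazyHandle(store, "API_KEY")
print(store.calls)  # Creating the handle does not query the store.

with tempfile.TemporaryDirectory() as tmp_dir:
    target = Path(tmp_dir) / "api_key.txt"
    handle.write_to_file(target)
    print(target.read_text(encoding="utf-8"))

print(store.calls)  # The store was queried when the value was written.
```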

class SecretManager(abc.ABC):
class SecretManager(ABC):
    """Abstract base class for secret managers.

    Secret managers are used to retrieve secrets from a secret store.

    By registering a secret manager, PyAirbyte can automatically locate and
    retrieve secrets from the secret store when needed. This allows you to
    securely store and access sensitive information such as API keys, passwords,
    and other credentials without hardcoding them in your code.

    To create a custom secret manager, subclass this class and implement the
    `get_secret` method. By default, the secret manager will be automatically
    registered as a global secret source, but will not replace any existing
    secret sources. To customize this behavior, override the `auto_register` and
    `replace_existing` attributes in your subclass as needed.

    Note: Registered secrets managers always have priority over the default
    secret sources such as environment variables, dotenv files, and Google Colab
    secrets. If multiple secret managers are registered, the last one registered
    will take priority.
    """

    replace_existing = False
    as_backup = False

    def __init__(self) -> None:
        """Instantiate the new secret manager."""
        if not hasattr(self, "name"):
            # Default to the class name if no name is provided
            self.name: str = self.__class__.__name__

    @abstractmethod
    def get_secret(self, secret_name: str) -> SecretString | None:
        """Get a named secret from the secret manager.

        This method should be implemented by subclasses to retrieve secrets from
        the secret store. If the secret is not found, the method should return `None`.
        """
        ...

    def __str__(self) -> str:
        """Return the name of the secret manager."""
        return self.name

    def __eq__(self, value: object) -> bool:
        """Check if the secret manager is equal to another secret manager."""
        if isinstance(value, SecretManager):
            return self.name == value.name

        if isinstance(value, str):
            return self.name == value

        if isinstance(value, SecretSourceEnum):
            return self.name == str(value)

        return super().__eq__(value)

    def __hash__(self) -> int:
        """Return a hash of the secret manager name.

        This allows the secret manager to be used in sets and dictionaries.
        """
        return hash(self.name)

Abstract base class for secret managers.

Secret managers are used to retrieve secrets from a secret store.

By registering a secret manager, PyAirbyte can automatically locate and retrieve secrets from the secret store when needed. This allows you to securely store and access sensitive information such as API keys, passwords, and other credentials without hardcoding them in your code.

To create a custom secret manager, subclass this class and implement the get_secret method. By default, the secret manager will be automatically registered as a global secret source, but will not replace any existing secret sources. To customize this behavior, override the auto_register and replace_existing attributes in your subclass as needed.

Note: Registered secrets managers always have priority over the default secret sources such as environment variables, dotenv files, and Google Colab secrets. If multiple secret managers are registered, the last one registered will take priority.
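
The class source also defines name-based `__eq__` and `__hash__`, which is what allows a manager to be found in a list by its source name. The sketch below reproduces that idea with a hypothetical `NamedManager` class; it is a simplified stand-in and omits the `SecretSourceEnum` branch.

```python
class NamedManager:
    """Stand-in that reproduces name-based `__eq__` and `__hash__`."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __eq__(self, value: object) -> bool:
        if isinstance(value, NamedManager):
            return self.name == value.name
        if isinstance(value, str):
            return self.name == value
        return NotImplemented

    def __hash__(self) -> int:
        return hash(self.name)


managers = [NamedManager("env"), NamedManager("prompt"), NamedManager("dotenv")]

# Because equality is name-based, a plain name can locate a manager in a list.
print("prompt" in managers)
print(managers.index("prompt"))
print(NamedManager("env") == NamedManager("env"))
```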

SecretManager()

Instantiate the new secret manager.

replace_existing = False
as_backup = False
@abstractmethod
def get_secret(self, secret_name: str) -> SecretString | None:

Get a named secret from the secret manager.

This method should be implemented by subclasses to retrieve secrets from the secret store. If the secret is not found, the method should return None.

class EnvVarSecretManager(airbyte.secrets.SecretManager):
class EnvVarSecretManager(SecretManager):
    """Secret manager that retrieves secrets from environment variables."""

    name = SecretSourceEnum.ENV.value

    def get_secret(self, secret_name: str) -> SecretString | None:
        """Get a named secret from the environment."""
        if secret_name not in os.environ:
            return None

        return SecretString(os.environ[secret_name])

Secret manager that retrieves secrets from environment variables.

name = 'env'
def get_secret(self, secret_name: str) -> SecretString | None:

Get a named secret from the environment.

class DotenvSecretManager(airbyte.secrets.SecretManager):
class DotenvSecretManager(SecretManager):
    """Secret manager that retrieves secrets from a `.env` file."""

    name = SecretSourceEnum.DOTENV.value

    def get_secret(self, secret_name: str) -> SecretString | None:
        """Get a named secret from the `.env` file."""
        try:
            dotenv_vars: dict[str, str | None] = dotenv_values()
        except Exception:
            # Can't locate or parse a .env file
            return None

        if secret_name not in dotenv_vars:
            # Secret not found
            return None

        return SecretString(dotenv_vars[secret_name])

Secret manager that retrieves secrets from a .env file.

name = 'dotenv'
def get_secret(self, secret_name: str) -> SecretString | None:

Get a named secret from the .env file.

class ColabSecretManager(airbyte.secrets.SecretManager):
class ColabSecretManager(SecretManager):
    """Secret manager that retrieves secrets from Google Colab user secrets."""

    name = SecretSourceEnum.GOOGLE_COLAB.value

    def __init__(self) -> None:
        """Initialize the Google Colab secret manager."""
        try:
            from google.colab import (  # pyright: ignore[reportMissingImports]  # noqa: PLC0415
                userdata as colab_userdata,
            )

            self.colab_userdata = colab_userdata
        except ImportError:
            self.colab_userdata = None

        super().__init__()

    def get_secret(self, secret_name: str) -> SecretString | None:
        """Get a named secret from Google Colab user secrets."""
        if self.colab_userdata is None:
            # The module doesn't exist. We probably aren't in Colab.
            return None

        try:
            return SecretString(self.colab_userdata.get(secret_name))
        except Exception:
            # Secret name not found. Continue.
            return None

Secret manager that retrieves secrets from Google Colab user secrets.

ColabSecretManager()

Initialize the Google Colab secret manager.

name = 'google_colab'
def get_secret(self, secret_name: str) -> SecretString | None:

Get a named secret from Google Colab user secrets.

class SecretsPrompt(airbyte.secrets.SecretManager):
class SecretsPrompt(SecretManager):
    """Secret manager that prompts the user to enter a secret."""

    name = SecretSourceEnum.PROMPT.value

    def get_secret(
        self,
        secret_name: str,
    ) -> SecretString | None:
        """Prompt the user to enter a secret.

        As a security measure, the secret is not echoed to the terminal when typed.
        """
        with contextlib.suppress(Exception):
            return SecretString(getpass(f"Enter the value for secret '{secret_name}': "))

        return None

Secret manager that prompts the user to enter a secret.

name = 'prompt'
def get_secret(self, secret_name: str) -> SecretString | None:
    def get_secret(
        self,
        secret_name: str,
    ) -> SecretString | None:
        """Prompt the user to enter a secret.

        As a security measure, the secret is not echoed to the terminal when typed.
        """
        with contextlib.suppress(Exception):
            return SecretString(getpass(f"Enter the value for secret '{secret_name}': "))

        return None

Prompt the user to enter a secret.

As a security measure, the secret is not echoed to the terminal when typed.
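
The prompt-and-suppress pattern can be tried in isolation. This is a minimal sketch under stated assumptions, not the library's code: `prompt_for_secret` and its `prompt_fn` parameter are hypothetical, added so the prompt can be swapped out in non-interactive contexts, and a plain `str` stands in for `SecretString`.

```python
from __future__ import annotations

import contextlib
from getpass import getpass
from typing import Callable


def prompt_for_secret(
    secret_name: str,
    prompt_fn: Callable[[str], str] = getpass,
) -> str | None:
    """Ask for a secret without echoing it; return None if prompting fails."""
    with contextlib.suppress(Exception):
        return prompt_fn(f"Enter the value for secret '{secret_name}': ")
    # Reached only when prompt_fn raised and the exception was suppressed.
    return None
```

Because `contextlib.suppress(Exception)` does not catch `KeyboardInterrupt`, pressing Ctrl+C at the prompt still interrupts the program.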

class CustomSecretManager(airbyte.secrets.SecretManager, abc.ABC):
class CustomSecretManager(SecretManager, ABC):
    """Custom secret manager that retrieves secrets from a custom source.

    This class is a convenience class that can be used to create custom secret
    managers. By default, custom secret managers are auto-registered during
    creation.
    """

    auto_register = True
    replace_existing = False
    as_backup = False

    def __init__(self) -> None:
        """Initialize the custom secret manager."""
        super().__init__()
        if self.auto_register:
            self.register()

    def register(
        self,
        *,
        replace_existing: bool | None = None,
        as_backup: bool | None = None,
    ) -> None:
        """Register the secret manager as a global secret source.

        This makes the secret manager available to the `get_secret` function and
        allows it to be used automatically as a source for secrets.

        If `replace_existing` is `True`, the secret manager will replace all existing
        secrets sources, including the default secret managers such as environment
        variables, dotenv files, and Google Colab secrets. If `replace_existing` is
        None or not provided, the default behavior will be used from the `replace_existing`
        of the class (`False` unless overridden by the subclass).
        """
        if replace_existing is None:
            replace_existing = self.replace_existing

        if as_backup is None:
            as_backup = self.as_backup

        if replace_existing:
            clear_secret_sources()

        register_secret_manager(
            self,
            as_backup=as_backup,
            replace_existing=replace_existing,
        )

Custom secret manager that retrieves secrets from a custom source.

This convenience class can be used to create custom secret managers. By default, custom secret managers are auto-registered during creation.

CustomSecretManager()
    def __init__(self) -> None:
        """Initialize the custom secret manager."""
        super().__init__()
        if self.auto_register:
            self.register()

Initialize the custom secret manager.

auto_register = True
replace_existing = False
as_backup = False
def register( self, *, replace_existing: bool | None = None, as_backup: bool | None = None) -> None:
    def register(
        self,
        *,
        replace_existing: bool | None = None,
        as_backup: bool | None = None,
    ) -> None:
        """Register the secret manager as a global secret source.

        This makes the secret manager available to the `get_secret` function and
        allows it to be used automatically as a source for secrets.

        If `replace_existing` is `True`, the secret manager will replace all existing
        secrets sources, including the default secret managers such as environment
        variables, dotenv files, and Google Colab secrets. If `replace_existing` is
        None or not provided, the default behavior will be used from the `replace_existing`
        of the class (`False` unless overridden by the subclass).
        """
        if replace_existing is None:
            replace_existing = self.replace_existing

        if as_backup is None:
            as_backup = self.as_backup

        if replace_existing:
            clear_secret_sources()

        register_secret_manager(
            self,
            as_backup=as_backup,
            replace_existing=replace_existing,
        )

Register the secret manager as a global secret source.

This makes the secret manager available to the get_secret function and allows it to be used automatically as a source for secrets.

If replace_existing is True, the secret manager replaces all existing secret sources, including the default secret managers such as environment variables, dotenv files, and Google Colab secrets. If replace_existing is None or not provided, the class-level replace_existing attribute is used (False unless overridden by the subclass). The as_backup argument falls back to the class-level as_backup attribute in the same way.
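
The fallback from per-call arguments to class-level attributes can be illustrated on its own. This is a minimal standalone sketch, not the library's class: `RegistrationDefaults`, `BackupByDefault`, and `resolve` are hypothetical names, and the sketch only computes the effective flags without touching any registry.

```python
from __future__ import annotations


class RegistrationDefaults:
    """Hypothetical model of class-level defaults with per-call overrides."""

    replace_existing = False
    as_backup = False

    def resolve(
        self,
        *,
        replace_existing: bool | None = None,
        as_backup: bool | None = None,
    ) -> tuple[bool, bool]:
        # None means "not specified": fall back to the class attribute.
        if replace_existing is None:
            replace_existing = self.replace_existing
        if as_backup is None:
            as_backup = self.as_backup
        return replace_existing, as_backup


class BackupByDefault(RegistrationDefaults):
    # A subclass changes the default simply by overriding the attribute.
    as_backup = True
```

Using `None` as the "not specified" marker lets a caller pass an explicit `False` to override a subclass default of `True`.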

Inherited Members
SecretManager
get_secret
class GoogleGSMSecretManager(airbyte.secrets.CustomSecretManager):
class GoogleGSMSecretManager(CustomSecretManager):
    """Secret manager that retrieves secrets from Google Secret Manager (GSM).

    This class inherits from `CustomSecretManager` and also adds methods
    that are specific to this implementation: `fetch_secrets()`,
    `fetch_secrets_by_label()` and `fetch_connector_secrets()`.

    This secret manager is not enabled by default. To use it, you must provide the project ID and
    the credentials for a service account with the necessary permissions to access the secrets.

    The `fetch_connector_secret()` method assumes that each secret carries a `connector` label
    whose value is the connector name (`source-github`, `destination-snowflake`, etc.).
    """

    name = SecretSourceEnum.GOOGLE_GSM.value
    auto_register = False
    as_backup = False
    replace_existing = False

    CONNECTOR_LABEL = "connector"
    """The label key used to filter secrets by connector name."""

    def __init__(
        self,
        project: str,
        *,
        credentials_path: str | None = None,
        credentials_json: str | SecretString | None = None,
        auto_register: bool = False,
        as_backup: bool = False,
    ) -> None:
        """Instantiate a new Google GSM secret manager instance.

        You can provide either the path to the credentials file or the JSON contents of the
        credentials file. If both are provided, a `PyAirbyteInputError` will be raised.
        """
        if credentials_path and credentials_json:
            raise exc.PyAirbyteInputError(
                guidance=("You can provide `credentials_path` or `credentials_json` but not both."),
            )

        self.project = project

        if credentials_json is not None and not isinstance(credentials_json, SecretString):
            credentials_json = SecretString(credentials_json)

        if not credentials_json and not credentials_path:
            if "GOOGLE_APPLICATION_CREDENTIALS" in os.environ:
                credentials_path = os.environ["GOOGLE_APPLICATION_CREDENTIALS"]

            elif "GCP_GSM_CREDENTIALS" in os.environ:
                credentials_json = SecretString(os.environ["GCP_GSM_CREDENTIALS"])

        if credentials_path:
            credentials_json = SecretString(Path(credentials_path).read_text(encoding="utf-8"))

        if not credentials_json:
            raise exc.PyAirbyteInputError(
                guidance=(
                    "No Google Cloud credentials found. You can provide the path to the "
                    "credentials file using the `credentials_path` argument, or provide the JSON "
                    "contents of the credentials file using the `credentials_json` argument."
                ),
            )

        self.secret_client = secretmanager.SecretManagerServiceClient.from_service_account_info(
            json.loads(credentials_json)
        )

        if auto_register:
            self.auto_register = auto_register

        if as_backup:
            self.as_backup = as_backup

        super().__init__()  # Handles the registration if needed

    def _fully_qualified_secret_name(self, secret_name: str) -> str:
        """Get the fully qualified secret name."""
        full_name = secret_name
        if "projects/" not in full_name:
            # This is not yet fully qualified
            full_name = f"projects/{self.project}/secrets/{secret_name}/versions/latest"

        if "/versions/" not in full_name:
            full_name += "/versions/latest"

        return full_name

    def get_secret(self, secret_name: str) -> SecretString | None:
        """Get a named secret from Google Secret Manager."""
        return SecretString(
            self.secret_client.access_secret_version(
                name=self._fully_qualified_secret_name(secret_name)
            ).payload.data.decode("UTF-8")
        )

    def get_secret_handle(
        self,
        secret_name: str,
    ) -> GSMSecretHandle:
        """Fetch secret in the secret manager, using the secret name.

        Unlike `get_secret`, this method returns a `GSMSecretHandle` object, which can be used to
        inspect the secret's labels and other metadata.

        Args:
            secret_name (str): The name of the secret to fetch.

        Returns:
            GSMSecretHandle: A handle for the matching secret.
        """
        return GSMSecretHandle(
            parent=self,
            secret_name=self._fully_qualified_secret_name(secret_name),
        )

    def fetch_secrets(
        self,
        *,
        filter_string: str,
    ) -> Iterable[GSMSecretHandle]:
        """List the secrets in the secret manager that match a filter string.

        Example filter strings:
        - `labels.connector=source-bigquery`: Filter for secrets whose `connector` label
          is 'source-bigquery'.

        Args:
            filter_string (str): A filter string to apply to the list of secrets, following the
                format described in the Google Secret Manager documentation:
                https://cloud.google.com/secret-manager/docs/filtering

        Returns:
            Iterable[GSMSecretHandle]: An iterable of `GSMSecretHandle` objects for the matching
            secrets.
        """
        gsm_secrets: ListSecretsPager = self.secret_client.list_secrets(
            request=secretmanager.ListSecretsRequest(
                filter=filter_string,
                parent=f"projects/{self.project}",
            ),
        )

        return [
            GSMSecretHandle(
                parent=self,
                secret_name=secret.name,
            )
            for secret in gsm_secrets
        ]

    def fetch_secrets_by_label(
        self,
        label_key: str,
        label_value: str,
    ) -> Iterable[GSMSecretHandle]:
        """List the secrets whose label `label_key` has the value `label_value`.

        Args:
            label_key (str): The key of the label to filter by.
            label_value (str): The value of the label to filter by.

        Returns:
            Iterable[GSMSecretHandle]: An iterable of `GSMSecretHandle` objects for the matching
            secrets.
        """
        return self.fetch_secrets(filter_string=f"labels.{label_key}={label_value}")

    def fetch_connector_secrets(
        self,
        connector_name: str,
    ) -> Iterable[GSMSecretHandle]:
        """Fetch secrets in the secret manager, using the connector name as a filter for the label.

        The label key used to filter the secrets is defined by the `CONNECTOR_LABEL` attribute,
        which defaults to 'connector'.

        Args:
            connector_name (str): The name of the connector to filter by.

        Returns:
            Iterable[GSMSecretHandle]: An iterable of `GSMSecretHandle` objects for the matching
            secrets.
        """
        return self.fetch_secrets_by_label(
            label_key=self.CONNECTOR_LABEL,
            label_value=connector_name,
        )

    def fetch_connector_secret(
        self,
        connector_name: str,
    ) -> GSMSecretHandle:
        """Fetch secret in the secret manager, using the connector name as a filter for the label.

        This method is a convenience method that returns the first secret found for the connector.

        The label key used to filter the secrets is defined by the `CONNECTOR_LABEL` attribute,
        which defaults to 'connector'.

        Args:
            connector_name (str): The name of the connector to filter by.

        Returns:
            GSMSecretHandle: A handle for the matching secret.
        """
        results: Iterable[GSMSecretHandle] = self.fetch_connector_secrets(connector_name)
        try:
            result = next(iter(results))
        except StopIteration:
            raise exc.PyAirbyteError(
                message="No secrets found for connector.",
                guidance=(
                    "Please check that the connector name is correct "
                    "and that the secret is correctly labeled."
                ),
                context={
                    "project": self.project,
                    "connector_name": connector_name,
                    "label_key": self.CONNECTOR_LABEL,
                },
            ) from None

        return result

Secret manager that retrieves secrets from Google Secret Manager (GSM).

This class inherits from CustomSecretManager and adds methods that are specific to this implementation: get_secret_handle(), fetch_secrets(), fetch_secrets_by_label(), fetch_connector_secrets(), and fetch_connector_secret().

This secret manager is not enabled by default. To use it, you must provide the project ID and the credentials for a service account with the necessary permissions to access the secrets.

The fetch_connector_secret() and fetch_connector_secrets() methods assume that each secret carries a connector label whose value matches the name of the connector (source-github, destination-snowflake, etc.).

GoogleGSMSecretManager( project: str, *, credentials_path: str | None = None, credentials_json: str | SecretString | None = None, auto_register: bool = False, as_backup: bool = False)
    def __init__(
        self,
        project: str,
        *,
        credentials_path: str | None = None,
        credentials_json: str | SecretString | None = None,
        auto_register: bool = False,
        as_backup: bool = False,
    ) -> None:
        """Instantiate a new Google GSM secret manager instance.

        You can provide either the path to the credentials file or the JSON contents of the
        credentials file. If both are provided, a `PyAirbyteInputError` will be raised.
        """
        if credentials_path and credentials_json:
            raise exc.PyAirbyteInputError(
                guidance=("You can provide `credentials_path` or `credentials_json` but not both."),
            )

        self.project = project

        if credentials_json is not None and not isinstance(credentials_json, SecretString):
            credentials_json = SecretString(credentials_json)

        if not credentials_json and not credentials_path:
            if "GOOGLE_APPLICATION_CREDENTIALS" in os.environ:
                credentials_path = os.environ["GOOGLE_APPLICATION_CREDENTIALS"]

            elif "GCP_GSM_CREDENTIALS" in os.environ:
                credentials_json = SecretString(os.environ["GCP_GSM_CREDENTIALS"])

        if credentials_path:
            credentials_json = SecretString(Path(credentials_path).read_text(encoding="utf-8"))

        if not credentials_json:
            raise exc.PyAirbyteInputError(
                guidance=(
                    "No Google Cloud credentials found. You can provide the path to the "
                    "credentials file using the `credentials_path` argument, or provide the JSON "
                    "contents of the credentials file using the `credentials_json` argument."
                ),
            )

        self.secret_client = secretmanager.SecretManagerServiceClient.from_service_account_info(
            json.loads(credentials_json)
        )

        if auto_register:
            self.auto_register = auto_register

        if as_backup:
            self.as_backup = as_backup

        super().__init__()  # Handles the registration if needed

Instantiate a new Google GSM secret manager instance.

You can provide either the path to the credentials file or the JSON contents of the credentials file. If both are provided, a PyAirbyteInputError will be raised.
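
The constructor source also falls back to two environment variables when neither argument is given. The lookup order can be sketched as a standalone function. This is an illustrative model only: `resolve_credentials_json` and its `environ` parameter are hypothetical, `ValueError` stands in for `PyAirbyteInputError`, and a plain `str` stands in for `SecretString`.

```python
from __future__ import annotations

import os
from pathlib import Path
from typing import Mapping


def resolve_credentials_json(
    credentials_path: str | None = None,
    credentials_json: str | None = None,
    environ: Mapping[str, str] | None = None,
) -> str:
    """Return service-account JSON text, following the constructor's lookup order."""
    env = os.environ if environ is None else environ

    if credentials_path and credentials_json:
        raise ValueError("Provide `credentials_path` or `credentials_json`, not both.")

    if not credentials_json and not credentials_path:
        # Fall back to environment variables, checking the file-path variable first.
        if "GOOGLE_APPLICATION_CREDENTIALS" in env:
            credentials_path = env["GOOGLE_APPLICATION_CREDENTIALS"]
        elif "GCP_GSM_CREDENTIALS" in env:
            credentials_json = env["GCP_GSM_CREDENTIALS"]

    if credentials_path:
        # A path, whether passed explicitly or found in the environment, is read as text.
        credentials_json = Path(credentials_path).read_text(encoding="utf-8")

    if not credentials_json:
        raise ValueError("No Google Cloud credentials found.")

    return credentials_json
```

Passing `environ` explicitly keeps the sketch independent of the real process environment.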

name = 'google_gsm'
auto_register = False
as_backup = False
replace_existing = False
CONNECTOR_LABEL = 'connector'

The label key used to filter secrets by connector name.

project
secret_client
def get_secret(self, secret_name: str) -> SecretString | None:
    def get_secret(self, secret_name: str) -> SecretString | None:
        """Get a named secret from Google Secret Manager."""
        return SecretString(
            self.secret_client.access_secret_version(
                name=self._fully_qualified_secret_name(secret_name)
            ).payload.data.decode("UTF-8")
        )

Get a named secret from Google Secret Manager.
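
Before the lookup, the secret name is expanded by the private `_fully_qualified_secret_name` helper shown in the class source. The following standalone sketch reproduces that expansion as a free function so it can be tried in isolation. The function name and the explicit `project` parameter are choices made for this illustration.

```python
def fully_qualified_secret_name(project: str, secret_name: str) -> str:
    """Expand a short secret name to a full, versioned GSM resource path."""
    full_name = secret_name
    if "projects/" not in full_name:
        # Short name: prepend the project and pin to the latest version.
        full_name = f"projects/{project}/secrets/{secret_name}/versions/latest"
    if "/versions/" not in full_name:
        # Already project-qualified but unversioned: default to the latest version.
        full_name += "/versions/latest"
    return full_name
```

A name that already contains both a project and a version is returned unchanged, so a caller can pin a specific version by passing the full resource path.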

def get_secret_handle(self, secret_name: str) -> airbyte.secrets.google_gsm.GSMSecretHandle:
    def get_secret_handle(
        self,
        secret_name: str,
    ) -> GSMSecretHandle:
        """Fetch secret in the secret manager, using the secret name.

        Unlike `get_secret`, this method returns a `GSMSecretHandle` object, which can be used to
        inspect the secret's labels and other metadata.

        Args:
            secret_name (str): The name of the secret to fetch.

        Returns:
            GSMSecretHandle: A handle for the matching secret.
        """
        return GSMSecretHandle(
            parent=self,
            secret_name=self._fully_qualified_secret_name(secret_name),
        )

Fetch secret in the secret manager, using the secret name.

Unlike get_secret, this method returns a GSMSecretHandle object, which can be used to inspect the secret's labels and other metadata.

Arguments:
  • secret_name (str): The name of the secret to fetch.
Returns:

GSMSecretHandle: A handle for the matching secret.

def fetch_secrets( self, *, filter_string: str) -> Iterable[airbyte.secrets.google_gsm.GSMSecretHandle]:
    def fetch_secrets(
        self,
        *,
        filter_string: str,
    ) -> Iterable[GSMSecretHandle]:
        """List the secrets in the secret manager that match a filter string.

        Example filter strings:
        - `labels.connector=source-bigquery`: Filter for secrets whose `connector` label
          is 'source-bigquery'.

        Args:
            filter_string (str): A filter string to apply to the list of secrets, following the
                format described in the Google Secret Manager documentation:
                https://cloud.google.com/secret-manager/docs/filtering

        Returns:
            Iterable[GSMSecretHandle]: An iterable of `GSMSecretHandle` objects for the matching
            secrets.
        """
        gsm_secrets: ListSecretsPager = self.secret_client.list_secrets(
            request=secretmanager.ListSecretsRequest(
                filter=filter_string,
                parent=f"projects/{self.project}",
            ),
        )

        return [
            GSMSecretHandle(
                parent=self,
                secret_name=secret.name,
            )
            for secret in gsm_secrets
        ]

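List the secrets in the secret manager that match a filter string.

Example filter strings:

  • labels.connector=source-bigquery: Filter for secrets whose connector label is 'source-bigquery'.
Arguments:
  • filter_string (str): A filter string to apply to the list of secrets, following the format described in the Google Secret Manager documentation: https://cloud.google.com/secret-manager/docs/filtering
Returns: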

Iterable[GSMSecretHandle]: An iterable of GSMSecretHandle objects for the matching secrets.

def fetch_secrets_by_label( self, label_key: str, label_value: str) -> Iterable[airbyte.secrets.google_gsm.GSMSecretHandle]:
    def fetch_secrets_by_label(
        self,
        label_key: str,
        label_value: str,
    ) -> Iterable[GSMSecretHandle]:
        """List the secrets whose label `label_key` has the value `label_value`.

        Args:
            label_key (str): The key of the label to filter by.
            label_value (str): The value of the label to filter by.

        Returns:
            Iterable[GSMSecretHandle]: An iterable of `GSMSecretHandle` objects for the matching
            secrets.
        """
        return self.fetch_secrets(filter_string=f"labels.{label_key}={label_value}")

List the secrets whose label label_key has the value label_value.

Arguments:
  • label_key (str): The key of the label to filter by.
  • label_value (str): The value of the label to filter by.
Returns:

Iterable[GSMSecretHandle]: An iterable of GSMSecretHandle objects for the matching secrets.
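
The label lookup reduces to building a filter string of the form `labels.<key>=<value>`. The following is a small standalone sketch of that step. `build_label_filter` and `build_connector_filter` are hypothetical helper names, and the default of `"connector"` mirrors the `CONNECTOR_LABEL` attribute documented above.

```python
def build_label_filter(label_key: str, label_value: str) -> str:
    """Build a filter string of the form used for label lookups."""
    return f"labels.{label_key}={label_value}"


def build_connector_filter(connector_name: str, connector_label: str = "connector") -> str:
    """Build the filter for a connector, using the `connector` label key by default."""
    return build_label_filter(connector_label, connector_name)
```

For example, `build_connector_filter("source-github")` yields `labels.connector=source-github`.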

def fetch_connector_secrets( self, connector_name: str) -> Iterable[airbyte.secrets.google_gsm.GSMSecretHandle]:
    def fetch_connector_secrets(
        self,
        connector_name: str,
    ) -> Iterable[GSMSecretHandle]:
        """Fetch secrets in the secret manager, using the connector name as a filter for the label.

        The label key used to filter the secrets is defined by the `CONNECTOR_LABEL` attribute,
        which defaults to 'connector'.

        Args:
            connector_name (str): The name of the connector to filter by.

        Returns:
            Iterable[GSMSecretHandle]: An iterable of `GSMSecretHandle` objects for the matching
            secrets.
        """
        return self.fetch_secrets_by_label(
            label_key=self.CONNECTOR_LABEL,
            label_value=connector_name,
        )

Fetch secrets in the secret manager, using the connector name as a filter for the label.

The label key used to filter the secrets is defined by the CONNECTOR_LABEL attribute, which defaults to 'connector'.

Arguments:
  • connector_name (str): The name of the connector to filter by.
Returns:

Iterable[GSMSecretHandle]: An iterable of GSMSecretHandle objects for the matching secrets.

def fetch_connector_secret(self, connector_name: str) -> airbyte.secrets.google_gsm.GSMSecretHandle:
    def fetch_connector_secret(
        self,
        connector_name: str,
    ) -> GSMSecretHandle:
        """Fetch secret in the secret manager, using the connector name as a filter for the label.

        This method is a convenience method that returns the first secret found for the connector.

        The label key used to filter the secrets is defined by the `CONNECTOR_LABEL` attribute,
        which defaults to 'connector'.

        Args:
            connector_name (str): The name of the connector to filter by.

        Returns:
            GSMSecretHandle: A handle for the matching secret.
        """
        results: Iterable[GSMSecretHandle] = self.fetch_connector_secrets(connector_name)
        try:
            result = next(iter(results))
        except StopIteration:
            raise exc.PyAirbyteError(
                message="No secrets found for connector.",
                guidance=(
                    "Please check that the connector name is correct "
                    "and that the secret is correctly labeled."
                ),
                context={
                    "project": self.project,
                    "connector_name": connector_name,
                    "label_key": self.CONNECTOR_LABEL,
                },
            ) from None

        return result

Fetch secret in the secret manager, using the connector name as a filter for the label.

This convenience method returns the first secret found for the connector. If no matching secret is found, a PyAirbyteError is raised.

The label key used to filter the secrets is defined by the CONNECTOR_LABEL attribute, which defaults to 'connector'.

Arguments:
  • connector_name (str): The name of the connector to filter by.
Returns:

GSMSecretHandle: A handle for the matching secret.
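
The "first match or error" behavior can be sketched independently of Google Secret Manager. This is a minimal illustration, not the library's code: `first_or_raise` and `NoSecretsFoundError` are hypothetical names, with the exception standing in for `PyAirbyteError`.

```python
from __future__ import annotations

from typing import Iterable, TypeVar

T = TypeVar("T")


class NoSecretsFoundError(Exception):
    """Hypothetical stand-in for the error raised when no secret matches."""


def first_or_raise(results: Iterable[T], connector_name: str) -> T:
    """Return the first item of `results`, or raise if it is empty."""
    try:
        return next(iter(results))
    except StopIteration:
        # `from None` hides the StopIteration so callers see only the domain error.
        raise NoSecretsFoundError(
            f"No secrets found for connector '{connector_name}'."
        ) from None
```

When several secrets share the same connector label, only the first one returned by the listing is used, so callers that need a specific secret may prefer to iterate over all matches instead.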

Inherited Members
CustomSecretManager
register
def register_secret_manager( secret_manager: CustomSecretManager, *, as_backup: bool = False, replace_existing: bool = False) -> None:
def register_secret_manager(
    secret_manager: CustomSecretManager,
    *,
    as_backup: bool = False,
    replace_existing: bool = False,
) -> None:
    """Register a custom secret manager."""
    if replace_existing:
        clear_secret_sources()

    if as_backup:
        # Add to end of list
        _SECRETS_SOURCES.append(secret_manager)
    else:
        # Add to beginning of list
        _SECRETS_SOURCES.insert(0, secret_manager)

Register a custom secret manager.

def disable_secret_source( source: SecretManager | SecretSourceEnum) -> None:
def disable_secret_source(source: SecretManager | SecretSourceEnum) -> None:
    """Disable one of the default secrets sources.

    This function can accept either a `SecretManager` instance, a `SecretSourceEnum` enum value, or
    a string representing the name of the source to disable.
    """
    if isinstance(source, SecretManager) and source in _SECRETS_SOURCES:
        _SECRETS_SOURCES.remove(source)
        return

    # Else, remove by name
    for s in list(_SECRETS_SOURCES).copy():
        if s.name == str(source):
            _SECRETS_SOURCES.remove(s)

Disable one of the default secrets sources.

This function can accept either a SecretManager instance, a SecretSourceEnum enum value, or a string representing the name of the source to disable.
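
The two removal paths, by instance and by name, can be sketched with a simplified model. This is an illustration only: `Source` and `disable_source` are hypothetical names, the registry is passed in as a plain list instead of being module state, and only plain strings are used for the by-name path.

```python
from __future__ import annotations


class Source:
    """Hypothetical minimal secret source with a `name` attribute."""

    def __init__(self, name: str) -> None:
        self.name = name


def disable_source(sources: list[Source], source: Source | str) -> None:
    """Remove a source from `sources`, by instance if present, otherwise by name."""
    if isinstance(source, Source) and source in sources:
        sources.remove(source)
        return
    # Iterate over a copy so that removing items does not skip elements.
    for s in list(sources):
        if s.name == str(source):
            sources.remove(s)
```

Disabling a name that is not registered is a no-op in this sketch, which matches the loop-based removal in the source above.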