airbyte.secrets
Secrets management for PyAirbyte.
PyAirbyte provides a secrets management system that allows you to securely store and retrieve sensitive information. This module provides the secrets functionality.
Secrets Management
PyAirbyte can auto-import secrets from the following sources:
- Environment variables.
- Variables defined in a local `.env` ("Dotenv") file.
- Google Colab secrets.
- Manual entry via `getpass`.
Note: You can also build your own secret manager by subclassing `CustomSecretManager`. For more information, see the `airbyte.secrets.CustomSecretManager` reference docs.
Retrieving Secrets
To retrieve a secret, use the `get_secret()` function. For example:
import airbyte as ab

source = ab.get_source("source-github")
source.set_config(
    {
        "credentials": {
            "personal_access_token": ab.get_secret("GITHUB_PERSONAL_ACCESS_TOKEN"),
        },
    }
)
By default, PyAirbyte will search all available secrets sources. The `get_secret()` function also accepts an optional `sources` argument of specific source names (`SecretSourceEnum`) and/or secret manager objects to check.
By default, PyAirbyte will prompt the user for any requested secrets that are not provided via other secret managers. You can disable this prompt by passing `allow_prompt=False` to `get_secret()`.
Secrets Auto-Discovery
If you have a secret matching an expected name, PyAirbyte will automatically use it. For example, if you have a secret named `GITHUB_PERSONAL_ACCESS_TOKEN`, PyAirbyte will automatically use it when configuring the GitHub source.
The naming convention for secrets is `{CONNECTOR_NAME}_{PROPERTY_NAME}`, for instance `SNOWFLAKE_PASSWORD` and `BIGQUERY_CREDENTIALS_PATH`.
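The convention can be illustrated with a small helper. This is only a sketch: `expected_secret_name` is a hypothetical function, not part of PyAirbyte, and dropping the `source-` / `destination-` prefix is an assumption about how a connector name maps onto the secret prefix.

```python
def expected_secret_name(connector_name: str, property_name: str) -> str:
    """Build a name of the form {CONNECTOR_NAME}_{PROPERTY_NAME} (hypothetical helper)."""
    # Assumption: the "source-" / "destination-" prefix is not part of the secret name.
    short_name = connector_name.removeprefix("source-").removeprefix("destination-")
    parts = [short_name, property_name]
    # Normalize each part to upper snake case before joining.
    return "_".join(part.replace("-", "_").upper() for part in parts)


print(expected_secret_name("source-snowflake", "password"))
print(expected_secret_name("destination-bigquery", "credentials_path"))
```

Under these assumptions the two calls yield the names used as examples above.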
PyAirbyte will also auto-discover secrets for interop with hosted Airbyte: `AIRBYTE_CLOUD_API_URL`, `AIRBYTE_CLOUD_API_KEY`, etc.
Custom Secret Managers
If you need to build your own secret manager, you can subclass the `airbyte.secrets.CustomSecretManager` class. This allows you to build a custom secret manager that can be used with the `get_secret()` function, securely storing and retrieving secrets as needed.
API Reference
_Below are the classes and functions available in the `airbyte.secrets` module._
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
"""Secrets management for PyAirbyte.

PyAirbyte provides a secrets management system that allows you to securely store and retrieve
sensitive information. This module provides the secrets functionality.

## Secrets Management

PyAirbyte can auto-import secrets from the following sources:

1. Environment variables.
2. Variables defined in a local `.env` ("Dotenv") file.
3. [Google Colab secrets](https://medium.com/@parthdasawant/how-to-use-secrets-in-google-colab-450c38e3ec75).
4. Manual entry via [`getpass`](https://docs.python.org/3.10/library/getpass.html).

**Note:** You can also build your own secret manager by subclassing the `CustomSecretManager`
implementation. For more information, see the `airbyte.secrets.CustomSecretManager` reference docs.

### Retrieving Secrets

To retrieve a secret, use the `get_secret()` function. For example:

```python
import airbyte as ab

source = ab.get_source("source-github")
source.set_config(
    {
        "credentials": {
            "personal_access_token": ab.get_secret("GITHUB_PERSONAL_ACCESS_TOKEN"),
        },
    }
)
```

By default, PyAirbyte will search all available secrets sources. The `get_secret()` function also
accepts an optional `sources` argument of specific source names (`SecretSourceEnum`) and/or secret
manager objects to check.

By default, PyAirbyte will prompt the user for any requested secrets that are not provided via other
secret managers. You can disable this prompt by passing `allow_prompt=False` to `get_secret()`.

### Secrets Auto-Discovery

If you have a secret matching an expected name, PyAirbyte will automatically use it. For example, if
you have a secret named `GITHUB_PERSONAL_ACCESS_TOKEN`, PyAirbyte will automatically use it when
configuring the GitHub source.

The naming convention for secrets is `{CONNECTOR_NAME}_{PROPERTY_NAME}`, for instance
`SNOWFLAKE_PASSWORD` and `BIGQUERY_CREDENTIALS_PATH`.

PyAirbyte will also auto-discover secrets for interop with hosted Airbyte: `AIRBYTE_CLOUD_API_URL`,
`AIRBYTE_CLOUD_API_KEY`, etc.

## Custom Secret Managers

If you need to build your own secret manager, you can subclass the
`airbyte.secrets.CustomSecretManager` class. This allows you to build a custom secret manager that
can be used with the `get_secret()` function, securely storing and retrieving secrets as needed.

## API Reference

_Below are the classes and functions available in the `airbyte.secrets` module._

"""

from __future__ import annotations

from typing import TYPE_CHECKING

from airbyte.secrets.base import SecretHandle, SecretManager, SecretSourceEnum, SecretString
from airbyte.secrets.config import disable_secret_source, register_secret_manager
from airbyte.secrets.custom import CustomSecretManager
from airbyte.secrets.env_vars import DotenvSecretManager, EnvVarSecretManager
from airbyte.secrets.google_colab import ColabSecretManager
from airbyte.secrets.google_gsm import GoogleGSMSecretManager
from airbyte.secrets.prompt import SecretsPrompt
from airbyte.secrets.util import get_secret


# Submodules imported here for documentation reasons: https://github.com/mitmproxy/pdoc/issues/757
if TYPE_CHECKING:
    # ruff: noqa: TC004  # imports used for more than type checking
    from airbyte.secrets import (
        base,
        config,
        custom,
        env_vars,
        google_colab,
        google_gsm,
        prompt,
        util,
    )


__all__ = [
    # Submodules
    "base",
    "config",
    "custom",
    "env_vars",
    "google_colab",
    "google_gsm",
    "prompt",
    "util",
    # Secret Access
    "get_secret",
    # Secret Classes
    "SecretSourceEnum",
    "SecretString",
    "SecretHandle",
    # Secret Managers
    "SecretManager",
    "EnvVarSecretManager",
    "DotenvSecretManager",
    "ColabSecretManager",
    "SecretsPrompt",
    "CustomSecretManager",
    "GoogleGSMSecretManager",
    # Registration Functions
    "register_secret_manager",
    "disable_secret_source",
]
def get_secret(
    secret_name: str,
    /,
    *,
    sources: list[SecretManager | SecretSourceEnum] | None = None,
    allow_prompt: bool = True,
    **kwargs: dict[str, Any],
) -> SecretString:
    """Get a secret from the environment.

    The optional `sources` argument accepts a list of `SecretSourceEnum` options and/or
    `SecretManager` objects. If left blank, all available sources will be checked. If a list
    is passed, then the sources will be checked using the provided ordering.

    If `allow_prompt` is `True` and the prompt source (`SecretSourceEnum.PROMPT`) is among the
    sources being checked, then the user will be prompted to enter the secret if it is not
    found in any of the other sources.
    """
    if "source" in kwargs:
        warnings.warn(
            message="The `source` argument is deprecated. Use the `sources` argument instead.",
            category=DeprecationWarning,
            stacklevel=2,
        )
        sources = kwargs.pop("source")  # type: ignore [assignment]

    available_sources: dict[str, SecretManager] = {}
    for available_source in _get_secret_sources():
        # Add available sources to the dict. Order matters.
        available_sources[available_source.name] = available_source

    if sources is None:
        # If ANY is in the list, then we don't need to check any other sources.
        # This is the default behavior.
        sources = list(available_sources.values())

    elif not isinstance(sources, list):
        sources = [sources]  # type: ignore [unreachable]  # This is a 'just in case' catch.

    # Replace any SecretSourceEnum strings with the matching SecretManager object
    for source in list(sources):
        if isinstance(source, SecretSourceEnum):
            if source not in available_sources:
                raise exc.PyAirbyteInputError(
                    guidance="Invalid secret source name.",
                    input_value=source,
                    context={
                        "Available Sources": list(available_sources.keys()),
                    },
                )

            sources[sources.index(source)] = available_sources[source]

    secret_managers = cast("list[SecretManager]", sources)

    if SecretSourceEnum.PROMPT in secret_managers:
        prompt_source = secret_managers.pop(
            # Mis-typed, but okay here since we have equality logic for the enum comparison:
            secret_managers.index(SecretSourceEnum.PROMPT),  # type: ignore [arg-type]
        )

        if allow_prompt:
            # Always check prompt last. Add it to the end of the list.
            secret_managers.append(prompt_source)

    for secret_mgr in secret_managers:
        val = secret_mgr.get_secret(secret_name)
        if val:
            return SecretString(val)

    raise exc.PyAirbyteSecretNotFoundError(
        secret_name=secret_name,
        sources=[str(s) for s in available_sources],
    )
Get a secret from the environment.
The optional `sources` argument accepts a list of `SecretSourceEnum` options and/or `SecretManager` objects. If left blank, all available sources will be checked. If a list is passed, the sources will be checked in the order given, except that the prompt source is always moved to the end.
If `allow_prompt` is `True` and the prompt source (`SecretSourceEnum.PROMPT`) is among the sources being checked, then the user will be prompted to enter the secret if it is not found in any of the other sources. If `allow_prompt` is `False`, the prompt source is skipped even when it is listed in `sources`.
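The lookup order described here can be sketched without PyAirbyte installed. `DictSource` and `resolve_secret` below are hypothetical stand-ins, not library APIs; they only mirror the ordering logic in the source above: sources are tried in order, and the prompt source is moved to the end or dropped when `allow_prompt` is `False`.

```python
from __future__ import annotations


class DictSource:
    """Hypothetical stand-in for a secret manager backed by a dict."""

    def __init__(self, name: str, values: dict[str, str]) -> None:
        self.name = name
        self.values = values

    def get_secret(self, secret_name: str) -> str | None:
        # Return None when the secret is missing so the next source is tried.
        return self.values.get(secret_name)


def resolve_secret(
    secret_name: str,
    sources: list[DictSource],
    *,
    allow_prompt: bool = True,
) -> str:
    """Check sources in order; the "prompt" source goes last or is dropped."""
    ordered = [s for s in sources if s.name != "prompt"]
    if allow_prompt:
        # The prompt source is only consulted after every other source.
        ordered += [s for s in sources if s.name == "prompt"]

    for source in ordered:
        value = source.get_secret(secret_name)
        if value:
            return value

    raise LookupError(f"Secret '{secret_name}' not found.")


env = DictSource("env", {"API_KEY": "from-env"})
dotenv = DictSource("dotenv", {"API_KEY": "from-dotenv", "DB_PASSWORD": "from-dotenv"})
prompt = DictSource("prompt", {"API_KEY": "typed-by-user", "OTHER": "typed-by-user"})

# Even though the prompt source is listed first, it is checked last.
print(resolve_secret("API_KEY", [prompt, env, dotenv]))
```

In the real library the prompt source asks the user interactively; the dict-backed stand-in only keeps the sketch runnable in a script.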
class SecretSourceEnum(str, Enum):
    """Enumeration of secret sources supported by PyAirbyte."""

    ENV = "env"
    DOTENV = "dotenv"
    GOOGLE_COLAB = "google_colab"
    GOOGLE_GSM = "google_gsm"  # Not enabled by default

    PROMPT = "prompt"
Enumeration of secret sources supported by PyAirbyte.
class SecretString(str):  # noqa: FURB189  # Allow subclass from str instead of UserStr
    """A string that represents a secret.

    This class is used to mark a string as a secret. When a secret is printed, it
    will be masked to prevent accidental exposure of sensitive information when debugging
    or when printing containing objects like dictionaries.

    To create a secret string, simply instantiate the class with any string value:

        ```python
        secret = SecretString("my_secret_password")
        ```

    """

    __slots__ = ()

    def __repr__(self) -> str:
        """Override the representation of the secret string to return a masked value.

        The secret string is always masked with `****` to prevent accidental exposure, unless
        explicitly converted to a string. For instance, printing a config dictionary that contains
        a secret will automatically mask the secret value instead of printing it in plain text.

        However, if you explicitly cast the secret to a string, such as when used
        in an f-string, the secret will be exposed. This is the desired behavior to allow
        secrets to be used in a controlled manner.
        """
        return "<SecretString: ****>"

    def is_empty(self) -> bool:
        """Check if the secret is an empty string."""
        return len(self) == 0

    def is_json(self) -> bool:
        """Check if the secret string is a valid JSON string."""
        try:
            json.loads(self)
        except (json.JSONDecodeError, Exception):
            return False

        return True

    def __bool__(self) -> bool:
        """Override the boolean value of the secret string.

        Always returns `True` without inspecting contents.
        """
        return True

    def parse_json(self) -> dict:
        """Parse the secret string as JSON."""
        try:
            return json.loads(self)
        except json.JSONDecodeError as ex:
            raise exc.PyAirbyteInputError(
                message="Failed to parse secret as JSON.",
                context={
                    "Message": ex.msg,
                    "Position": ex.pos,
                    "SecretString_Length": len(self),  # Debug secret blank or an unexpected format.
                },
            ) from None

    # Pydantic compatibility

    @classmethod
    def validate(
        cls,
        v: Any,  # noqa: ANN401  # Must allow `Any` to match Pydantic signature
        info: ValidationInfo,
    ) -> SecretString:
        """Validate the input value is valid as a secret string."""
        _ = info  # Unused
        if not isinstance(v, str):
            raise exc.PyAirbyteInputError(
                message="A valid `str` or `SecretString` object is required.",
            )
        return cls(v)

    @classmethod
    def __get_pydantic_core_schema__(  # noqa: PLW3201  # Pydantic dunder
        cls,
        source_type: Any,  # noqa: ANN401  # Must allow `Any` to match Pydantic signature
        handler: GetCoreSchemaHandler,
    ) -> CoreSchema:
        """Return a modified core schema for the secret string."""
        return core_schema.with_info_after_validator_function(
            function=cls.validate, schema=handler(str), field_name=handler.field_name
        )

    @classmethod
    def __get_pydantic_json_schema__(  # noqa: PLW3201  # Pydantic dunder method
        cls, core_schema_: core_schema.CoreSchema, handler: GetJsonSchemaHandler
    ) -> JsonSchemaValue:
        """Return a modified JSON schema for the secret string.

        - `writeOnly=True` is the official way to prevent secrets from being exposed inadvertently.
        - `Format=password` is a popular and readable convention to indicate the field is sensitive.
        """
        _ = core_schema_, handler  # Unused
        return {
            "type": "string",
            "format": "password",
            "writeOnly": True,
        }
A string that represents a secret.
This class is used to mark a string as a secret. When a secret is printed, it will be masked to prevent accidental exposure of sensitive information when debugging or when printing containing objects like dictionaries.
To create a secret string, simply instantiate the class with any string value:
secret = SecretString("my_secret_password")
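The masking behavior can be tried without PyAirbyte using a stand-in class. `MaskedString` below is not the real `SecretString`; it only copies the `__repr__` override from the class source, which is enough to show where the value is masked and where it is exposed.

```python
class MaskedString(str):
    """Stand-in that copies only the masked __repr__ of SecretString."""

    __slots__ = ()

    def __repr__(self) -> str:
        return "<SecretString: ****>"


secret = MaskedString("my_secret_password")
config = {"password": secret}

# A dict's repr calls __repr__ on its values, so the secret is masked here.
print(config)

# Explicit string formatting goes through __str__/__format__ and exposes the value.
print(f"token={secret}")
```

The first `print` shows `<SecretString: ****>` in place of the value, while the f-string prints it in plain text, which matches the controlled-exposure behavior described in the `__repr__` docstring.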
def is_empty(self) -> bool:
Check if the secret is an empty string.
def is_json(self) -> bool:
Check if the secret string is a valid JSON string.
def parse_json(self) -> dict:
Parse the secret string as JSON.
@classmethod
def validate(cls, v: Any, info: ValidationInfo) -> SecretString:
Validate the input value is valid as a secret string.
class SecretHandle:
    """A handle for a secret in a secret manager.

    This class is used to store a reference to a secret in a secret manager.
    The secret is not retrieved until the `get_value()` or `parse_json()` methods are
    called.
    """

    def __init__(
        self,
        parent: SecretManager,
        secret_name: str,
    ) -> None:
        """Instantiate a new secret handle."""
        self.parent = parent
        self.secret_name = secret_name

    def get_value(self) -> SecretString:
        """Get the secret from the secret manager.

        Subclasses can optionally override this method to provide a more optimized code path.
        """
        return cast("SecretString", self.parent.get_secret(self.secret_name))

    def parse_json(self) -> dict:
        """Parse the secret as JSON.

        This method is a convenience method to parse the secret as JSON without
        needing to call `get_value()` first. If the secret is not a valid JSON
        string, a `PyAirbyteInputError` will be raised.
        """
        return self.get_value().parse_json()

    def write_to_file(
        self,
        file_path: Path,
        /,
        *,
        silent: bool = False,
    ) -> None:
        """Write the secret to a file.

        If `silent` is True, the method will not print any output to the console. Otherwise,
        the method will print a message to the console indicating the file path to which the secret
        is being written.

        This method is a convenience method that writes the secret to a file as text.
        """
        if not silent:
            print(
                f"Writing secret '{self.secret_name.split('/')[-1]}' to '{file_path.absolute()!s}'"
            )

        file_path.write_text(
            str(self.get_value()),
            encoding="utf-8",
        )
A handle for a secret in a secret manager.
This class is used to store a reference to a secret in a secret manager. The secret is not retrieved until the `get_value()` or `parse_json()` methods are called.
def __init__(self, parent: SecretManager, secret_name: str) -> None:
Instantiate a new secret handle.
def get_value(self) -> SecretString:
Get the secret from the secret manager.
Subclasses can optionally override this method to provide a more optimized code path.
def parse_json(self) -> dict:
Parse the secret as JSON.
This is a convenience method to parse the secret as JSON without needing to call `get_value()` first. If the secret is not a valid JSON string, a `PyAirbyteInputError` will be raised.
def write_to_file(self, file_path: Path, /, *, silent: bool = False) -> None:
Write the secret to a file.
If `silent` is `True`, the method will not print any output to the console. Otherwise, the method will print a message to the console indicating the file path to which the secret is being written.
This is a convenience method that writes the secret to a file as text.
class SecretManager(ABC):
    """Abstract base class for secret managers.

    Secret managers are used to retrieve secrets from a secret store.

    By registering a secret manager, PyAirbyte can automatically locate and
    retrieve secrets from the secret store when needed. This allows you to
    securely store and access sensitive information such as API keys, passwords,
    and other credentials without hardcoding them in your code.

    To create a custom secret manager, subclass this class and implement the
    `get_secret` method. By default, the secret manager will be automatically
    registered as a global secret source, but will not replace any existing
    secret sources. To customize this behavior, override the `auto_register` and
    `replace_existing` attributes in your subclass as needed.

    Note: Registered secrets managers always have priority over the default
    secret sources such as environment variables, dotenv files, and Google Colab
    secrets. If multiple secret managers are registered, the last one registered
    will take priority.
    """

    replace_existing = False
    as_backup = False

    def __init__(self) -> None:
        """Instantiate the new secret manager."""
        if not hasattr(self, "name"):
            # Default to the class name if no name is provided
            self.name: str = self.__class__.__name__

    @abstractmethod
    def get_secret(self, secret_name: str) -> SecretString | None:
        """Get a named secret from the secret manager.

        This method should be implemented by subclasses to retrieve secrets from
        the secret store. If the secret is not found, the method should return `None`.
        """
        ...

    def __str__(self) -> str:
        """Return the name of the secret manager."""
        return self.name

    def __eq__(self, value: object) -> bool:
        """Check if the secret manager is equal to another secret manager."""
        if isinstance(value, SecretManager):
            return self.name == value.name

        if isinstance(value, str):
            return self.name == value

        if isinstance(value, SecretSourceEnum):
            return self.name == str(value)

        return super().__eq__(value)

    def __hash__(self) -> int:
        """Return a hash of the secret manager name.

        This allows the secret manager to be used in sets and dictionaries.
        """
        return hash(self.name)
Abstract base class for secret managers.
Secret managers are used to retrieve secrets from a secret store.
By registering a secret manager, PyAirbyte can automatically locate and retrieve secrets from the secret store when needed. This allows you to securely store and access sensitive information such as API keys, passwords, and other credentials without hardcoding them in your code.
To create a custom secret manager, subclass this class and implement the `get_secret` method. By default, the secret manager will be automatically registered as a global secret source, but will not replace any existing secret sources. To customize this behavior, override the `auto_register` and `replace_existing` attributes in your subclass as needed.
Note: Registered secrets managers always have priority over the default secret sources such as environment variables, dotenv files, and Google Colab secrets. If multiple secret managers are registered, the last one registered will take priority.
def __init__(self) -> None:
Instantiate the new secret manager.
@abstractmethod
def get_secret(self, secret_name: str) -> SecretString | None:
Get a named secret from the secret manager.
This method should be implemented by subclasses to retrieve secrets from the secret store. If the secret is not found, the method should return `None`.
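A minimal sketch of that contract follows. So that it runs without PyAirbyte installed, it uses a stand-in base class (`SecretManagerSketch`) that mirrors only the `name` default and the abstract `get_secret` method shown above; `InMemorySecretManager` is a hypothetical example, and in real use you would subclass `airbyte.secrets.CustomSecretManager` and return a `SecretString` instead of a plain `str`.

```python
from __future__ import annotations

from abc import ABC, abstractmethod


class SecretManagerSketch(ABC):
    """Stand-in for the abstract base; mirrors only `name` and `get_secret`."""

    def __init__(self) -> None:
        if not hasattr(self, "name"):
            # Default to the class name if no name is provided.
            self.name = self.__class__.__name__

    @abstractmethod
    def get_secret(self, secret_name: str) -> str | None: ...


class InMemorySecretManager(SecretManagerSketch):
    """Hypothetical manager that serves secrets from a dict."""

    def __init__(self, secrets: dict[str, str]) -> None:
        self._secrets = dict(secrets)
        super().__init__()

    def get_secret(self, secret_name: str) -> str | None:
        # Return None (rather than raising) so the next source can be tried.
        return self._secrets.get(secret_name)


manager = InMemorySecretManager({"SNOWFLAKE_PASSWORD": "hunter2"})
print(manager.name)
print(manager.get_secret("MISSING_SECRET"))
```

Returning `None` for a missing secret, rather than raising, is what lets a lookup fall through to the next registered source.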
class EnvVarSecretManager(SecretManager):
    """Secret manager that retrieves secrets from environment variables."""

    name = SecretSourceEnum.ENV.value

    def get_secret(self, secret_name: str) -> SecretString | None:
        """Get a named secret from the environment."""
        if secret_name not in os.environ:
            return None

        return SecretString(os.environ[secret_name])
Secret manager that retrieves secrets from environment variables.
def get_secret(self, secret_name: str) -> SecretString | None:
Get a named secret from the environment.
class DotenvSecretManager(SecretManager):
    """Secret manager that retrieves secrets from a `.env` file."""

    name = SecretSourceEnum.DOTENV.value

    def get_secret(self, secret_name: str) -> SecretString | None:
        """Get a named secret from the `.env` file."""
        try:
            dotenv_vars: dict[str, str | None] = dotenv_values()
        except Exception:
            # Can't locate or parse a .env file
            return None

        if secret_name not in dotenv_vars:
            # Secret not found
            return None

        return SecretString(dotenv_vars[secret_name])
Secret manager that retrieves secrets from a `.env` file.
def get_secret(self, secret_name: str) -> SecretString | None:
Get a named secret from the `.env` file.
class ColabSecretManager(SecretManager):
    """Secret manager that retrieves secrets from Google Colab user secrets."""

    name = SecretSourceEnum.GOOGLE_COLAB.value

    def __init__(self) -> None:
        """Initialize the Google Colab secret manager."""
        try:
            from google.colab import (  # pyright: ignore[reportMissingImports]  # noqa: PLC0415
                userdata as colab_userdata,
            )

            self.colab_userdata = colab_userdata
        except ImportError:
            self.colab_userdata = None

        super().__init__()

    def get_secret(self, secret_name: str) -> SecretString | None:
        """Get a named secret from Google Colab user secrets."""
        if self.colab_userdata is None:
            # The module doesn't exist. We probably aren't in Colab.
            return None

        try:
            return SecretString(self.colab_userdata.get(secret_name))
        except Exception:
            # Secret name not found. Continue.
            return None
Secret manager that retrieves secrets from Google Colab user secrets.
def __init__(self) -> None:
Initialize the Google Colab secret manager.
def get_secret(self, secret_name: str) -> SecretString | None:
Get a named secret from Google Colab user secrets.
class SecretsPrompt(SecretManager):
    """Secret manager that prompts the user to enter a secret."""

    name = SecretSourceEnum.PROMPT.value

    def get_secret(
        self,
        secret_name: str,
    ) -> SecretString | None:
        """Prompt the user to enter a secret.

        As a security measure, the secret is not echoed to the terminal when typed.
        """
        with contextlib.suppress(Exception):
            return SecretString(getpass(f"Enter the value for secret '{secret_name}': "))

        return None
Secret manager that prompts the user to enter a secret.
def get_secret(self, secret_name: str) -> SecretString | None:
Prompt the user to enter a secret.
As a security measure, the secret is not echoed to the terminal when typed.
class CustomSecretManager(SecretManager, ABC):
    """Custom secret manager that retrieves secrets from a custom source.

    This class is a convenience class that can be used to create custom secret
    managers. By default, custom secrets managers are auto-registered during
    creation.
    """

    auto_register = True
    replace_existing = False
    as_backup = False

    def __init__(self) -> None:
        """Initialize the custom secret manager."""
        super().__init__()
        if self.auto_register:
            self.register()

    def register(
        self,
        *,
        replace_existing: bool | None = None,
        as_backup: bool | None = None,
    ) -> None:
        """Register the secret manager as global secret source.

        This makes the secret manager available to the `get_secret` function and
        allows it to be used automatically as a source for secrets.

        If `replace_existing` is `True`, the secret manager will replace all existing
        secrets sources, including the default secret managers such as environment
        variables, dotenv files, and Google Colab secrets. If `replace_existing` is
        None or not provided, the default behavior will be used from the `replace_existing`
        of the class (`False` unless overridden by the subclass).
        """
        if replace_existing is None:
            replace_existing = self.replace_existing

        if as_backup is None:
            as_backup = self.as_backup

        if replace_existing:
            clear_secret_sources()

        register_secret_manager(
            self,
            as_backup=as_backup,
            replace_existing=replace_existing,
        )
Custom secret manager that retrieves secrets from a custom source.
This class is a convenience class that can be used to create custom secret managers. By default, custom secrets managers are auto-registered during creation.
def __init__(self) -> None:
Initialize the custom secret manager.
def register(self, *, replace_existing: bool | None = None, as_backup: bool | None = None) -> None:
Register the secret manager as a global secret source.
This makes the secret manager available to the get_secret function and allows it to be used automatically as a source for secrets.
If replace_existing is True, the secret manager will replace all existing secret sources, including the default secret managers such as environment variables, dotenv files, and Google Colab secrets. If replace_existing is None or not provided, the default is taken from the replace_existing attribute of the class (False unless overridden by the subclass).
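The fallback from a `None` argument to the class-level default can be illustrated with a small stand-alone sketch. `DemoManager` and `resolve_options` are hypothetical names used only for illustration; they are not part of PyAirbyte and only mirror the first few lines of `register()`.

```python
from __future__ import annotations


class DemoManager:
    """Hypothetical stand-in for a secret manager; not the PyAirbyte class."""

    replace_existing = False  # class-level default
    as_backup = False  # class-level default

    def resolve_options(
        self,
        *,
        replace_existing: bool | None = None,
        as_backup: bool | None = None,
    ) -> tuple[bool, bool]:
        # None means "not provided": fall back to the class attribute.
        if replace_existing is None:
            replace_existing = self.replace_existing
        if as_backup is None:
            as_backup = self.as_backup
        return replace_existing, as_backup


class BackupManager(DemoManager):
    as_backup = True  # a subclass overrides the default


print(DemoManager().resolve_options())
print(BackupManager().resolve_options())
print(BackupManager().resolve_options(as_backup=False))
```

An explicit argument always wins over the class attribute, so a subclass can set a sensible default while callers can still override it per call.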
class GoogleGSMSecretManager(CustomSecretManager):
    """Secret manager that retrieves secrets from Google Secret Manager (GSM).

    This class inherits from `CustomSecretManager` and also adds methods
    that are specific to this implementation: `fetch_secrets()`,
    `fetch_secrets_by_label()` and `fetch_connector_secrets()`.

    This secret manager is not enabled by default. To use it, you must provide the project ID and
    the credentials for a service account with the necessary permissions to access the secrets.

    The `fetch_connector_secret()` method assumes that the value of the `connector` label
    matches the name of the connector (`source-github`, `destination-snowflake`, etc.)
    """

    name = SecretSourceEnum.GOOGLE_GSM.value
    auto_register = False
    as_backup = False
    replace_existing = False

    CONNECTOR_LABEL = "connector"
    """The label key used to filter secrets by connector name."""

    def __init__(
        self,
        project: str,
        *,
        credentials_path: str | None = None,
        credentials_json: str | SecretString | None = None,
        auto_register: bool = False,
        as_backup: bool = False,
    ) -> None:
        """Instantiate a new Google GSM secret manager instance.

        You can provide either the path to the credentials file or the JSON contents of the
        credentials file. If both are provided, a `PyAirbyteInputError` will be raised.
        """
        if credentials_path and credentials_json:
            raise exc.PyAirbyteInputError(
                guidance=("You can provide `credentials_path` or `credentials_json` but not both."),
            )

        self.project = project

        if credentials_json is not None and not isinstance(credentials_json, SecretString):
            credentials_json = SecretString(credentials_json)

        if not credentials_json and not credentials_path:
            if "GOOGLE_APPLICATION_CREDENTIALS" in os.environ:
                credentials_path = os.environ["GOOGLE_APPLICATION_CREDENTIALS"]

            elif "GCP_GSM_CREDENTIALS" in os.environ:
                credentials_json = SecretString(os.environ["GCP_GSM_CREDENTIALS"])

        if credentials_path:
            credentials_json = SecretString(Path(credentials_path).read_text(encoding="utf-8"))

        if not credentials_json:
            raise exc.PyAirbyteInputError(
                guidance=(
                    "No Google Cloud credentials found. You can provide the path to the "
                    "credentials file using the `credentials_path` argument, or provide the JSON "
                    "contents of the credentials file using the `credentials_json` argument."
                ),
            )

        self.secret_client = secretmanager.SecretManagerServiceClient.from_service_account_info(
            json.loads(credentials_json)
        )

        if auto_register:
            self.auto_register = auto_register

        if as_backup:
            self.as_backup = as_backup

        super().__init__()  # Handles the registration if needed

    def _fully_qualified_secret_name(self, secret_name: str) -> str:
        """Get the fully qualified secret name."""
        full_name = secret_name
        if "projects/" not in full_name:
            # This is not yet fully qualified
            full_name = f"projects/{self.project}/secrets/{secret_name}/versions/latest"

        if "/versions/" not in full_name:
            full_name += "/versions/latest"

        return full_name

    def get_secret(self, secret_name: str) -> SecretString:
        """Get a named secret from Google Secret Manager."""
        return SecretString(
            self.secret_client.access_secret_version(
                name=self._fully_qualified_secret_name(secret_name)
            ).payload.data.decode("UTF-8")
        )

    def get_secret_handle(
        self,
        secret_name: str,
    ) -> GSMSecretHandle:
        """Fetch a secret from the secret manager, using the secret name.

        Unlike `get_secret`, this method returns a `GSMSecretHandle` object, which can be used to
        inspect the secret's labels and other metadata.

        Args:
            secret_name (str): The name of the secret to fetch.

        Returns:
            GSMSecretHandle: A handle for the matching secret.
        """
        return GSMSecretHandle(
            parent=self,
            secret_name=self._fully_qualified_secret_name(secret_name),
        )

    def fetch_secrets(
        self,
        *,
        filter_string: str,
    ) -> Iterable[GSMSecretHandle]:
        """List the secrets in the secret manager that match a filter string.

        Example filter strings:
        - `labels.connector=source-bigquery`: Filter for secrets whose `connector` label is
          'source-bigquery'.

        Args:
            filter_string (str): A filter string to apply to the list of secrets, following the
                format described in the Google Secret Manager documentation:
                https://cloud.google.com/secret-manager/docs/filtering

        Returns:
            Iterable[GSMSecretHandle]: An iterable of `GSMSecretHandle` objects for the matching
            secrets.
        """
        gsm_secrets: ListSecretsPager = self.secret_client.list_secrets(
            request=secretmanager.ListSecretsRequest(
                filter=filter_string,
                parent=f"projects/{self.project}",
            ),
        )

        return [
            GSMSecretHandle(
                parent=self,
                secret_name=secret.name,
            )
            for secret in gsm_secrets
        ]

    def fetch_secrets_by_label(
        self,
        label_key: str,
        label_value: str,
    ) -> Iterable[GSMSecretHandle]:
        """List the secrets in the secret manager that carry a given label.

        Args:
            label_key (str): The key of the label to filter by.
            label_value (str): The value of the label to filter by.

        Returns:
            Iterable[GSMSecretHandle]: An iterable of `GSMSecretHandle` objects for the matching
            secrets.
        """
        return self.fetch_secrets(filter_string=f"labels.{label_key}={label_value}")

    def fetch_connector_secrets(
        self,
        connector_name: str,
    ) -> Iterable[GSMSecretHandle]:
        """Fetch secrets in the secret manager, using the connector name as a filter for the label.

        The label key used to filter the secrets is defined by the `CONNECTOR_LABEL` attribute,
        which defaults to 'connector'.

        Args:
            connector_name (str): The name of the connector to filter by.

        Returns:
            Iterable[GSMSecretHandle]: An iterable of `GSMSecretHandle` objects for the matching
            secrets.
        """
        return self.fetch_secrets_by_label(
            label_key=self.CONNECTOR_LABEL,
            label_value=connector_name,
        )

    def fetch_connector_secret(
        self,
        connector_name: str,
    ) -> GSMSecretHandle:
        """Fetch a secret from the secret manager, using the connector name as a label filter.

        This method is a convenience method that returns the first secret found for the connector.

        The label key used to filter the secrets is defined by the `CONNECTOR_LABEL` attribute,
        which defaults to 'connector'.

        Args:
            connector_name (str): The name of the connector to filter by.

        Returns:
            GSMSecretHandle: A handle for the matching secret.
        """
        results: Iterable[GSMSecretHandle] = self.fetch_connector_secrets(connector_name)
        try:
            result = next(iter(results))
        except StopIteration:
            raise exc.PyAirbyteError(
                message="No secrets found for connector.",
                guidance=(
                    "Please check that the connector name is correct "
                    "and that the secret is correctly labeled."
                ),
                context={
                    "project": self.project,
                    "connector_name": connector_name,
                    "label_key": self.CONNECTOR_LABEL,
                },
            ) from None

        return result
Secret manager that retrieves secrets from Google Secret Manager (GSM).
This class inherits from CustomSecretManager and also adds methods that are specific to this implementation: fetch_secrets(), fetch_secrets_by_label() and fetch_connector_secrets().
This secret manager is not enabled by default. To use it, you must provide the project ID and the credentials for a service account with the necessary permissions to access the secrets.
The fetch_connector_secret() method assumes that the value of the connector label matches the name of the connector (source-github, destination-snowflake, etc.).
    def __init__(
        self,
        project: str,
        *,
        credentials_path: str | None = None,
        credentials_json: str | SecretString | None = None,
        auto_register: bool = False,
        as_backup: bool = False,
    ) -> None:
        """Instantiate a new Google GSM secret manager instance.

        You can provide either the path to the credentials file or the JSON contents of the
        credentials file. If both are provided, a `PyAirbyteInputError` will be raised.
        """
        if credentials_path and credentials_json:
            raise exc.PyAirbyteInputError(
                guidance=("You can provide `credentials_path` or `credentials_json` but not both."),
            )

        self.project = project

        if credentials_json is not None and not isinstance(credentials_json, SecretString):
            credentials_json = SecretString(credentials_json)

        if not credentials_json and not credentials_path:
            if "GOOGLE_APPLICATION_CREDENTIALS" in os.environ:
                credentials_path = os.environ["GOOGLE_APPLICATION_CREDENTIALS"]

            elif "GCP_GSM_CREDENTIALS" in os.environ:
                credentials_json = SecretString(os.environ["GCP_GSM_CREDENTIALS"])

        if credentials_path:
            credentials_json = SecretString(Path(credentials_path).read_text(encoding="utf-8"))

        if not credentials_json:
            raise exc.PyAirbyteInputError(
                guidance=(
                    "No Google Cloud credentials found. You can provide the path to the "
                    "credentials file using the `credentials_path` argument, or provide the JSON "
                    "contents of the credentials file using the `credentials_json` argument."
                ),
            )

        self.secret_client = secretmanager.SecretManagerServiceClient.from_service_account_info(
            json.loads(credentials_json)
        )

        if auto_register:
            self.auto_register = auto_register

        if as_backup:
            self.as_backup = as_backup

        super().__init__()  # Handles the registration if needed
Instantiate a new Google GSM secret manager instance.
You can provide either the path to the credentials file or the JSON contents of the credentials file. If both are provided, a PyAirbyteInputError will be raised.
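The credential lookup order in the constructor can be sketched without any Google dependencies. The helper below is a hypothetical, simplified mirror of that logic: `resolve_credentials_json` and its `env` parameter are illustrative names, and it raises `ValueError` where the real constructor raises `PyAirbyteInputError`.

```python
from __future__ import annotations

import json
import os
import tempfile
from pathlib import Path
from typing import Mapping


def resolve_credentials_json(
    credentials_path: str | None = None,
    credentials_json: str | None = None,
    env: Mapping[str, str] = os.environ,
) -> str:
    """Hypothetical helper mirroring the lookup order described above."""
    if credentials_path and credentials_json:
        raise ValueError("Provide `credentials_path` or `credentials_json`, not both.")

    if not credentials_json and not credentials_path:
        # Fall back to environment variables; the file-path variable is checked first.
        if "GOOGLE_APPLICATION_CREDENTIALS" in env:
            credentials_path = env["GOOGLE_APPLICATION_CREDENTIALS"]
        elif "GCP_GSM_CREDENTIALS" in env:
            credentials_json = env["GCP_GSM_CREDENTIALS"]

    if credentials_path:
        credentials_json = Path(credentials_path).read_text(encoding="utf-8")

    if not credentials_json:
        raise ValueError("No Google Cloud credentials found.")

    return credentials_json


# Usage with a throwaway key file and fake environment mappings.
with tempfile.TemporaryDirectory() as tmp_dir:
    key_file = Path(tmp_dir) / "key.json"
    key_file.write_text(json.dumps({"type": "service_account"}), encoding="utf-8")
    from_file = resolve_credentials_json(
        env={"GOOGLE_APPLICATION_CREDENTIALS": str(key_file)},
    )

from_env = resolve_credentials_json(env={"GCP_GSM_CREDENTIALS": '{"type": "from_env"}'})
```

Passing the environment as a mapping keeps the sketch testable; the real constructor reads `os.environ` directly.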
    def get_secret(self, secret_name: str) -> SecretString:
        """Get a named secret from Google Secret Manager."""
        return SecretString(
            self.secret_client.access_secret_version(
                name=self._fully_qualified_secret_name(secret_name)
            ).payload.data.decode("UTF-8")
        )
Get a named secret from Google Secret Manager.
    def get_secret_handle(
        self,
        secret_name: str,
    ) -> GSMSecretHandle:
        """Fetch a secret from the secret manager, using the secret name.

        Unlike `get_secret`, this method returns a `GSMSecretHandle` object, which can be used to
        inspect the secret's labels and other metadata.

        Args:
            secret_name (str): The name of the secret to fetch.

        Returns:
            GSMSecretHandle: A handle for the matching secret.
        """
        return GSMSecretHandle(
            parent=self,
            secret_name=self._fully_qualified_secret_name(secret_name),
        )
Fetch a secret from the secret manager, using the secret name.
Unlike get_secret, this method returns a GSMSecretHandle object, which can be used to inspect the secret's labels and other metadata.
Arguments:
- secret_name (str): The name of the secret to fetch.
Returns:
GSMSecretHandle: A handle for the matching secret.
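Both get_secret() and get_secret_handle() expand the given name through the private `_fully_qualified_secret_name()` helper shown in the class source. A stand-alone sketch of that expansion follows; the free function `fully_qualified_secret_name` and the project and secret names are illustrative assumptions, not part of the PyAirbyte API.

```python
def fully_qualified_secret_name(project: str, secret_name: str) -> str:
    """Expand a short secret name into a full GSM resource name (illustrative)."""
    full_name = secret_name
    if "projects/" not in full_name:
        # A bare name: prefix it with the project and pin the latest version.
        full_name = f"projects/{project}/secrets/{secret_name}/versions/latest"

    if "/versions/" not in full_name:
        # A resource path without a version: default to the latest one.
        full_name += "/versions/latest"

    return full_name


short = fully_qualified_secret_name("my-project", "API_KEY")
unversioned = fully_qualified_secret_name("my-project", "projects/other/secrets/API_KEY")
pinned = fully_qualified_secret_name("my-project", "projects/other/secrets/API_KEY/versions/3")
print(short)
print(unversioned)
print(pinned)
```

A name that already contains `projects/` keeps its own project, and an explicit version is left untouched.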
    def fetch_secrets(
        self,
        *,
        filter_string: str,
    ) -> Iterable[GSMSecretHandle]:
        """List the secrets in the secret manager that match a filter string.

        Example filter strings:
        - `labels.connector=source-bigquery`: Filter for secrets whose `connector` label is
          'source-bigquery'.

        Args:
            filter_string (str): A filter string to apply to the list of secrets, following the
                format described in the Google Secret Manager documentation:
                https://cloud.google.com/secret-manager/docs/filtering

        Returns:
            Iterable[GSMSecretHandle]: An iterable of `GSMSecretHandle` objects for the matching
            secrets.
        """
        gsm_secrets: ListSecretsPager = self.secret_client.list_secrets(
            request=secretmanager.ListSecretsRequest(
                filter=filter_string,
                parent=f"projects/{self.project}",
            ),
        )

        return [
            GSMSecretHandle(
                parent=self,
                secret_name=secret.name,
            )
            for secret in gsm_secrets
        ]
List the secrets in the secret manager that match a filter string.
Example filter strings:
- labels.connector=source-bigquery: Filter for secrets whose connector label is 'source-bigquery'.
Arguments:
- filter_string (str): A filter string to apply to the list of secrets, following the format described in the Google Secret Manager documentation: https://cloud.google.com/secret-manager/docs/filtering
Returns:
Iterable[GSMSecretHandle]: An iterable of GSMSecretHandle objects for the matching secrets.
    def fetch_secrets_by_label(
        self,
        label_key: str,
        label_value: str,
    ) -> Iterable[GSMSecretHandle]:
        """List the secrets in the secret manager that carry a given label.

        Args:
            label_key (str): The key of the label to filter by.
            label_value (str): The value of the label to filter by.

        Returns:
            Iterable[GSMSecretHandle]: An iterable of `GSMSecretHandle` objects for the matching
            secrets.
        """
        return self.fetch_secrets(filter_string=f"labels.{label_key}={label_value}")
List the secrets in the secret manager that carry a given label.
Arguments:
- label_key (str): The key of the label to filter by.
- label_value (str): The value of the label to filter by.
Returns:
Iterable[GSMSecretHandle]: An iterable of GSMSecretHandle objects for the matching secrets.
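The label lookup reduces to building a filter string of the form `labels.<key>=<value>` and passing it to fetch_secrets(). The sketch below shows only that string construction; `label_filter` is a hypothetical helper name, while the `connector` label key matches the `CONNECTOR_LABEL` default shown in the class source.

```python
def label_filter(label_key: str, label_value: str) -> str:
    """Build a Secret Manager filter string that matches one label (illustrative)."""
    return f"labels.{label_key}={label_value}"


CONNECTOR_LABEL = "connector"  # same default as the class attribute

connector_filter = label_filter(CONNECTOR_LABEL, "source-bigquery")
team_filter = label_filter("team", "data-platform")  # hypothetical label
print(connector_filter)
print(team_filter)
```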
    def fetch_connector_secrets(
        self,
        connector_name: str,
    ) -> Iterable[GSMSecretHandle]:
        """Fetch secrets in the secret manager, using the connector name as a filter for the label.

        The label key used to filter the secrets is defined by the `CONNECTOR_LABEL` attribute,
        which defaults to 'connector'.

        Args:
            connector_name (str): The name of the connector to filter by.

        Returns:
            Iterable[GSMSecretHandle]: An iterable of `GSMSecretHandle` objects for the matching
            secrets.
        """
        return self.fetch_secrets_by_label(
            label_key=self.CONNECTOR_LABEL,
            label_value=connector_name,
        )
Fetch secrets in the secret manager, using the connector name as a filter for the label.
The label key used to filter the secrets is defined by the CONNECTOR_LABEL attribute, which defaults to 'connector'.
Arguments:
- connector_name (str): The name of the connector to filter by.
Returns:
Iterable[GSMSecretHandle]: An iterable of GSMSecretHandle objects for the matching secrets.
    def fetch_connector_secret(
        self,
        connector_name: str,
    ) -> GSMSecretHandle:
        """Fetch a secret from the secret manager, using the connector name as a label filter.

        This method is a convenience method that returns the first secret found for the connector.

        The label key used to filter the secrets is defined by the `CONNECTOR_LABEL` attribute,
        which defaults to 'connector'.

        Args:
            connector_name (str): The name of the connector to filter by.

        Returns:
            GSMSecretHandle: A handle for the matching secret.
        """
        results: Iterable[GSMSecretHandle] = self.fetch_connector_secrets(connector_name)
        try:
            result = next(iter(results))
        except StopIteration:
            raise exc.PyAirbyteError(
                message="No secrets found for connector.",
                guidance=(
                    "Please check that the connector name is correct "
                    "and that the secret is correctly labeled."
                ),
                context={
                    "project": self.project,
                    "connector_name": connector_name,
                    "label_key": self.CONNECTOR_LABEL,
                },
            ) from None

        return result
Fetch a secret from the secret manager, using the connector name as a label filter.
This convenience method returns the first secret found for the connector. If no secret matches, a PyAirbyteError is raised.
The label key used to filter the secrets is defined by the CONNECTOR_LABEL attribute, which defaults to 'connector'.
Arguments:
- connector_name (str): The name of the connector to filter by.
Returns:
GSMSecretHandle: A handle for the matching secret.
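The "first match or raise" behavior can be sketched in isolation. `first_secret` and `NoSecretsFoundError` are hypothetical names for illustration; the real method raises `PyAirbyteError` and returns a `GSMSecretHandle` rather than a string.

```python
from __future__ import annotations

from typing import Iterable


class NoSecretsFoundError(Exception):
    """Hypothetical stand-in for the error raised when nothing matches."""


def first_secret(results: Iterable[str], connector_name: str) -> str:
    """Return the first item of `results`, or raise if there is none."""
    try:
        return next(iter(results))
    except StopIteration:
        # `from None` hides the StopIteration so only the domain error is shown.
        raise NoSecretsFoundError(
            f"No secrets found for connector '{connector_name}'."
        ) from None


found = first_secret(["secret-a", "secret-b"], "source-github")
print(found)

try:
    first_secret([], "source-github")
except NoSecretsFoundError as err:
    missing_message = str(err)
    print(missing_message)
```

Because only the first result is returned, a connector with several labeled secrets needs fetch_connector_secrets() to see them all.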
def register_secret_manager(
    secret_manager: CustomSecretManager,
    *,
    as_backup: bool = False,
    replace_existing: bool = False,
) -> None:
    """Register a custom secret manager."""
    if replace_existing:
        clear_secret_sources()

    if as_backup:
        # Add to end of list
        _SECRETS_SOURCES.append(secret_manager)
    else:
        # Add to beginning of list
        _SECRETS_SOURCES.insert(0, secret_manager)
Register a custom secret manager.
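The effect of as_backup and replace_existing on the position of a source in the list can be shown with a toy registry. This is a sketch only: `sources` and `register` are illustrative stand-ins for the module-level `_SECRETS_SOURCES` list and the function above, and plain strings take the place of secret manager objects.

```python
from __future__ import annotations

# Toy registry with two pre-existing sources (illustrative names).
sources: list[str] = ["env", "dotenv"]


def register(name: str, *, as_backup: bool = False, replace_existing: bool = False) -> None:
    """Add a source to the toy registry, mirroring the ordering rules above."""
    if replace_existing:
        sources.clear()

    if as_backup:
        sources.append(name)  # placed after every existing source
    else:
        sources.insert(0, name)  # placed ahead of every existing source


register("vault")
register("fallback", as_backup=True)
order_after_two = list(sources)
print(order_after_two)

register("only", replace_existing=True)
print(sources)
```

A primary manager ends up at the front of the list and a backup at the end, so the position in the list presumably decides which source is consulted first.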
def disable_secret_source(source: SecretManager | SecretSourceEnum) -> None:
    """Disable one of the default secrets sources.

    This function can accept either a `SecretManager` instance, a `SecretSourceEnum` enum value, or
    a string representing the name of the source to disable.
    """
    if isinstance(source, SecretManager) and source in _SECRETS_SOURCES:
        _SECRETS_SOURCES.remove(source)
        return

    # Else, remove by name. Iterate over a copy so that removal is safe.
    for s in list(_SECRETS_SOURCES):
        if s.name == str(source):
            _SECRETS_SOURCES.remove(s)
Disable one of the default secrets sources.
This function accepts a SecretManager instance, a SecretSourceEnum value, or a string representing the name of the source to disable.
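Removal by instance or by name can be sketched with a toy source list. `Source` and `disable` are hypothetical stand-ins for illustration; the sketch uses plain strings for names and does not model `SecretSourceEnum`.

```python
from __future__ import annotations


class Source:
    """Hypothetical stand-in for a secret manager with a `name` attribute."""

    def __init__(self, name: str) -> None:
        self.name = name


def disable(sources: list[Source], source: Source | str) -> None:
    """Remove a source by instance, or every source with a matching name."""
    if isinstance(source, Source) and source in sources:
        sources.remove(source)
        return

    # Iterate over a copy because the list is modified during the loop.
    for candidate in list(sources):
        if candidate.name == str(source):
            sources.remove(candidate)


env, dotenv, colab = Source("env"), Source("dotenv"), Source("google_colab")
active = [env, dotenv, colab]

disable(active, "dotenv")  # remove by name
names_after_name = [s.name for s in active]

disable(active, colab)  # remove by instance
names_after_instance = [s.name for s in active]
print(names_after_name, names_after_instance)
```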