airbyte_cdk.sources.declarative.parsers.custom_code_compiler

Contains functions to compile custom code from text.

  1"""Contains functions to compile custom code from text."""
  2
  3import hashlib
  4import os
  5import sys
  6from collections.abc import Mapping
  7from types import ModuleType
  8from typing import Any, cast
  9
 10from typing_extensions import Literal
 11
 12ChecksumType = Literal["md5", "sha256"]
 13CHECKSUM_FUNCTIONS = {
 14    "md5": hashlib.md5,
 15    "sha256": hashlib.sha256,
 16}
 17COMPONENTS_MODULE_NAME = "components"
 18SDM_COMPONENTS_MODULE_NAME = "source_declarative_manifest.components"
 19INJECTED_MANIFEST = "__injected_declarative_manifest"
 20INJECTED_COMPONENTS_PY = "__injected_components_py"
 21INJECTED_COMPONENTS_PY_CHECKSUMS = "__injected_components_py_checksums"
 22ENV_VAR_ALLOW_CUSTOM_CODE = "AIRBYTE_ENABLE_UNSAFE_CODE"
 23
 24
 25class AirbyteCodeTamperedError(Exception):
 26    """Raised when the connector's components module does not match its checksum.
 27
 28    This is a fatal error, as it can be a sign of code tampering.
 29    """
 30
 31
 32class AirbyteCustomCodeNotPermittedError(Exception):
 33    """Raised when custom code is attempted to be run in an environment that does not support it."""
 34
 35    def __init__(self) -> None:
 36        super().__init__(
 37            "Custom connector code is not permitted in this environment. "
 38            "If you need to run custom code, please ask your administrator to set the `AIRBYTE_ENABLE_UNSAFE_CODE` "
 39            "environment variable to 'true' in your Airbyte environment. "
 40            "If you see this message in Airbyte Cloud, your workspace does not allow executing "
 41            "custom connector code."
 42        )
 43
 44
 45def _hash_text(input_text: str, hash_type: str = "md5") -> str:
 46    """Return the hash of the input text using the specified hash type."""
 47    if not input_text:
 48        raise ValueError("Hash input text cannot be empty.")
 49
 50    hash_object = CHECKSUM_FUNCTIONS[hash_type]()
 51    hash_object.update(input_text.encode())
 52    return hash_object.hexdigest()
 53
 54
 55def custom_code_execution_permitted() -> bool:
 56    """Return `True` if custom code execution is permitted, otherwise `False`.
 57
 58    Custom code execution is permitted if the `AIRBYTE_ENABLE_UNSAFE_CODE` environment variable is set to 'true'.
 59    """
 60    return os.environ.get(ENV_VAR_ALLOW_CUSTOM_CODE, "").lower() == "true"
 61
 62
 63def validate_python_code(
 64    code_text: str,
 65    checksums: dict[str, str] | None,
 66) -> None:
 67    """Validate the provided Python code text against the provided checksums.
 68
 69    Currently we fail if no checksums are provided, although this may change in the future.
 70    """
 71    if not code_text:
 72        # No code provided, nothing to validate.
 73        return
 74
 75    if not checksums:
 76        raise ValueError(f"A checksum is required to validate the code. Received: {checksums}")
 77
 78    for checksum_type, checksum in checksums.items():
 79        if checksum_type not in CHECKSUM_FUNCTIONS:
 80            raise ValueError(
 81                f"Unsupported checksum type: {checksum_type}. Supported checksum types are: {CHECKSUM_FUNCTIONS.keys()}"
 82            )
 83
 84        calculated_checksum = _hash_text(code_text, checksum_type)
 85        if calculated_checksum != checksum:
 86            raise AirbyteCodeTamperedError(
 87                f"{checksum_type} checksum does not match."
 88                + str(
 89                    {
 90                        "expected_checksum": checksum,
 91                        "actual_checksum": calculated_checksum,
 92                        "code_text": code_text,
 93                    }
 94                ),
 95            )
 96
 97
 98def get_registered_components_module(
 99    config: Mapping[str, Any] | None,
100) -> ModuleType | None:
101    """Get a components module object based on the provided config.
102
103    If custom python components is provided, this will be loaded. Otherwise, we will
104    attempt to load from the `components` module already imported/registered in sys.modules.
105
106    If custom `components.py` text is provided in config, it will be registered with sys.modules
107    so that it can be later imported by manifest declarations which reference the provided classes.
108
109    Returns `None` if no components is provided and the `components` module is not found.
110    """
111    if config and config.get(INJECTED_COMPONENTS_PY, None):
112        if not custom_code_execution_permitted():
113            raise AirbyteCustomCodeNotPermittedError
114
115        # Create a new module object and execute the provided Python code text within it
116        python_text: str = config[INJECTED_COMPONENTS_PY]
117        return register_components_module_from_string(
118            components_py_text=python_text,
119            checksums=config.get(INJECTED_COMPONENTS_PY_CHECKSUMS, None),
120        )
121
122    # Check for `components` or `source_declarative_manifest.components`.
123    if SDM_COMPONENTS_MODULE_NAME in sys.modules:
124        return cast(ModuleType, sys.modules.get(SDM_COMPONENTS_MODULE_NAME))
125
126    if COMPONENTS_MODULE_NAME in sys.modules:
127        return cast(ModuleType, sys.modules.get(COMPONENTS_MODULE_NAME))
128
129    # Could not find module 'components' in `sys.modules`
130    # and INJECTED_COMPONENTS_PY was not provided in config.
131    return None
132
133
def register_components_module_from_string(
    components_py_text: str,
    checksums: dict[str, Any] | None,
) -> ModuleType:
    """Load and return the components module from a provided string containing the python code.

    The text is first checked with `validate_python_code`, then executed in the namespace of
    a new module named `components`. The module is registered in `sys.modules` as both
    `source_declarative_manifest.components` and `components`.

    Registration happens before the code runs, as the regular import system does, so that
    code which looks up its own module by name while executing finds this module rather
    than a stale or missing entry. If execution raises, the previous `sys.modules` entries
    are restored and the error propagates.

    NOTE(security): this runs arbitrary Python through `exec`. The function itself does not
    call `custom_code_execution_permitted()`; that gate is applied by
    `get_registered_components_module`.
    """
    # First validate the code
    validate_python_code(
        code_text=components_py_text,
        checksums=checksums,
    )

    # Create a new module object
    components_module = ModuleType(name=COMPONENTS_MODULE_NAME)

    # Register the module in `sys.modules` so it can be imported as
    # `source_declarative_manifest.components` and/or `components`,
    # remembering what was there before in case execution fails.
    registered_names = (SDM_COMPONENTS_MODULE_NAME, COMPONENTS_MODULE_NAME)
    not_registered = object()
    previous_entries = {name: sys.modules.get(name, not_registered) for name in registered_names}
    for name in registered_names:
        sys.modules[name] = components_module

    try:
        # Execute the module text in the module's namespace
        exec(components_py_text, components_module.__dict__)
    except BaseException:
        # Leave `sys.modules` as it was before this call.
        for name, previous in previous_entries.items():
            if previous is not_registered:
                sys.modules.pop(name, None)
            else:
                sys.modules[name] = previous
        raise

    return components_module
ChecksumType = typing.Literal['md5', 'sha256']
CHECKSUM_FUNCTIONS = {'md5': <built-in function openssl_md5>, 'sha256': <built-in function openssl_sha256>}
COMPONENTS_MODULE_NAME = 'components'
SDM_COMPONENTS_MODULE_NAME = 'source_declarative_manifest.components'
INJECTED_MANIFEST = '__injected_declarative_manifest'
INJECTED_COMPONENTS_PY = '__injected_components_py'
INJECTED_COMPONENTS_PY_CHECKSUMS = '__injected_components_py_checksums'
ENV_VAR_ALLOW_CUSTOM_CODE = 'AIRBYTE_ENABLE_UNSAFE_CODE'
class AirbyteCodeTamperedError(builtins.Exception):
class AirbyteCodeTamperedError(Exception):
    """Signal that the connector's components code does not hash to its supplied checksum.

    Treated as fatal, because a mismatch can be a sign that the code was tampered with.
    """

Raised when the connector's components module does not match its checksum.

This is a fatal error, as it can be a sign of code tampering.

class AirbyteCustomCodeNotPermittedError(builtins.Exception):
class AirbyteCustomCodeNotPermittedError(Exception):
    """Raised when custom code is attempted to be run in an environment that does not support it.

    The exception takes no arguments; its message is fixed.
    """

    def __init__(self) -> None:
        super().__init__(
            "Custom connector code is not permitted in this environment. "
            "If you need to run custom code, please ask your administrator to set the `AIRBYTE_ENABLE_UNSAFE_CODE` "
            "environment variable to 'true' in your Airbyte environment. "
            "If you see this message in Airbyte Cloud, your workspace does not allow executing "
            "custom connector code."
        )

    def __reduce__(self) -> tuple[type, tuple]:
        """Describe how to rebuild this exception when it is copied or pickled.

        The inherited `__reduce__` re-creates the exception by calling the class with
        `self.args` (the message), which the zero-argument `__init__` rejects with a
        `TypeError`. Rebuilding with no arguments yields the same fixed message.
        """
        return (type(self), ())

Raised when custom code is attempted to be run in an environment that does not support it.

def custom_code_execution_permitted() -> bool:
def custom_code_execution_permitted() -> bool:
    """Report whether this environment allows running custom connector code.

    The answer is `True` only when the `AIRBYTE_ENABLE_UNSAFE_CODE` environment variable is
    set to 'true' (letter case is ignored). An unset variable or any other value gives `False`.
    """
    flag_value = os.environ.get(ENV_VAR_ALLOW_CUSTOM_CODE, "")
    return flag_value.lower() == "true"

Return True if custom code execution is permitted, otherwise False.

Custom code execution is permitted if the AIRBYTE_ENABLE_UNSAFE_CODE environment variable is set to 'true'.

def validate_python_code(code_text: str, checksums: dict[str, str] | None) -> None:
def validate_python_code(
    code_text: str,
    checksums: dict[str, str] | None,
) -> None:
    """Validate the provided Python code text against the provided checksums.

    `checksums` maps a checksum type (a key of `CHECKSUM_FUNCTIONS`) to the expected hex
    digest of `code_text`. Every entry is checked, in iteration order. Empty `code_text`
    is accepted without looking at the checksums.

    Currently we fail if no checksums are provided, although this may change in the future.

    Raises `ValueError` if code is given without checksums or with an unsupported checksum
    type, and `AirbyteCodeTamperedError` if a calculated digest differs from the expected one.
    """
    if not code_text:
        # No code provided, nothing to validate.
        return

    if not checksums:
        raise ValueError(f"A checksum is required to validate the code. Received: {checksums}")

    for checksum_type, checksum in checksums.items():
        if checksum_type not in CHECKSUM_FUNCTIONS:
            # Show the supported names as a plain list instead of `dict_keys([...])`.
            raise ValueError(
                f"Unsupported checksum type: {checksum_type}. "
                f"Supported checksum types are: {list(CHECKSUM_FUNCTIONS)}"
            )

        calculated_checksum = _hash_text(code_text, checksum_type)
        if calculated_checksum != checksum:
            details = {
                "expected_checksum": checksum,
                "actual_checksum": calculated_checksum,
                "code_text": code_text,
            }
            # A space separates the sentence from the details so the message stays readable.
            raise AirbyteCodeTamperedError(f"{checksum_type} checksum does not match. {details}")

Validate the provided Python code text against the provided checksums.

Currently we fail if no checksums are provided, although this may change in the future.

def get_registered_components_module(config: Mapping[str, typing.Any] | None) -> module | None:
 99def get_registered_components_module(
100    config: Mapping[str, Any] | None,
101) -> ModuleType | None:
102    """Get a components module object based on the provided config.
103
104    If custom python components is provided, this will be loaded. Otherwise, we will
105    attempt to load from the `components` module already imported/registered in sys.modules.
106
107    If custom `components.py` text is provided in config, it will be registered with sys.modules
108    so that it can be later imported by manifest declarations which reference the provided classes.
109
110    Returns `None` if no components is provided and the `components` module is not found.
111    """
112    if config and config.get(INJECTED_COMPONENTS_PY, None):
113        if not custom_code_execution_permitted():
114            raise AirbyteCustomCodeNotPermittedError
115
116        # Create a new module object and execute the provided Python code text within it
117        python_text: str = config[INJECTED_COMPONENTS_PY]
118        return register_components_module_from_string(
119            components_py_text=python_text,
120            checksums=config.get(INJECTED_COMPONENTS_PY_CHECKSUMS, None),
121        )
122
123    # Check for `components` or `source_declarative_manifest.components`.
124    if SDM_COMPONENTS_MODULE_NAME in sys.modules:
125        return cast(ModuleType, sys.modules.get(SDM_COMPONENTS_MODULE_NAME))
126
127    if COMPONENTS_MODULE_NAME in sys.modules:
128        return cast(ModuleType, sys.modules.get(COMPONENTS_MODULE_NAME))
129
130    # Could not find module 'components' in `sys.modules`
131    # and INJECTED_COMPONENTS_PY was not provided in config.
132    return None

Get a components module object based on the provided config.

If custom python components are provided, they will be loaded. Otherwise, we will attempt to load from the components module already imported/registered in sys.modules.

If custom components.py text is provided in config, it will be registered with sys.modules so that it can be later imported by manifest declarations which reference the provided classes.

Returns None if no components are provided and the components module is not found.

def register_components_module_from_string( components_py_text: str, checksums: dict[str, typing.Any] | None) -> module:
def register_components_module_from_string(
    components_py_text: str,
    checksums: dict[str, Any] | None,
) -> ModuleType:
    """Load and return the components module from a provided string containing the python code.

    The text is first checked with `validate_python_code`, then executed in the namespace of
    a new module named `components`. The module is registered in `sys.modules` as both
    `source_declarative_manifest.components` and `components`.

    Registration happens before the code runs, as the regular import system does, so that
    code which looks up its own module by name while executing finds this module rather
    than a stale or missing entry. If execution raises, the previous `sys.modules` entries
    are restored and the error propagates.

    NOTE(security): this runs arbitrary Python through `exec`. The function itself does not
    call `custom_code_execution_permitted()`; that gate is applied by
    `get_registered_components_module`.
    """
    # First validate the code
    validate_python_code(
        code_text=components_py_text,
        checksums=checksums,
    )

    # Create a new module object
    components_module = ModuleType(name=COMPONENTS_MODULE_NAME)

    # Register the module in `sys.modules` so it can be imported as
    # `source_declarative_manifest.components` and/or `components`,
    # remembering what was there before in case execution fails.
    registered_names = (SDM_COMPONENTS_MODULE_NAME, COMPONENTS_MODULE_NAME)
    not_registered = object()
    previous_entries = {name: sys.modules.get(name, not_registered) for name in registered_names}
    for name in registered_names:
        sys.modules[name] = components_module

    try:
        # Execute the module text in the module's namespace
        exec(components_py_text, components_module.__dict__)
    except BaseException:
        # Leave `sys.modules` as it was before this call.
        for name, previous in previous_entries.items():
            if previous is not_registered:
                sys.modules.pop(name, None)
            else:
                sys.modules[name] = previous
        raise

    return components_module

Load and return the components module from a provided string containing the python code.