airbyte_cdk.sources.declarative.transformations.config_transformations

1#
2# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3#
4
5from .add_fields import ConfigAddFields
6from .remap_field import ConfigRemapField
7from .remove_fields import ConfigRemoveFields
8
9__all__ = ["ConfigRemapField", "ConfigAddFields", "ConfigRemoveFields"]
@dataclass
class ConfigRemapField(ConfigTransformation):
    """
    Transformation that remaps a field's value to another value based on a static map.

    Attributes:
        map (Mapping[str, Any]): Lookup table from current field values to their replacements.
            It is wrapped in an InterpolatedMapping and evaluated on every call to `transform`.
        field_path (List[str]): Path components locating the field to remap. Each component is
            wrapped in an InterpolatedString and evaluated against the config being transformed.
        config (Mapping[str, Any]): Config used when evaluating `map`. Defaults to an empty dict.
    """

    map: Mapping[str, Any]
    field_path: List[str]
    config: Mapping[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        """
        Validates `field_path` and prepares the interpolated path components and map.

        :raises ValueError: if `field_path` is empty
        """
        if not self.field_path:
            raise ValueError("field_path cannot be empty.")
        # A single pass is enough: every component goes through InterpolatedString.create.
        self._field_path = [
            InterpolatedString.create(path, parameters={}) for path in self.field_path
        ]
        self._map = InterpolatedMapping(self.map, parameters={})

    def transform(
        self,
        config: MutableMapping[str, Any],
    ) -> None:
        """
        Transforms a config by remapping a field value based on the provided map.
        If the original value is found in the map, it's replaced with the mapped value.
        If the value is not in the map (or cannot be used as a map key at all, e.g. a list
        or a dict), the field remains unchanged. If any parent of the field is missing or is
        not a mapping, the config is left untouched.

        :param config: The user-provided configuration to be transformed
        """
        path_components = [path.eval(config) for path in self._field_path]

        # Walk down to the mapping that holds the target field, bailing out quietly
        # as soon as the path cannot be followed.
        current = config
        for component in path_components[:-1]:
            if component not in current:
                return
            current = current[component]

            if not isinstance(current, MutableMapping):
                return

        field_name = path_components[-1]

        # NOTE(review): the map is interpolated against `self.config`, not the `config`
        # argument being transformed -- confirm this is intended.
        mapping = self._map.eval(config=self.config)

        if field_name not in current:
            return

        current_value = current[field_name]
        try:
            replacement = mapping[current_value]
        except (KeyError, TypeError):
            # KeyError: the value has no entry in the map.
            # TypeError: the value is unhashable, so it can never be a map key.
            return
        current[field_name] = replacement

Transformation that remaps a field's value to another value based on a static map.

ConfigRemapField( map: Mapping[str, Any], field_path: List[str], config: Mapping[str, Any] = <factory>)
map: Mapping[str, Any]
field_path: List[str]
config: Mapping[str, Any]
def transform(self, config: MutableMapping[str, Any]) -> None:
39    def transform(
40        self,
41        config: MutableMapping[str, Any],
42    ) -> None:
43        """
44        Transforms a config by remapping a field value based on the provided map.
45        If the original value is found in the map, it's replaced with the mapped value.
46        If the value is not in the map, the field remains unchanged.
47
48        :param config: The user-provided configuration to be transformed
49        """
50        path_components = [path.eval(config) for path in self._field_path]
51
52        current = config
53        for i, component in enumerate(path_components[:-1]):
54            if component not in current:
55                return
56            current = current[component]
57
58            if not isinstance(current, MutableMapping):
59                return
60
61        field_name = path_components[-1]
62
63        mapping = self._map.eval(config=self.config)
64
65        if field_name in current and current[field_name] in mapping:
66            current[field_name] = mapping[current[field_name]]

Transforms a config by remapping a field value based on the provided map. If the original value is found in the map, it's replaced with the mapped value. If the value is not in the map, the field remains unchanged.

Parameters
  • config: The user-provided configuration to be transformed
 22@dataclass
 23class ConfigAddFields(ConfigTransformation):
 24    """
 25    Transformation which adds fields to a config. The path of the added field can be nested. Adding nested fields will create all
 26    necessary parent objects (like mkdir -p).
 27
 28    This transformation has access to the config being transformed.
 29
 30    Examples of instantiating this transformation via YAML:
 31    - type: ConfigAddFields
 32      fields:
 33        ### hardcoded constant
 34        - path: ["path"]
 35          value: "static_value"
 36
 37        ### nested path
 38        - path: ["path", "to", "field"]
 39          value: "static"
 40
 41        ### from config
 42        - path: ["derived_field"]
 43          value: "{{ config.original_field }}"
 44
 45        ### by supplying any valid Jinja template directive or expression
 46        - path: ["two_times_two"]
 47          value: "{{ 2 * 2 }}"
 48
 49    Attributes:
 50        fields (List[AddedFieldDefinition]): A list of transformations (path and corresponding value) that will be added to the config
 51    """
 52
 53    fields: List[AddedFieldDefinition]
 54    condition: str = ""
 55    _parsed_fields: List[ParsedAddFieldDefinition] = field(
 56        init=False, repr=False, default_factory=list
 57    )
 58
 59    def __post_init__(self) -> None:
 60        self._filter_interpolator = InterpolatedBoolean(condition=self.condition, parameters={})
 61
 62        for add_field in self.fields:
 63            if len(add_field.path) < 1:
 64                raise ValueError(
 65                    f"Expected a non-zero-length path for the AddFields transformation {add_field}"
 66                )
 67
 68            if not isinstance(add_field.value, InterpolatedString):
 69                if not isinstance(add_field.value, str):
 70                    raise ValueError(
 71                        f"Expected a string value for the AddFields transformation: {add_field}"
 72                    )
 73                else:
 74                    self._parsed_fields.append(
 75                        ParsedAddFieldDefinition(
 76                            add_field.path,
 77                            InterpolatedString.create(add_field.value, parameters={}),
 78                            value_type=add_field.value_type,
 79                            parameters={},
 80                        )
 81                    )
 82            else:
 83                self._parsed_fields.append(
 84                    ParsedAddFieldDefinition(
 85                        add_field.path,
 86                        add_field.value,
 87                        value_type=add_field.value_type,
 88                        parameters={},
 89                    )
 90                )
 91
 92    def transform(
 93        self,
 94        config: MutableMapping[str, Any],
 95    ) -> None:
 96        """
 97        Transforms a config by adding fields based on the provided field definitions.
 98
 99        :param config: The user-provided configuration to be transformed
100        """
101        for parsed_field in self._parsed_fields:
102            valid_types = (parsed_field.value_type,) if parsed_field.value_type else None
103            value = parsed_field.value.eval(config, valid_types=valid_types)
104            if not self.condition or self._filter_interpolator.eval(
105                config, value=value, path=parsed_field.path
106            ):
107                dpath.new(config, parsed_field.path, value)

Transformation which adds fields to a config. The path of the added field can be nested. Adding nested fields will create all necessary parent objects (like mkdir -p).

This transformation has access to the config being transformed.

Examples of instantiating this transformation via YAML:

    - type: ConfigAddFields
      fields:
        ### hardcoded constant
        - path: ["path"]
          value: "static_value"

        ### nested path
        - path: ["path", "to", "field"]
          value: "static"

        ### from config
        - path: ["derived_field"]
          value: "{{ config.original_field }}"

        ### by supplying any valid Jinja template directive or expression
        - path: ["two_times_two"]
          value: "{{ 2 * 2 }}"
Attributes:
  • fields (List[AddedFieldDefinition]): A list of transformations (path and corresponding value) that will be added to the config
ConfigAddFields( fields: List[airbyte_cdk.AddedFieldDefinition], condition: str = '')
condition: str = ''
def transform(self, config: MutableMapping[str, Any]) -> None:
 92    def transform(
 93        self,
 94        config: MutableMapping[str, Any],
 95    ) -> None:
 96        """
 97        Transforms a config by adding fields based on the provided field definitions.
 98
 99        :param config: The user-provided configuration to be transformed
100        """
101        for parsed_field in self._parsed_fields:
102            valid_types = (parsed_field.value_type,) if parsed_field.value_type else None
103            value = parsed_field.value.eval(config, valid_types=valid_types)
104            if not self.condition or self._filter_interpolator.eval(
105                config, value=value, path=parsed_field.path
106            ):
107                dpath.new(config, parsed_field.path, value)

Transforms a config by adding fields based on the provided field definitions.

Parameters
  • config: The user-provided configuration to be transformed
@dataclass
class ConfigRemoveFields(ConfigTransformation):
    """
    A transformation which removes fields from a config. The fields removed are designated using FieldPointers.
    During transformation, if a field or any of its parents does not exist in the config, no error is thrown.

    If an input field pointer references an item in a list (e.g: ["k", 0] in the object {"k": ["a", "b", "c"]}) then
    the object at that index is set to None rather than being entirely removed from the list.

    It's possible to remove objects nested in lists e.g: removing [".", 0, "k"] from {".": [{"k": "V"}]} results in {".": [{}]}

    Usage syntax:

    ```yaml
        config_transformations:
          - type: ConfigRemoveFields
            field_pointers:
              - ["path", "to", "field1"]
              - ["path2"]
            condition: "{{ config.some_flag }}" # Optional condition
    ```

    Attributes:
        field_pointers (List[FieldPointer]): pointers to the fields that should be removed
        condition (str): Optional condition that determines if the fields should be removed.
            An empty string means the fields are always removed.
    """

    field_pointers: List[FieldPointer]
    condition: str = ""

    def __post_init__(self) -> None:
        """Builds the boolean interpolator used to evaluate `condition`."""
        self._filter_interpolator = InterpolatedBoolean(condition=self.condition, parameters={})

    def transform(
        self,
        config: MutableMapping[str, Any],
    ) -> None:
        """
        Transforms a config in place by removing the fields designated by the field pointers.
        Nothing is removed when a condition is set and evaluates to a falsy value.

        :param config: The user-provided configuration to be transformed
        """
        # The condition is only evaluated when one was actually supplied.
        should_remove = not self.condition or self._filter_interpolator.eval(config)
        if not should_remove:
            return

        for pointer in self.field_pointers:
            try:
                dpath.delete(config, pointer)
            except dpath.exceptions.PathNotFound:
                # A missing field (or missing parent) is not an error.
                continue

A transformation which removes fields from a config. The fields removed are designated using FieldPointers. During transformation, if a field or any of its parents does not exist in the config, no error is thrown.

If an input field pointer references an item in a list (e.g: ["k", 0] in the object {"k": ["a", "b", "c"]}) then the object at that index is set to None rather than being entirely removed from the list.

It's possible to remove objects nested in lists e.g: removing [".", 0, "k"] from {".": [{"k": "V"}]} results in {".": [{}]}

Usage syntax:

    config_transformations:
      - type: ConfigRemoveFields
        field_pointers:
          - ["path", "to", "field1"]
          - ["path2"]
        condition: "{{ config.some_flag }}" # Optional condition
Attributes:
  • field_pointers (List[FieldPointer]): pointers to the fields that should be removed
  • condition (str): Optional condition that determines if the fields should be removed
ConfigRemoveFields(field_pointers: List[List[str]], condition: str = '')
field_pointers: List[List[str]]
condition: str = ''
def transform(self, config: MutableMapping[str, Any]) -> None:
51    def transform(
52        self,
53        config: MutableMapping[str, Any],
54    ) -> None:
55        """
56        Transforms a config by removing fields based on the provided field pointers.
57
58        :param config: The user-provided configuration to be transformed
59        """
60        if self.condition and not self._filter_interpolator.eval(config):
61            return
62
63        for pointer in self.field_pointers:
64            try:
65                dpath.delete(config, pointer)
66            except dpath.exceptions.PathNotFound:
67                pass

Transforms a config by removing fields based on the provided field pointers.

Parameters
  • config: The user-provided configuration to be transformed