airbyte_cdk.sources.declarative.transformations

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5
 6# RecordTransformation is depended upon by every class in this module (since it's the abc everything implements). For this reason,
 7# the order of imports matters i.e: this file must fully import RecordTransformation before importing anything which depends on RecordTransformation
 8# Otherwise there will be a circular dependency (load order will be init.py --> RemoveFields (which tries to import RecordTransformation) -->
 9# init.py --> circular dep error, since loading this file causes it to try to import itself down the line.
10# so we add the split directive below to tell isort to sort imports while keeping RecordTransformation as the first import
11from .transformation import RecordTransformation
12
13# isort: split
14from .add_fields import AddFields
15from .remove_fields import RemoveFields
16
17__all__ = ["AddFields", "RecordTransformation", "RemoveFields"]
 37@dataclass
 38class AddFields(RecordTransformation):
 39    """
 40    Transformation which adds field to an output record. The path of the added field can be nested. Adding nested fields will create all
 41    necessary parent objects (like mkdir -p). Adding fields to an array will extend the array to that index (filling intermediate
 42    indices with null values). So if you add a field at index 5 to the array ["value"], it will become ["value", null, null, null, null,
 43    "new_value"].
 44
 45
 46    This transformation has access to the following contextual values:
 47        record: the record about to be output by the connector
 48        config: the input configuration provided to a connector
 49        stream_state: the current state of the stream
 50        stream_slice: the current stream slice being read
 51
 52
 53
 54    Examples of instantiating this transformation via YAML:
 55    - type: AddFields
 56      fields:
 57        # hardcoded constant
 58        - path: ["path"]
 59          value: "static_value"
 60
 61        # nested path
 62        - path: ["path", "to", "field"]
 63          value: "static"
 64
 65        # from config
 66        - path: ["shop_id"]
 67          value: "{{ config.shop_id }}"
 68
 69        # from stream_interval
 70        - path: ["date"]
 71          value: "{{ stream_interval.start_date }}"
 72
 73        # from record
 74        - path: ["unnested_value"]
 75          value: {{ record.nested.field }}
 76
 77        # from stream_slice
 78        - path: ["start_date"]
 79          value: {{ stream_slice.start_date }}
 80
 81        # by supplying any valid Jinja template directive or expression https://jinja.palletsprojects.com/en/3.1.x/templates/#
 82        - path: ["two_times_two"]
 83          value: {{ 2 * 2 }}
 84
 85    Attributes:
 86        fields (List[AddedFieldDefinition]): A list of transformations (path and corresponding value) that will be added to the record
 87    """
 88
 89    fields: List[AddedFieldDefinition]
 90    parameters: InitVar[Mapping[str, Any]]
 91    condition: str = ""
 92    _parsed_fields: List[ParsedAddFieldDefinition] = field(
 93        init=False, repr=False, default_factory=list
 94    )
 95
 96    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 97        self._filter_interpolator = InterpolatedBoolean(
 98            condition=self.condition, parameters=parameters
 99        )
100
101        for add_field in self.fields:
102            if len(add_field.path) < 1:
103                raise ValueError(
104                    f"Expected a non-zero-length path for the AddFields transformation {add_field}"
105                )
106
107            if not isinstance(add_field.value, InterpolatedString):
108                if not isinstance(add_field.value, str):
109                    raise f"Expected a string value for the AddFields transformation: {add_field}"
110                else:
111                    self._parsed_fields.append(
112                        ParsedAddFieldDefinition(
113                            add_field.path,
114                            InterpolatedString.create(add_field.value, parameters=parameters),
115                            value_type=add_field.value_type,
116                            parameters=parameters,
117                        )
118                    )
119            else:
120                self._parsed_fields.append(
121                    ParsedAddFieldDefinition(
122                        add_field.path,
123                        add_field.value,
124                        value_type=add_field.value_type,
125                        parameters={},
126                    )
127                )
128
129    def transform(
130        self,
131        record: Dict[str, Any],
132        config: Optional[Config] = None,
133        stream_state: Optional[StreamState] = None,
134        stream_slice: Optional[StreamSlice] = None,
135    ) -> None:
136        if config is None:
137            config = {}
138        kwargs = {"record": record, "stream_slice": stream_slice}
139        for parsed_field in self._parsed_fields:
140            valid_types = (parsed_field.value_type,) if parsed_field.value_type else None
141            value = parsed_field.value.eval(config, valid_types=valid_types, **kwargs)
142            is_empty_condition = not self.condition
143            if is_empty_condition or self._filter_interpolator.eval(config, value=value, **kwargs):
144                dpath.new(record, parsed_field.path, value)
145
146    def __eq__(self, other: Any) -> bool:
147        return bool(self.__dict__ == other.__dict__)

Transformation which adds fields to an output record. The path of the added field can be nested. Adding nested fields will create all necessary parent objects (like mkdir -p). Adding fields to an array will extend the array to that index (filling intermediate indices with null values). So if you add a field at index 5 to the array ["value"], it will become ["value", null, null, null, null, "new_value"].

This transformation has access to the following contextual values:

  • record: the record about to be output by the connector
  • config: the input configuration provided to a connector
  • stream_state: the current state of the stream
  • stream_slice: the current stream slice being read

Examples of instantiating this transformation via YAML:

    - type: AddFields
      fields:
        # hardcoded constant
        - path: ["path"]
          value: "static_value"

        # nested path
        - path: ["path", "to", "field"]
          value: "static"

        # from config
        - path: ["shop_id"]
          value: "{{ config.shop_id }}"

        # from stream_interval
        - path: ["date"]
          value: "{{ stream_interval.start_date }}"

        # from record
        - path: ["unnested_value"]
          value: "{{ record.nested.field }}"

        # from stream_slice
        - path: ["start_date"]
          value: "{{ stream_slice.start_date }}"

        # by supplying any valid Jinja template directive or expression https://jinja.palletsprojects.com/en/3.1.x/templates/#
        - path: ["two_times_two"]
          value: "{{ 2 * 2 }}"

Attributes:
  • fields (List[AddedFieldDefinition]): A list of transformations (path and corresponding value) that will be added to the record
AddFields( fields: List[airbyte_cdk.AddedFieldDefinition], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], condition: str = '')
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
condition: str = ''
def transform( self, record: Dict[str, Any], config: Optional[Mapping[str, Any]] = None, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> None:
129    def transform(
130        self,
131        record: Dict[str, Any],
132        config: Optional[Config] = None,
133        stream_state: Optional[StreamState] = None,
134        stream_slice: Optional[StreamSlice] = None,
135    ) -> None:
136        if config is None:
137            config = {}
138        kwargs = {"record": record, "stream_slice": stream_slice}
139        for parsed_field in self._parsed_fields:
140            valid_types = (parsed_field.value_type,) if parsed_field.value_type else None
141            value = parsed_field.value.eval(config, valid_types=valid_types, **kwargs)
142            is_empty_condition = not self.condition
143            if is_empty_condition or self._filter_interpolator.eval(config, value=value, **kwargs):
144                dpath.new(record, parsed_field.path, value)

Transform a record by adding, deleting, or mutating fields directly from the record reference passed in argument.

Parameters
  • record: The input record to be transformed
  • config: The user-provided configuration as specified by the source's spec
  • stream_state: The stream state
  • stream_slice: The stream slice
Returns

None. The record passed in is modified in place.

@dataclass
class RecordTransformation:
@dataclass
class RecordTransformation:
    """
    Implementations of this class define transformations that can be applied to records of a stream.
    """

    @abstractmethod
    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """
        Transform a record by adding, deleting, or mutating fields directly from the record reference passed in argument.

        The record is changed in place; nothing is returned.

        :param record: The input record to be transformed
        :param config: The user-provided configuration as specified by the source's spec
        :param stream_state: The stream state
        :param stream_slice: The stream slice
        :return: None
        """

    def __eq__(self, other: object) -> bool:
        """
        Compare two transformations by their instance attributes.

        :param other: The object to compare against
        :return: True when both objects hold the same ``__dict__``; operands without a ``__dict__`` are never equal
        """
        if not hasattr(other, "__dict__"):
            # e.g. None or an int: defer to Python's default comparison instead of raising AttributeError.
            return NotImplemented
        return bool(other.__dict__ == self.__dict__)

Implementations of this class define transformations that can be applied to records of a stream.

@abstractmethod
def transform( self, record: Dict[str, Any], config: Optional[Mapping[str, Any]] = None, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> None:
    @abstractmethod
    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """
        Transform a record by adding, deleting, or mutating fields directly from the record reference passed in argument.

        The record is changed in place; nothing is returned.

        :param record: The input record to be transformed
        :param config: The user-provided configuration as specified by the source's spec
        :param stream_state: The stream state
        :param stream_slice: The stream slice
        :return: None
        """

Transform a record by adding, deleting, or mutating fields directly from the record reference passed in argument.

Parameters
  • record: The input record to be transformed
  • config: The user-provided configuration as specified by the source's spec
  • stream_state: The stream state
  • stream_slice: The stream slice
Returns

None. The record passed in is modified in place.

@dataclass
class RemoveFields(airbyte_cdk.sources.declarative.transformations.RecordTransformation):
@dataclass
class RemoveFields(RecordTransformation):
    """
    A transformation which removes fields from a record. The fields removed are designated using FieldPointers.
    During transformation, if a field or any of its parents does not exist in the record, no error is thrown.

    If an input field pointer references an item in a list (e.g: ["k", 0] in the object {"k": ["a", "b", "c"]}) then
    the object at that index is set to None rather than being removed from the list entirely. TODO change this behavior.

    It's possible to remove objects nested in lists e.g: removing [".", 0, "k"] from {".": [{"k": "V"}]} results in {".": [{}]}

    Usage syntax:

    ```yaml
        my_stream:
            <other parameters..>
            transformations:
                - type: RemoveFields
                  field_pointers:
                    - ["path", "to", "field1"]
                    - ["path2"]
    ```

    Attributes:
        field_pointers (List[FieldPointer]): pointers to the fields that should be removed
        parameters (Mapping[str, Any]): init-only parameters handed to the condition interpolator
        condition (str): optional boolean expression; when non-empty it is passed to dpath.delete as a filter
    """

    field_pointers: List[FieldPointer]
    parameters: InitVar[Mapping[str, Any]]
    condition: str = ""

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Create the interpolator used to evaluate ``condition``."""
        self._filter_interpolator = InterpolatedBoolean(
            condition=self.condition,
            parameters=parameters,
        )

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """
        Remove every configured field pointer from ``record`` in place.

        :param record: The record to be transformed; it is modified directly
        :param config: The connector configuration; an empty dict is used for the condition when falsy
        :param stream_state: Accepted for interface compatibility; not used here
        :param stream_slice: Accepted for interface compatibility; not used here
        """

        def _matches_condition(candidate: Any) -> Any:
            # The candidate handed over by dpath is exposed to the condition as `property`.
            return self._filter_interpolator.eval(config or {}, property=candidate)

        # Only filter when a condition was configured; otherwise delete unconditionally.
        value_filter = _matches_condition if self.condition else None

        for field_path in self.field_pointers:
            # the dpath library by default doesn't delete fields from arrays
            try:
                dpath.delete(record, field_path, afilter=value_filter)
            except dpath.exceptions.PathNotFound:
                # if the (potentially nested) property does not exist, silently skip
                continue

A transformation which removes fields from a record. The fields removed are designated using FieldPointers. During transformation, if a field or any of its parents does not exist in the record, no error is thrown.

If an input field pointer references an item in a list (e.g: ["k", 0] in the object {"k": ["a", "b", "c"]}) then the object at that index is set to None rather than being removed from the list entirely. TODO change this behavior.

It's possible to remove objects nested in lists e.g: removing [".", 0, "k"] from {".": [{"k": "V"}]} results in {".": [{}]}

Usage syntax:

    my_stream:
        <other parameters..>
        transformations:
            - type: RemoveFields
              field_pointers:
                - ["path", "to", "field1"]
                - ["path2"]
Attributes:
  • field_pointers (List[FieldPointer]): pointers to the fields that should be removed
RemoveFields( field_pointers: List[List[str]], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], condition: str = '')
field_pointers: List[List[str]]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
condition: str = ''
def transform( self, record: Dict[str, Any], config: Optional[Mapping[str, Any]] = None, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> None:
53    def transform(
54        self,
55        record: Dict[str, Any],
56        config: Optional[Config] = None,
57        stream_state: Optional[StreamState] = None,
58        stream_slice: Optional[StreamSlice] = None,
59    ) -> None:
60        """
61        :param record: The record to be transformed
62        :return: the input record with the requested fields removed
63        """
64        for pointer in self.field_pointers:
65            # the dpath library by default doesn't delete fields from arrays
66            try:
67                dpath.delete(
68                    record,
69                    pointer,
70                    afilter=(lambda x: self._filter_interpolator.eval(config or {}, property=x))
71                    if self.condition
72                    else None,
73                )
74            except dpath.exceptions.PathNotFound:
75                # if the (potentially nested) property does not exist, silently skip
76                pass
Parameters
  • record: The record to be transformed
Returns

None. The requested fields are removed from the input record in place.