airbyte_cdk.sources.declarative.transformations
```python
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


# RecordTransformation is depended upon by every class in this module (since it's the abc everything implements). For this reason,
# the order of imports matters i.e: this file must fully import RecordTransformation before importing anything which depends on RecordTransformation
# Otherwise there will be a circular dependency (load order will be init.py --> RemoveFields (which tries to import RecordTransformation) -->
# init.py --> circular dep error, since loading this file causes it to try to import itself down the line.
# so we add the split directive below to tell isort to sort imports while keeping RecordTransformation as the first import
from .transformation import RecordTransformation

# isort: split
from .add_fields import AddFields
from .remove_fields import RemoveFields

__all__ = ["AddFields", "RecordTransformation", "RemoveFields"]
```
```python
@dataclass
class AddFields(RecordTransformation):
    """
    Transformation which adds fields to an output record. The path of the added field can be nested. Adding nested fields will create all
    necessary parent objects (like mkdir -p). Adding fields to an array will extend the array to that index (filling intermediate
    indices with null values). So if you add a field at index 5 to the array ["value"], it will become ["value", null, null, null, null,
    "new_value"].

    This transformation has access to the following contextual values:
        record: the record about to be output by the connector
        config: the input configuration provided to a connector
        stream_state: the current state of the stream
        stream_slice: the current stream slice being read

    Examples of instantiating this transformation via YAML:
    - type: AddFields
      fields:
        # hardcoded constant
        - path: ["path"]
          value: "static_value"

        # nested path
        - path: ["path", "to", "field"]
          value: "static"

        # from config
        - path: ["shop_id"]
          value: "{{ config.shop_id }}"

        # from stream_interval
        - path: ["date"]
          value: "{{ stream_interval.start_date }}"

        # from record
        - path: ["unnested_value"]
          value: "{{ record.nested.field }}"

        # from stream_slice
        - path: ["start_date"]
          value: "{{ stream_slice.start_date }}"

        # by supplying any valid Jinja template directive or expression https://jinja.palletsprojects.com/en/3.1.x/templates/#
        - path: ["two_times_two"]
          value: "{{ 2 * 2 }}"

    Attributes:
        fields (List[AddedFieldDefinition]): A list of transformations (path and corresponding value) that will be added to the record
    """

    fields: List[AddedFieldDefinition]
    parameters: InitVar[Mapping[str, Any]]
    condition: str = ""
    _parsed_fields: List[ParsedAddFieldDefinition] = field(
        init=False, repr=False, default_factory=list
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._filter_interpolator = InterpolatedBoolean(
            condition=self.condition, parameters=parameters
        )

        for add_field in self.fields:
            if len(add_field.path) < 1:
                raise ValueError(
                    f"Expected a non-zero-length path for the AddFields transformation {add_field}"
                )

            if not isinstance(add_field.value, InterpolatedString):
                if not isinstance(add_field.value, str):
                    raise ValueError(
                        f"Expected a string value for the AddFields transformation: {add_field}"
                    )
                else:
                    # Plain strings are wrapped so they can be interpolated at transform time.
                    self._parsed_fields.append(
                        ParsedAddFieldDefinition(
                            add_field.path,
                            InterpolatedString.create(add_field.value, parameters=parameters),
                            value_type=add_field.value_type,
                            parameters=parameters,
                        )
                    )
            else:
                self._parsed_fields.append(
                    ParsedAddFieldDefinition(
                        add_field.path,
                        add_field.value,
                        value_type=add_field.value_type,
                        parameters={},
                    )
                )

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        if config is None:
            config = {}
        kwargs = {"record": record, "stream_slice": stream_slice}
        for parsed_field in self._parsed_fields:
            valid_types = (parsed_field.value_type,) if parsed_field.value_type else None
            value = parsed_field.value.eval(config, valid_types=valid_types, **kwargs)
            is_empty_condition = not self.condition
            if is_empty_condition or self._filter_interpolator.eval(config, value=value, **kwargs):
                dpath.new(record, parsed_field.path, value)

    def __eq__(self, other: Any) -> bool:
        return bool(self.__dict__ == other.__dict__)
```
Transformation which adds fields to an output record. The path of an added field can be nested: adding a nested field creates all necessary parent objects (like mkdir -p). Adding a field to an array extends the array up to that index, filling intermediate indices with null values. For example, adding a field at index 5 to the array ["value"] turns it into ["value", null, null, null, null, "new_value"].
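The path-creation rules above can be modelled in plain Python. This is an illustrative sketch, not the CDK's code (the source listing delegates the write to dpath.new): `add_field_at_path` and its helpers are hypothetical, and the rule that a missing parent becomes a list when the next path segment is an integer (and a dict otherwise) is an assumption of the sketch.

```python
from typing import Any, List, Union

PathSegment = Union[str, int]


def add_field_at_path(record: dict, path: List[PathSegment], value: Any) -> None:
    """Set `value` at `path` inside `record` in place, creating parents as needed."""
    if not path:
        raise ValueError("Expected a non-zero-length path")
    node: Any = record
    for segment, next_segment in zip(path, path[1:]):
        # Assumption: a missing parent is a list if the next segment is an int, else a dict.
        child_default: Any = [] if isinstance(next_segment, int) else {}
        node = _get_or_create(node, segment, child_default)
    _assign(node, path[-1], value)


def _assign(node: Any, segment: PathSegment, value: Any) -> None:
    if isinstance(node, list):
        index = int(segment)
        # Extend the list with None up to the target index, then assign.
        if index >= len(node):
            node.extend([None] * (index + 1 - len(node)))
        node[index] = value
    else:
        node[segment] = value


def _get_or_create(node: Any, segment: PathSegment, default: Any) -> Any:
    """Return the child at `segment`, inserting `default` first if it is missing."""
    if isinstance(node, list):
        index = int(segment)
        if index >= len(node) or node[index] is None:
            _assign(node, index, default)
        return node[index]
    if segment not in node:
        node[segment] = default
    return node[segment]
```

Usage: `add_field_at_path(record, ["items", 5], "new_value")` on `{"items": ["value"]}` leaves `record["items"]` as `["value", None, None, None, None, "new_value"]`, matching the example in the text.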
This transformation has access to the following contextual values:
- record: the record about to be output by the connector
- config: the input configuration provided to a connector
- stream_state: the current state of the stream
- stream_slice: the current stream slice being read
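Templates such as "{{ config.shop_id }}" look names up in these contextual values. The toy resolver below only understands a single dotted lookup per placeholder; it is not Jinja (which the CDK's templates use), and `render` and `lookup` are hypothetical helpers for illustration.

```python
import re
from typing import Any, Mapping

# Matches one "{{ dotted.name }}" placeholder; expressions like "{{ 2 * 2 }}" are NOT supported.
_PLACEHOLDER = re.compile(r"\{\{\s*([A-Za-z_][\w.]*)\s*\}\}")


def lookup(context: Mapping[str, Any], dotted: str) -> Any:
    """Walk a dotted name such as 'record.nested.field' through nested mappings."""
    value: Any = context
    for part in dotted.split("."):
        value = value[part]
    return value


def render(template: str, context: Mapping[str, Any]) -> Any:
    """Return the raw value for a lone placeholder; otherwise substitute as text."""
    match = _PLACEHOLDER.fullmatch(template.strip())
    if match:
        return lookup(context, match.group(1))
    return _PLACEHOLDER.sub(lambda m: str(lookup(context, m.group(1))), template)
```

For example, with a context of `{"record": {"nested": {"field": 7}}, "config": {"shop_id": "abc"}}`, `render("{{ record.nested.field }}", context)` returns the integer 7, while `render("shop-{{ config.shop_id }}", context)` returns the string "shop-abc". Returning the raw value for a lone placeholder is a design choice of this sketch only.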
Examples of instantiating this transformation via YAML:
```yaml
- type: AddFields
  fields:
    # hardcoded constant
    - path: ["path"]
      value: "static_value"

    # nested path
    - path: ["path", "to", "field"]
      value: "static"

    # from config
    - path: ["shop_id"]
      value: "{{ config.shop_id }}"

    # from stream_interval
    - path: ["date"]
      value: "{{ stream_interval.start_date }}"

    # from record
    - path: ["unnested_value"]
      value: "{{ record.nested.field }}"

    # from stream_slice
    - path: ["start_date"]
      value: "{{ stream_slice.start_date }}"

    # by supplying any valid Jinja template directive or expression https://jinja.palletsprojects.com/en/3.1.x/templates/#
    - path: ["two_times_two"]
      value: "{{ 2 * 2 }}"
```
Attributes:
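- fields (List[AddedFieldDefinition]): A list of field definitions (path and corresponding value) to add to the record
- condition (str): An optional interpolated boolean expression. When set, each field is added only if the expression evaluates to true; the computed field value is available to it as value. Defaults to an empty string, meaning every field is added.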
```python
    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        if config is None:
            config = {}
        kwargs = {"record": record, "stream_slice": stream_slice}
        for parsed_field in self._parsed_fields:
            valid_types = (parsed_field.value_type,) if parsed_field.value_type else None
            value = parsed_field.value.eval(config, valid_types=valid_types, **kwargs)
            is_empty_condition = not self.condition
            if is_empty_condition or self._filter_interpolator.eval(config, value=value, **kwargs):
                dpath.new(record, parsed_field.path, value)
```
Transform a record in place by adding, deleting, or mutating fields on the record reference passed as an argument.
Parameters
- record: The input record to be transformed
- config: The user-provided configuration as specified by the source's spec
- stream_state: The stream state
- stream_slice: The stream slice
Returns
None. The record passed in is modified in place.
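The loop in AddFields.transform computes each value first and writes it only when there is no condition or the condition holds, with the computed value visible to the condition. The stdlib-only model below replaces the CDK's interpolated strings and booleans with plain Python callables; `SketchAddFields` is hypothetical and, unlike the real class, writes only top-level keys.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple

Record = Dict[str, Any]
# Hypothetical stand-ins for interpolated values and conditions.
ValueFn = Callable[[Record, Mapping[str, Any]], Any]
ConditionFn = Callable[[Any, Record], bool]


@dataclass
class SketchAddFields:
    """Add top-level fields in place; a simplified model of AddFields.transform."""

    fields: List[Tuple[str, ValueFn]]
    condition: Optional[ConditionFn] = None

    def transform(self, record: Record, config: Optional[Mapping[str, Any]] = None) -> None:
        config = config or {}
        for key, value_fn in self.fields:
            # Compute the value first so the condition can inspect it.
            value = value_fn(record, config)
            if self.condition is None or self.condition(value, record):
                record[key] = value
```

With `condition=lambda value, record: value is not None`, a field whose computed value is `None` is skipped while the others are still added, and `transform` itself returns `None`.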
```python
@dataclass
class RecordTransformation:
    """
    Implementations of this class define transformations that can be applied to records of a stream.
    """

    @abstractmethod
    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """
        Transform a record in place by adding, deleting, or mutating fields on the record reference passed as an argument.

        :param record: The input record to be transformed
        :param config: The user-provided configuration as specified by the source's spec
        :param stream_state: The stream state
        :param stream_slice: The stream slice
        :return: None; the record is modified in place
        """

    def __eq__(self, other: object) -> bool:
        return other.__dict__ == self.__dict__
```
Implementations of this class define transformations that can be applied to records of a stream.
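A new transformation implements transform and mutates the record it receives. The sketch below mirrors that contract with stdlib classes only; `Transformation` and `RenameField` are hypothetical names, not CDK classes. Unlike the listed source (a plain dataclass whose abstractmethod is not enforced), this sketch inherits from ABC so the base class cannot be instantiated, and its equality check guards against non-Transformation operands.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Mapping, Optional


class Transformation(ABC):
    """Hypothetical mirror of RecordTransformation: subclasses mutate records in place."""

    @abstractmethod
    def transform(self, record: Dict[str, Any], config: Optional[Mapping[str, Any]] = None) -> None:
        ...

    def __eq__(self, other: object) -> bool:
        # Same idea as the listed source: equal when instance attributes are equal.
        return isinstance(other, Transformation) and other.__dict__ == self.__dict__


class RenameField(Transformation):
    """Hypothetical transformation that renames one top-level key if present."""

    def __init__(self, old: str, new: str) -> None:
        self.old = old
        self.new = new

    def transform(self, record: Dict[str, Any], config: Optional[Mapping[str, Any]] = None) -> None:
        if self.old in record:
            record[self.new] = record.pop(self.old)
```

Usage: `RenameField("nm", "name").transform(record)` turns `{"id": 1, "nm": "a"}` into `{"id": 1, "name": "a"}`, and two `RenameField("a", "b")` instances compare equal.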
```python
    @abstractmethod
    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """
        Transform a record in place by adding, deleting, or mutating fields on the record reference passed as an argument.

        :param record: The input record to be transformed
        :param config: The user-provided configuration as specified by the source's spec
        :param stream_state: The stream state
        :param stream_slice: The stream slice
        :return: None; the record is modified in place
        """
```
Transform a record in place by adding, deleting, or mutating fields on the record reference passed as an argument.
Parameters
- record: The input record to be transformed
- config: The user-provided configuration as specified by the source's spec
- stream_state: The stream state
- stream_slice: The stream slice
Returns
None. The record passed in is modified in place.
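Because transform is declared to return None and works on the record reference, several transformations can be applied by calling them one after another on the same dict. The sketch below assumes a stream's transformations list is applied in list order; `apply_transformations` is a hypothetical helper, and plain functions stand in for transformation objects.

```python
from typing import Any, Callable, Dict, Iterable, List

Record = Dict[str, Any]
TransformFn = Callable[[Record], None]


def apply_transformations(records: Iterable[Record], transformations: List[TransformFn]) -> List[Record]:
    """Run every transformation, in order, on each record; records are mutated in place."""
    out: List[Record] = []
    for record in records:
        for transform in transformations:
            transform(record)  # return value is ignored: the mutation is the result
        out.append(record)
    return out


def remove_password(record: Record) -> None:
    record.pop("password", None)


def tag_source(record: Record) -> None:
    record["source"] = "api"
```

Calling `apply_transformations([{"id": 1, "password": "x"}], [remove_password, tag_source])` yields `[{"id": 1, "source": "api"}]`.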
````python
@dataclass
class RemoveFields(RecordTransformation):
    """
    A transformation which removes fields from a record. The fields removed are designated using FieldPointers.
    During transformation, if a field or any of its parents does not exist in the record, no error is thrown.

    If an input field pointer references an item in a list (e.g: ["k", 0] in the object {"k": ["a", "b", "c"]}) then
    the object at that index is set to None rather than being removed from the list entirely. TODO change this behavior.

    It's possible to remove objects nested in lists e.g: removing [".", 0, "k"] from {".": [{"k": "V"}]} results in {".": [{}]}

    Usage syntax:

    ```yaml
    my_stream:
      <other parameters..>
      transformations:
        - type: RemoveFields
          field_pointers:
            - ["path", "to", "field1"]
            - ["path2"]
    ```

    Attributes:
        field_pointers (List[FieldPointer]): pointers to the fields that should be removed
    """

    field_pointers: List[FieldPointer]
    parameters: InitVar[Mapping[str, Any]]
    condition: str = ""

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._filter_interpolator = InterpolatedBoolean(
            condition=self.condition, parameters=parameters
        )

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """
        :param record: The record to be transformed
        :return: None; the requested fields are removed from the record in place
        """
        for pointer in self.field_pointers:
            # the dpath library by default doesn't delete fields from arrays
            try:
                dpath.delete(
                    record,
                    pointer,
                    afilter=(lambda x: self._filter_interpolator.eval(config or {}, property=x))
                    if self.condition
                    else None,
                )
            except dpath.exceptions.PathNotFound:
                # if the (potentially nested) property does not exist, silently skip
                pass
````
A transformation which removes fields from a record. The fields removed are designated using FieldPointers. During transformation, if a field or any of its parents does not exist in the record, no error is thrown.
If an input field pointer references an item in a list (e.g., ["k", 0] in the object {"k": ["a", "b", "c"]}), the object at that index is set to None rather than being removed from the list entirely. TODO change this behavior.
It's possible to remove objects nested in lists, e.g., removing [".", 0, "k"] from {".": [{"k": "V"}]} results in {".": [{}]}.
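The three removal rules above (missing paths are ignored, list items are nulled, nested dicts inside lists can be edited) can be modelled in plain Python. This is a sketch, not the CDK's code, which calls dpath.delete and catches dpath.exceptions.PathNotFound per the source listing; `remove_field_at_path` is hypothetical, and nulling list items follows the documented behaviour rather than any claim about dpath internals.

```python
from typing import Any, List, Union

PathSegment = Union[str, int]


def remove_field_at_path(record: Any, path: List[PathSegment]) -> None:
    """Delete the value at `path` in place; missing or invalid paths are ignored."""
    node: Any = record
    try:
        for segment in path[:-1]:
            node = node[segment]
        last = path[-1]
        if isinstance(node, list):
            # Documented behaviour: a list item is set to None rather than popped.
            node[last] = None
        else:
            del node[last]
    except (KeyError, IndexError, TypeError):
        # Missing key, out-of-range index, empty path or wrong container type: skip quietly.
        pass
```

Usage: removing `["k", 0]` from `{"k": ["a", "b", "c"]}` gives `{"k": [None, "b", "c"]}`, and removing `[".", 0, "k"]` from `{".": [{"k": "V"}]}` gives `{".": [{}]}`.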
Usage syntax:
```yaml
my_stream:
  <other parameters..>
  transformations:
    - type: RemoveFields
      field_pointers:
        - ["path", "to", "field1"]
        - ["path2"]
```
Attributes:
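- field_pointers (List[FieldPointer]): pointers to the fields that should be removed
- condition (str): An optional interpolated boolean expression. When set, a matched field is removed only if the expression evaluates to true; the matched value is available to it as property. Defaults to an empty string, meaning matched fields are always removed.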
```python
    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """
        :param record: The record to be transformed
        :return: None; the requested fields are removed from the record in place
        """
        for pointer in self.field_pointers:
            # the dpath library by default doesn't delete fields from arrays
            try:
                dpath.delete(
                    record,
                    pointer,
                    afilter=(lambda x: self._filter_interpolator.eval(config or {}, property=x))
                    if self.condition
                    else None,
                )
            except dpath.exceptions.PathNotFound:
                # if the (potentially nested) property does not exist, silently skip
                pass
```
Parameters
- record: The record to be transformed
Returns
None. The requested fields are removed from the record in place.
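When condition is set, the listed source passes an afilter to dpath.delete so that a matched field is deleted only if the interpolated condition holds; per our reading of dpath's filter convention, the filter receives the matched value, bound to property. The stdlib sketch below models that with a plain predicate; `remove_fields` is hypothetical and, to stay short, handles only dict keys (no list indices).

```python
from typing import Any, Callable, Dict, Optional, Sequence

Predicate = Callable[[Any], bool]


def remove_fields(
    record: Dict[str, Any],
    pointers: Sequence[Sequence[str]],
    condition: Optional[Predicate] = None,
) -> None:
    """Delete each pointed-to key in place, optionally only when `condition(value)` is true."""
    for pointer in pointers:
        parent: Any = record
        try:
            for key in pointer[:-1]:
                parent = parent[key]
            value = parent[pointer[-1]]
        except (KeyError, IndexError, TypeError):
            continue  # missing paths are skipped, as RemoveFields does
        if condition is None or condition(value):
            del parent[pointer[-1]]
```

For example, `remove_fields(record, [["email"], ["meta", "debug"], ["missing"]], condition=lambda v: v in (None, ""))` on `{"id": 1, "email": None, "meta": {"debug": ""}}` leaves `{"id": 1, "meta": {}}`.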