airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields
from dataclasses import InitVar, dataclass
from typing import Any, Dict, List, Mapping, Optional, Union

import dpath

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState


@dataclass
class KeyTransformation:
    """
    Prefix and/or suffix to apply to the keys of an extracted object.

    Both values may contain interpolation; they are evaluated once, against ``config``,
    when the instance is constructed.
    """

    config: Config
    parameters: InitVar[Mapping[str, Any]]
    prefix: Optional[str] = None
    suffix: Optional[str] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Resolve interpolation in prefix, then suffix; unset (None) values are left alone.
        for attr_name in ("prefix", "suffix"):
            raw_value = getattr(self, attr_name)
            if raw_value is None:
                continue
            template = InterpolatedString.create(raw_value, parameters=parameters)
            setattr(self, attr_name, template.eval(self.config))


@dataclass
class DpathFlattenFields(RecordTransformation):
    """
    Flatten the object found at ``field_path`` into the top level of the record.

    field_path: List[Union[InterpolatedString, str]] path to the field to flatten.
        A segment equal to "*" is a wildcard; in that case only the first match is used.
    delete_origin_value: bool = False whether to delete the original field once its
        contents have been merged into the record. Default is False.
    replace_record: bool = False whether to replace the whole record with the extracted
        fields. Default is False.
    key_transformation: Optional[KeyTransformation] = None how to transform extracted object keys.

    If the value at ``field_path`` is missing or is not a dict, the record is left unchanged.
    When not replacing the record, nothing is merged if any extracted key already exists
    at the top level of the record.
    """

    config: Config
    field_path: List[Union[InterpolatedString, str]]
    parameters: InitVar[Mapping[str, Any]]
    delete_origin_value: bool = False
    replace_record: bool = False
    key_transformation: Optional[KeyTransformation] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters
        # create() accepts both str and InterpolatedString segments, so a single pass
        # normalizes every segment; no separate re-wrapping of str entries is needed.
        self._field_path = [
            InterpolatedString.create(segment, parameters=self._parameters)
            for segment in self.field_path
        ]

    def _apply_key_transformation(self, extracted: Mapping[str, Any]) -> Mapping[str, Any]:
        """Return ``extracted`` with the configured prefix and/or suffix applied to each key."""
        if self.key_transformation:
            if self.key_transformation.prefix:
                extracted = {
                    f"{self.key_transformation.prefix}{key}": value
                    for key, value in extracted.items()
                }

            if self.key_transformation.suffix:
                extracted = {
                    f"{key}{self.key_transformation.suffix}": value
                    for key, value in extracted.items()
                }

        return extracted

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """
        Flatten the dict found at the configured path into ``record``, in place.

        :param record: the record to transform; it is mutated and nothing is returned
        :param config: not used; path interpolation is evaluated against ``self.config``
        :param stream_state: not used
        :param stream_slice: not used
        """
        path = [segment.eval(self.config) for segment in self._field_path]
        if "*" in path:
            # With a wildcard segment, only the first matching value is flattened.
            matched = dpath.values(record, path)
            extracted = matched[0] if matched else None
        else:
            extracted = dpath.get(record, path, default=[])

        # Only dict values can be merged; a missing path (None / []), a scalar or a list
        # leaves the record untouched instead of failing on `.keys()`.
        if not isinstance(extracted, dict):
            return

        extracted = self._apply_key_transformation(extracted)

        if self.replace_record and extracted:
            # Drop every existing field, then keep only the extracted ones.
            dpath.delete(record, "**")
            record.update(extracted)
            return

        # Never overwrite existing top-level fields: skip the merge on any key collision.
        if set(extracted) & set(record):
            return
        if self.delete_origin_value:
            dpath.delete(record, path)
        record.update(extracted)
@dataclass
class KeyTransformation:
    """
    Prefix and/or suffix to apply to the keys of an extracted object.

    Both values may contain interpolation; they are evaluated once, against ``config``,
    when the instance is constructed.
    """

    config: Config
    parameters: InitVar[Mapping[str, Any]]
    prefix: Optional[str] = None
    suffix: Optional[str] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Resolve interpolation in prefix, then suffix; unset (None) values are left alone.
        for attr_name in ("prefix", "suffix"):
            raw_value = getattr(self, attr_name)
            if raw_value is None:
                continue
            template = InterpolatedString.create(raw_value, parameters=parameters)
            setattr(self, attr_name, template.eval(self.config))
@dataclass
class DpathFlattenFields(RecordTransformation):
    """
    Flatten the object found at ``field_path`` into the top level of the record.

    field_path: List[Union[InterpolatedString, str]] path to the field to flatten.
        A segment equal to "*" is a wildcard; in that case only the first match is used.
    delete_origin_value: bool = False whether to delete the original field once its
        contents have been merged into the record. Default is False.
    replace_record: bool = False whether to replace the whole record with the extracted
        fields. Default is False.
    key_transformation: Optional[KeyTransformation] = None how to transform extracted object keys.

    If the value at ``field_path`` is missing or is not a dict, the record is left unchanged.
    When not replacing the record, nothing is merged if any extracted key already exists
    at the top level of the record.
    """

    config: Config
    field_path: List[Union[InterpolatedString, str]]
    parameters: InitVar[Mapping[str, Any]]
    delete_origin_value: bool = False
    replace_record: bool = False
    key_transformation: Optional[KeyTransformation] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters
        # create() accepts both str and InterpolatedString segments, so a single pass
        # normalizes every segment; no separate re-wrapping of str entries is needed.
        self._field_path = [
            InterpolatedString.create(segment, parameters=self._parameters)
            for segment in self.field_path
        ]

    def _apply_key_transformation(self, extracted: Mapping[str, Any]) -> Mapping[str, Any]:
        """Return ``extracted`` with the configured prefix and/or suffix applied to each key."""
        if self.key_transformation:
            if self.key_transformation.prefix:
                extracted = {
                    f"{self.key_transformation.prefix}{key}": value
                    for key, value in extracted.items()
                }

            if self.key_transformation.suffix:
                extracted = {
                    f"{key}{self.key_transformation.suffix}": value
                    for key, value in extracted.items()
                }

        return extracted

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """
        Flatten the dict found at the configured path into ``record``, in place.

        :param record: the record to transform; it is mutated and nothing is returned
        :param config: not used; path interpolation is evaluated against ``self.config``
        :param stream_state: not used
        :param stream_slice: not used
        """
        path = [segment.eval(self.config) for segment in self._field_path]
        if "*" in path:
            # With a wildcard segment, only the first matching value is flattened.
            matched = dpath.values(record, path)
            extracted = matched[0] if matched else None
        else:
            extracted = dpath.get(record, path, default=[])

        # Only dict values can be merged; a missing path (None / []), a scalar or a list
        # leaves the record untouched instead of failing on `.keys()`.
        if not isinstance(extracted, dict):
            return

        extracted = self._apply_key_transformation(extracted)

        if self.replace_record and extracted:
            # Drop every existing field, then keep only the extracted ones.
            dpath.delete(record, "**")
            record.update(extracted)
            return

        # Never overwrite existing top-level fields: skip the merge on any key collision.
        if set(extracted) & set(record):
            return
        if self.delete_origin_value:
            dpath.delete(record, path)
        record.update(extracted)
Flatten the object found at the provided path into the top level of the record.
field_path: List[Union[InterpolatedString, str]] path to the field to flatten. delete_origin_value: bool = False whether to delete the original field once its contents have been merged into the record. Default is False. replace_record: bool = False whether to replace the whole record with the extracted fields. Default is False. key_transformation: Optional[KeyTransformation] = None how to transform extracted object keys (prefix and/or suffix).
DpathFlattenFields( config: Mapping[str, Any], field_path: List[Union[airbyte_cdk.InterpolatedString, str]], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], delete_origin_value: bool = False, replace_record: bool = False, key_transformation: Optional[KeyTransformation] = None)
field_path: List[Union[airbyte_cdk.InterpolatedString, str]]
def transform(
    self,
    record: Dict[str, Any],
    config: Optional[Config] = None,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
) -> None:
    """
    Flatten the dict found at the configured path into ``record``, in place.

    :param record: the record to transform; it is mutated and nothing is returned
    :param config: not used; path interpolation is evaluated against ``self.config``
    :param stream_state: not used
    :param stream_slice: not used
    """
    path = [segment.eval(self.config) for segment in self._field_path]
    if "*" in path:
        # With a wildcard segment, only the first matching value is flattened.
        matched = dpath.values(record, path)
        extracted = matched[0] if matched else None
    else:
        extracted = dpath.get(record, path, default=[])

    # Only dict values can be merged; a missing path (None / []), a scalar or a list
    # leaves the record untouched instead of failing on `.keys()`.
    if not isinstance(extracted, dict):
        return

    extracted = self._apply_key_transformation(extracted)

    if self.replace_record and extracted:
        # Drop every existing field, then keep only the extracted ones.
        dpath.delete(record, "**")
        record.update(extracted)
        return

    # Never overwrite existing top-level fields: skip the merge on any key collision.
    if set(extracted) & set(record):
        return
    if self.delete_origin_value:
        dpath.delete(record, path)
    record.update(extracted)
Transform a record by adding, deleting, or mutating fields directly on the record reference passed as an argument.
Parameters
- record: The input record to be transformed
- config: The user-provided configuration as specified by the source's spec
- stream_state: The stream state
- stream_slice: The stream slice
Returns
None. The record passed in is modified in place.