airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields
from dataclasses import InitVar, dataclass
from typing import Any, Dict, List, Mapping, Optional, Union

import dpath

from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState


@dataclass
class DpathFlattenFields(RecordTransformation):
    """
    Flatten the dict found at the provided path into the top level of the record.

    field_path: List[Union[InterpolatedString, str]] path to the field to flatten.
    delete_origin_value: bool = False whether to delete the origin field after flattening
        or keep it. Default is False.
    replace_record: bool = False whether to replace the whole origin record with the
        flattened field's content. Default is False.

    """

    config: Config
    field_path: List[Union[InterpolatedString, str]]
    parameters: InitVar[Mapping[str, Any]]
    delete_origin_value: bool = False
    replace_record: bool = False

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Prepare the interpolated form of ``field_path`` used by ``transform``."""
        # Every segment goes through InterpolatedString.create so that transform()
        # can call .eval() uniformly on each element.
        self._field_path = [
            InterpolatedString.create(path, parameters=parameters) for path in self.field_path
        ]

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """
        Flatten the dict located at ``field_path`` into ``record``, mutating it in place.

        The path segments are evaluated against ``self.config``. The ``config``,
        ``stream_state`` and ``stream_slice`` arguments are accepted but not read here.

        Behavior, depending on what is found at the path:
        - not a dict (including nothing found): the record is left untouched;
        - a non-empty dict and ``replace_record`` is set: the record's content is
          deleted and replaced by the dict's entries;
        - otherwise: the dict's entries are merged into the record only when none of
          its keys already exist at the record's top level; the origin field is
          deleted first when ``delete_origin_value`` is set.

        When the evaluated path contains a ``"*"`` segment, only the first match
        returned by ``dpath.values`` is used.
        """
        path = [segment.eval(self.config) for segment in self._field_path]
        if "*" in path:
            matched = dpath.values(record, path)
            extracted = matched[0] if matched else None
        else:
            extracted = dpath.get(record, path, default=[])

        # Only dict values can be flattened; anything else is ignored.
        if not isinstance(extracted, dict):
            return

        if self.replace_record and extracted:
            dpath.delete(record, "**")
            record.update(extracted)
            return

        # Skip the merge entirely rather than overwrite existing top-level keys.
        conflicts = set(extracted).intersection(record)
        if conflicts:
            return
        if self.delete_origin_value:
            dpath.delete(record, path)
        record.update(extracted)
@dataclass
class DpathFlattenFields(RecordTransformation):
    """
    Flatten the dict found at the provided path into the top level of the record.

    field_path: List[Union[InterpolatedString, str]] path to the field to flatten.
    delete_origin_value: bool = False whether to delete the origin field after flattening
        or keep it. Default is False.
    replace_record: bool = False whether to replace the whole origin record with the
        flattened field's content. Default is False.

    """

    config: Config
    field_path: List[Union[InterpolatedString, str]]
    parameters: InitVar[Mapping[str, Any]]
    delete_origin_value: bool = False
    replace_record: bool = False

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Prepare the interpolated form of ``field_path`` used by ``transform``."""
        # Every segment goes through InterpolatedString.create so that transform()
        # can call .eval() uniformly on each element.
        self._field_path = [
            InterpolatedString.create(path, parameters=parameters) for path in self.field_path
        ]

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """
        Flatten the dict located at ``field_path`` into ``record``, mutating it in place.

        The path segments are evaluated against ``self.config``. The ``config``,
        ``stream_state`` and ``stream_slice`` arguments are accepted but not read here.

        Behavior, depending on what is found at the path:
        - not a dict (including nothing found): the record is left untouched;
        - a non-empty dict and ``replace_record`` is set: the record's content is
          deleted and replaced by the dict's entries;
        - otherwise: the dict's entries are merged into the record only when none of
          its keys already exist at the record's top level; the origin field is
          deleted first when ``delete_origin_value`` is set.

        When the evaluated path contains a ``"*"`` segment, only the first match
        returned by ``dpath.values`` is used.
        """
        path = [segment.eval(self.config) for segment in self._field_path]
        if "*" in path:
            matched = dpath.values(record, path)
            extracted = matched[0] if matched else None
        else:
            extracted = dpath.get(record, path, default=[])

        # Only dict values can be flattened; anything else is ignored.
        if not isinstance(extracted, dict):
            return

        if self.replace_record and extracted:
            dpath.delete(record, "**")
            record.update(extracted)
            return

        # Skip the merge entirely rather than overwrite existing top-level keys.
        conflicts = set(extracted).intersection(record)
        if conflicts:
            return
        if self.delete_origin_value:
            dpath.delete(record, path)
        record.update(extracted)
Flatten fields only for provided path.
- field_path: List[Union[InterpolatedString, str]] — path to the field to flatten.
- delete_origin_value: bool = False — whether to delete the original field after flattening it, or keep it. Default is False.
- replace_record: bool = False — whether to replace the original record with the contents of the flattened field. Default is False.
DpathFlattenFields( config: Mapping[str, Any], field_path: List[Union[airbyte_cdk.InterpolatedString, str]], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], delete_origin_value: bool = False, replace_record: bool = False)
field_path: List[Union[airbyte_cdk.InterpolatedString, str]]
def
transform( self, record: Dict[str, Any], config: Optional[Mapping[str, Any]] = None, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> None:
def transform(
    self,
    record: Dict[str, Any],
    config: Optional[Config] = None,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
) -> None:
    """
    Flatten the dict located at the configured field path into ``record`` in place.

    The path segments are evaluated against ``self.config``. The ``config``,
    ``stream_state`` and ``stream_slice`` arguments are accepted but not read here.

    - If the value at the path is not a dict (or nothing is found), the record is
      left untouched.
    - If it is a non-empty dict and ``replace_record`` is set, the record's content
      is deleted and replaced by the dict's entries.
    - Otherwise the dict's entries are merged into the record only when none of its
      keys already exist at the record's top level; the origin field is deleted
      first when ``delete_origin_value`` is set.

    When the evaluated path contains a ``"*"`` segment, only the first match
    returned by ``dpath.values`` is used.
    """
    resolved_path = [segment.eval(self.config) for segment in self._field_path]

    if "*" in resolved_path:
        candidates = dpath.values(record, resolved_path)
        nested = candidates[0] if candidates else None
    else:
        nested = dpath.get(record, resolved_path, default=[])

    # Only dict values can be flattened; anything else is ignored.
    if not isinstance(nested, dict):
        return

    if self.replace_record and nested:
        dpath.delete(record, "**")
        record.update(nested)
        return

    # Skip the merge entirely rather than overwrite existing top-level keys.
    overlapping = set(nested.keys()).intersection(set(record.keys()))
    if overlapping:
        return

    if self.delete_origin_value:
        dpath.delete(record, resolved_path)
    record.update(nested)
Transform a record by adding, deleting, or mutating fields directly on the record reference passed in as an argument.
Parameters
- record: The input record to be transformed
- config: The user-provided configuration as specified by the source's spec
- stream_state: The stream state
- stream_slice: The stream slice
Returns
None. The record passed in is modified in place; no new record is returned.