airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields

 1from dataclasses import InitVar, dataclass
 2from typing import Any, Dict, List, Mapping, Optional, Union
 3
 4import dpath
 5
 6from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
 7from airbyte_cdk.sources.declarative.transformations import RecordTransformation
 8from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
 9
10
@dataclass
class DpathFlattenFields(RecordTransformation):
    """
    Flatten the fields of the object found at ``field_path`` into the top level of the record.

    field_path: List[Union[InterpolatedString, str]] path to the field to flatten. Each
        segment is interpolated against ``config``; if any segment equals "*", the path is
        matched as a wildcard and only the first matched value is used.
    delete_origin_value: bool = False whether to delete the origin field after its keys
        have been merged into the record, or keep it. Default is False.
    replace_record: bool = False whether to replace the whole record with the extracted
        object (only when that object is a non-empty dict). Default is False.
    """

    config: Config
    field_path: List[Union[InterpolatedString, str]]
    parameters: InitVar[Mapping[str, Any]]
    delete_origin_value: bool = False
    replace_record: bool = False

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # InterpolatedString.create is applied to every entry, plain strings included,
        # so a second pass over the str entries is unnecessary.
        self._field_path = [
            InterpolatedString.create(path, parameters=parameters) for path in self.field_path
        ]

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """
        Merge the dict found at the configured path into ``record``, in place.

        The path segments are evaluated against ``self.config``; the ``config``
        argument is not used. If the value at the path is not a dict, the record is
        left unchanged.

        If ``replace_record`` is set and the value is a non-empty dict, the record's
        contents are replaced by that dict. Otherwise the value's keys are merged into
        the top level of the record, but only when none of them already exists there.
        In that case the origin field is deleted first if ``delete_origin_value`` is set.
        """
        path = [segment.eval(self.config) for segment in self._field_path]
        if "*" in path:
            # Wildcard segment: take the first value dpath matches, if any.
            matched = dpath.values(record, path)
            extracted = matched[0] if matched else None
        else:
            extracted = dpath.get(record, path, default=[])

        if not isinstance(extracted, dict):
            return

        if self.replace_record and extracted:
            # Empty the record in place so the caller's reference sees the replacement.
            # ``extracted`` is a separate nested dict object, so it survives the clear.
            # record.clear() avoids a dpath glob walk over the whole record; that walk
            # rejects empty-string keys unless dpath.options.ALLOW_EMPTY_STRING_KEYS is set.
            record.clear()
            record.update(extracted)
            return

        if set(extracted.keys()) & set(record.keys()):
            # Flattening would overwrite existing top-level keys; leave the record as is.
            return

        if self.delete_origin_value:
            # NOTE(review): with a "*" segment this deletes every path the glob matches,
            # although only the first match was merged above. Confirm that is intended.
            dpath.delete(record, path)
        record.update(extracted)
@dataclass
class DpathFlattenFields(RecordTransformation):
    """
    Flatten the fields of the object found at ``field_path`` into the top level of the record.

    field_path: List[Union[InterpolatedString, str]] path to the field to flatten. Each
        segment is interpolated against ``config``; if any segment equals "*", the path is
        matched as a wildcard and only the first matched value is used.
    delete_origin_value: bool = False whether to delete the origin field after its keys
        have been merged into the record, or keep it. Default is False.
    replace_record: bool = False whether to replace the whole record with the extracted
        object (only when that object is a non-empty dict). Default is False.
    """

    config: Config
    field_path: List[Union[InterpolatedString, str]]
    parameters: InitVar[Mapping[str, Any]]
    delete_origin_value: bool = False
    replace_record: bool = False

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # InterpolatedString.create is applied to every entry, plain strings included,
        # so a second pass over the str entries is unnecessary.
        self._field_path = [
            InterpolatedString.create(path, parameters=parameters) for path in self.field_path
        ]

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """
        Merge the dict found at the configured path into ``record``, in place.

        The path segments are evaluated against ``self.config``; the ``config``
        argument is not used. If the value at the path is not a dict, the record is
        left unchanged.

        If ``replace_record`` is set and the value is a non-empty dict, the record's
        contents are replaced by that dict. Otherwise the value's keys are merged into
        the top level of the record, but only when none of them already exists there.
        In that case the origin field is deleted first if ``delete_origin_value`` is set.
        """
        path = [segment.eval(self.config) for segment in self._field_path]
        if "*" in path:
            # Wildcard segment: take the first value dpath matches, if any.
            matched = dpath.values(record, path)
            extracted = matched[0] if matched else None
        else:
            extracted = dpath.get(record, path, default=[])

        if not isinstance(extracted, dict):
            return

        if self.replace_record and extracted:
            # Empty the record in place so the caller's reference sees the replacement.
            # ``extracted`` is a separate nested dict object, so it survives the clear.
            # record.clear() avoids a dpath glob walk over the whole record; that walk
            # rejects empty-string keys unless dpath.options.ALLOW_EMPTY_STRING_KEYS is set.
            record.clear()
            record.update(extracted)
            return

        if set(extracted.keys()) & set(record.keys()):
            # Flattening would overwrite existing top-level keys; leave the record as is.
            return

        if self.delete_origin_value:
            # NOTE(review): with a "*" segment this deletes every path the glob matches,
            # although only the first match was merged above. Confirm that is intended.
            dpath.delete(record, path)
        record.update(extracted)

Flatten the fields of the object found at the provided path into the top level of the record.

field_path: List[Union[InterpolatedString, str]] — path to the field to flatten.
delete_origin_value: bool = False — whether to delete the origin field after flattening, or keep it. Default is False.
replace_record: bool = False — whether to replace the original record with the flattened field. Default is False.

DpathFlattenFields( config: Mapping[str, Any], field_path: List[Union[airbyte_cdk.InterpolatedString, str]], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], delete_origin_value: bool = False, replace_record: bool = False)
config: Mapping[str, Any]
field_path: List[Union[airbyte_cdk.InterpolatedString, str]]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
delete_origin_value: bool = False
replace_record: bool = False
def transform( self, record: Dict[str, Any], config: Optional[Mapping[str, Any]] = None, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> None:
def transform(
    self,
    record: Dict[str, Any],
    config: Optional[Config] = None,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
) -> None:
    """
    Merge the dict found at the configured path into ``record``, in place.

    The path segments are evaluated against ``self.config``; the ``config``
    argument is not used. If the value at the path is not a dict, the record is
    left unchanged.

    If ``replace_record`` is set and the value is a non-empty dict, the record's
    contents are replaced by that dict. Otherwise the value's keys are merged into
    the top level of the record, but only when none of them already exists there.
    In that case the origin field is deleted first if ``delete_origin_value`` is set.
    """
    path = [segment.eval(self.config) for segment in self._field_path]
    if "*" in path:
        # Wildcard segment: take the first value dpath matches, if any.
        matched = dpath.values(record, path)
        extracted = matched[0] if matched else None
    else:
        extracted = dpath.get(record, path, default=[])

    if not isinstance(extracted, dict):
        return

    if self.replace_record and extracted:
        # Empty the record in place so the caller's reference sees the replacement.
        # ``extracted`` is a separate nested dict object, so it survives the clear.
        # record.clear() avoids a dpath glob walk over the whole record; that walk
        # rejects empty-string keys unless dpath.options.ALLOW_EMPTY_STRING_KEYS is set.
        record.clear()
        record.update(extracted)
        return

    if set(extracted.keys()) & set(record.keys()):
        # Flattening would overwrite existing top-level keys; leave the record as is.
        return

    if self.delete_origin_value:
        # NOTE(review): with a "*" segment this deletes every path the glob matches,
        # although only the first match was merged above. Confirm that is intended.
        dpath.delete(record, path)
    record.update(extracted)

Transform a record in place by adding, deleting, or mutating fields on the record object passed as an argument.

Parameters
  • record: The input record to be transformed
  • config: The user-provided configuration as specified by the source's spec
  • stream_state: The stream state
  • stream_slice: The stream slice
Returns

None. The record passed in is transformed in place.