airbyte_cdk.sources.declarative.transformations.dpath_flatten_fields

  1from dataclasses import InitVar, dataclass
  2from typing import Any, Dict, List, Mapping, Optional, Union
  3
  4import dpath
  5
  6from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
  7from airbyte_cdk.sources.declarative.transformations import RecordTransformation
  8from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
  9
 10
 11@dataclass
 12class KeyTransformation:
 13    config: Config
 14    parameters: InitVar[Mapping[str, Any]]
 15    prefix: Optional[str] = None
 16    suffix: Optional[str] = None
 17
 18    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 19        if self.prefix is not None:
 20            self.prefix = InterpolatedString.create(self.prefix, parameters=parameters).eval(
 21                self.config
 22            )
 23        if self.suffix is not None:
 24            self.suffix = InterpolatedString.create(self.suffix, parameters=parameters).eval(
 25                self.config
 26            )
 27
 28
 29@dataclass
 30class DpathFlattenFields(RecordTransformation):
 31    """
 32    Flatten fields only for provided path.
 33
 34    field_path: List[Union[InterpolatedString, str]] path to the field to flatten.
 35    delete_origin_value: bool = False whether to delete origin field or keep it. Default is False.
 36    replace_record: bool = False whether to replace origin record or not. Default is False.
 37    key_transformation: KeyTransformation = None how to transform extracted object keys
 38
 39    """
 40
 41    config: Config
 42    field_path: List[Union[InterpolatedString, str]]
 43    parameters: InitVar[Mapping[str, Any]]
 44    delete_origin_value: bool = False
 45    replace_record: bool = False
 46    key_transformation: Optional[KeyTransformation] = None
 47
 48    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 49        self._parameters = parameters
 50        self._field_path = [
 51            InterpolatedString.create(path, parameters=self._parameters) for path in self.field_path
 52        ]
 53        for path_index in range(len(self.field_path)):
 54            if isinstance(self.field_path[path_index], str):
 55                self._field_path[path_index] = InterpolatedString.create(
 56                    self.field_path[path_index], parameters=self._parameters
 57                )
 58
 59    def _apply_key_transformation(self, extracted: Mapping[str, Any]) -> Mapping[str, Any]:
 60        if self.key_transformation:
 61            if self.key_transformation.prefix:
 62                extracted = {
 63                    f"{self.key_transformation.prefix}{key}": value
 64                    for key, value in extracted.items()
 65                }
 66
 67            if self.key_transformation.suffix:
 68                extracted = {
 69                    f"{key}{self.key_transformation.suffix}": value
 70                    for key, value in extracted.items()
 71                }
 72
 73        return extracted
 74
 75    def transform(
 76        self,
 77        record: Dict[str, Any],
 78        config: Optional[Config] = None,
 79        stream_state: Optional[StreamState] = None,
 80        stream_slice: Optional[StreamSlice] = None,
 81    ) -> None:
 82        path = [path.eval(self.config) for path in self._field_path]
 83        if "*" in path:
 84            matched = dpath.values(record, path)
 85            extracted = matched[0] if matched else None
 86        else:
 87            extracted = dpath.get(record, path, default=[])
 88
 89        if isinstance(extracted, dict):
 90            extracted = self._apply_key_transformation(extracted)
 91
 92            if self.replace_record and extracted:
 93                dpath.delete(record, "**")
 94                record.update(extracted)
 95            else:
 96                conflicts = set(extracted.keys()) & set(record.keys())
 97                if not conflicts:
 98                    if self.delete_origin_value:
 99                        dpath.delete(record, path)
100                    record.update(extracted)
@dataclass
class KeyTransformation:
@dataclass
class KeyTransformation:
    """
    Optional prefix and/or suffix to apply to the keys of an extracted object.

    Both values are resolved once, at construction time: every value that is
    not None is wrapped with InterpolatedString.create (using `parameters`),
    evaluated against `config`, and the result is stored back on the instance.

    config: Config connector configuration the values are evaluated against.
    parameters: Mapping[str, Any] init-only; forwarded to InterpolatedString.create.
    prefix: Optional[str] = None value placed in front of each key.
    suffix: Optional[str] = None value placed after each key.
    """

    config: Config
    parameters: InitVar[Mapping[str, Any]]
    prefix: Optional[str] = None
    suffix: Optional[str] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Prefix is resolved before suffix; None means "not configured" and is left as is.
        for attribute in ("prefix", "suffix"):
            raw_value = getattr(self, attribute)
            if raw_value is None:
                continue
            interpolated = InterpolatedString.create(raw_value, parameters=parameters)
            setattr(self, attribute, interpolated.eval(self.config))
KeyTransformation( config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], prefix: Optional[str] = None, suffix: Optional[str] = None)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
prefix: Optional[str] = None
suffix: Optional[str] = None
 30@dataclass
 31class DpathFlattenFields(RecordTransformation):
 32    """
 33    Flatten fields only for provided path.
 34
 35    field_path: List[Union[InterpolatedString, str]] path to the field to flatten.
 36    delete_origin_value: bool = False whether to delete origin field or keep it. Default is False.
 37    replace_record: bool = False whether to replace origin record or not. Default is False.
 38    key_transformation: KeyTransformation = None how to transform extracted object keys
 39
 40    """
 41
 42    config: Config
 43    field_path: List[Union[InterpolatedString, str]]
 44    parameters: InitVar[Mapping[str, Any]]
 45    delete_origin_value: bool = False
 46    replace_record: bool = False
 47    key_transformation: Optional[KeyTransformation] = None
 48
 49    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 50        self._parameters = parameters
 51        self._field_path = [
 52            InterpolatedString.create(path, parameters=self._parameters) for path in self.field_path
 53        ]
 54        for path_index in range(len(self.field_path)):
 55            if isinstance(self.field_path[path_index], str):
 56                self._field_path[path_index] = InterpolatedString.create(
 57                    self.field_path[path_index], parameters=self._parameters
 58                )
 59
 60    def _apply_key_transformation(self, extracted: Mapping[str, Any]) -> Mapping[str, Any]:
 61        if self.key_transformation:
 62            if self.key_transformation.prefix:
 63                extracted = {
 64                    f"{self.key_transformation.prefix}{key}": value
 65                    for key, value in extracted.items()
 66                }
 67
 68            if self.key_transformation.suffix:
 69                extracted = {
 70                    f"{key}{self.key_transformation.suffix}": value
 71                    for key, value in extracted.items()
 72                }
 73
 74        return extracted
 75
 76    def transform(
 77        self,
 78        record: Dict[str, Any],
 79        config: Optional[Config] = None,
 80        stream_state: Optional[StreamState] = None,
 81        stream_slice: Optional[StreamSlice] = None,
 82    ) -> None:
 83        path = [path.eval(self.config) for path in self._field_path]
 84        if "*" in path:
 85            matched = dpath.values(record, path)
 86            extracted = matched[0] if matched else None
 87        else:
 88            extracted = dpath.get(record, path, default=[])
 89
 90        if isinstance(extracted, dict):
 91            extracted = self._apply_key_transformation(extracted)
 92
 93            if self.replace_record and extracted:
 94                dpath.delete(record, "**")
 95                record.update(extracted)
 96            else:
 97                conflicts = set(extracted.keys()) & set(record.keys())
 98                if not conflicts:
 99                    if self.delete_origin_value:
100                        dpath.delete(record, path)
101                    record.update(extracted)

Flatten fields only for provided path.

field_path: List[Union[InterpolatedString, str]] — path to the field to flatten.
delete_origin_value: bool = False — whether to delete the origin field or keep it. Default is False.
replace_record: bool = False — whether to replace the origin record with the flattened fields. Default is False.
key_transformation: KeyTransformation = None — how to transform the keys of the extracted object.

DpathFlattenFields( config: Mapping[str, Any], field_path: List[Union[airbyte_cdk.InterpolatedString, str]], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], delete_origin_value: bool = False, replace_record: bool = False, key_transformation: Optional[KeyTransformation] = None)
config: Mapping[str, Any]
field_path: List[Union[airbyte_cdk.InterpolatedString, str]]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
delete_origin_value: bool = False
replace_record: bool = False
key_transformation: Optional[KeyTransformation] = None
def transform( self, record: Dict[str, Any], config: Optional[Mapping[str, Any]] = None, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> None:
 76    def transform(
 77        self,
 78        record: Dict[str, Any],
 79        config: Optional[Config] = None,
 80        stream_state: Optional[StreamState] = None,
 81        stream_slice: Optional[StreamSlice] = None,
 82    ) -> None:
 83        path = [path.eval(self.config) for path in self._field_path]
 84        if "*" in path:
 85            matched = dpath.values(record, path)
 86            extracted = matched[0] if matched else None
 87        else:
 88            extracted = dpath.get(record, path, default=[])
 89
 90        if isinstance(extracted, dict):
 91            extracted = self._apply_key_transformation(extracted)
 92
 93            if self.replace_record and extracted:
 94                dpath.delete(record, "**")
 95                record.update(extracted)
 96            else:
 97                conflicts = set(extracted.keys()) & set(record.keys())
 98                if not conflicts:
 99                    if self.delete_origin_value:
100                        dpath.delete(record, path)
101                    record.update(extracted)

Transform a record by adding, deleting, or mutating fields directly on the record reference passed as an argument.

Parameters
  • record: The input record to be transformed
  • config: The user-provided configuration as specified by the source's spec
  • stream_state: The stream state
  • stream_slice: The stream slice
Returns

None. The record is transformed in place.