airbyte_cdk.sources.declarative.transformations.keys_replace_transformation

 1#
 2# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 3#
 4
 5from dataclasses import InitVar, dataclass
 6from typing import Any, Dict, Mapping, Optional
 7
 8from airbyte_cdk import InterpolatedString
 9from airbyte_cdk.sources.declarative.transformations import RecordTransformation
10from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
11
12
@dataclass
class KeysReplaceTransformation(RecordTransformation):
    """
    Transformation that replaces a substring in the key names of a record.

    Every occurrence of ``old`` found in a key is replaced by ``new``. Keys of
    nested dictionaries are rewritten too; dictionaries stored inside other
    containers (for example lists) are left untouched.

    Example usage:
    - type: KeysReplace
      old: " "
      new: "_"
    Result:
    from: {"created time": ..., "customer id": ..., "user id": ...}
    to: {"created_time": ..., "customer_id": ..., "user_id": ...}
    """

    old: str
    new: str
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Both values are wrapped as InterpolatedString and re-evaluated on every
        # transform() call against the config, record, stream state and stream slice.
        self._old = InterpolatedString.create(self.old, parameters=parameters)
        self._new = InterpolatedString.create(self.new, parameters=parameters)

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """
        Replace ``old`` with ``new`` in every key of ``record``, in place.

        :param record: The record to rewrite; it is mutated through the reference passed in
        :param config: Passed to the evaluation of ``old`` and ``new``; defaults to ``{}``
        :param stream_state: Made available to the evaluation of ``old`` and ``new``
        :param stream_slice: Made available to the evaluation of ``old`` and ``new``
        """
        if config is None:
            config = {}

        kwargs = {"record": record, "stream_state": stream_state, "stream_slice": stream_slice}
        old_key = str(self._old.eval(config, **kwargs))
        new_key = str(self._new.eval(config, **kwargs))

        if not old_key:
            # str.replace("", new) matches the empty string around every character,
            # which would splice `new` into every key. An empty pattern means there
            # is nothing to replace, so the record is left as it is.
            return

        def _transform(data: Dict[str, Any]) -> Dict[str, Any]:
            # Rebuild the mapping with renamed keys, descending only into dict values.
            return {
                key.replace(old_key, new_key): _transform(value) if isinstance(value, dict) else value
                for key, value in data.items()
            }

        # Build the renamed copy first, then swap the content so callers holding
        # a reference to `record` observe the change.
        transformed_record = _transform(record)
        record.clear()
        record.update(transformed_record)
@dataclass
class KeysReplaceTransformation(airbyte_cdk.sources.declarative.transformations.transformation.RecordTransformation):
@dataclass
class KeysReplaceTransformation(RecordTransformation):
    """
    Transformation that replaces a substring in the key names of a record.

    Each occurrence of ``old`` inside a key is substituted with ``new``. The
    replacement also applies to the keys of nested dictionaries, but not to
    dictionaries that sit inside other containers such as lists.

    Example usage:
    - type: KeysReplace
      old: " "
      new: "_"
    Result:
    from: {"created time": ..., "customer id": ..., "user id": ...}
    to: {"created_time": ..., "customer_id": ..., "user_id": ...}
    """

    old: str
    new: str
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Wrap both values as InterpolatedString; they are evaluated per record in transform().
        self._old = InterpolatedString.create(self.old, parameters=parameters)
        self._new = InterpolatedString.create(self.new, parameters=parameters)

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """
        Rename the keys of ``record`` in place, replacing ``old`` with ``new``.

        :param record: The record whose keys are renamed; mutated in place
        :param config: Passed to the evaluation of ``old`` and ``new``; defaults to ``{}``
        :param stream_state: Made available to the evaluation of ``old`` and ``new``
        :param stream_slice: Made available to the evaluation of ``old`` and ``new``
        """
        if config is None:
            config = {}

        kwargs = {"record": record, "stream_state": stream_state, "stream_slice": stream_slice}
        old_key = str(self._old.eval(config, **kwargs))
        new_key = str(self._new.eval(config, **kwargs))

        if not old_key:
            # An empty search string would make str.replace insert `new_key` before,
            # between and after every character of each key; treat it as a no-op.
            return

        def _transform(data: Dict[str, Any]) -> Dict[str, Any]:
            # Return a copy of `data` with renamed keys; only dict values are traversed.
            return {
                key.replace(old_key, new_key): _transform(value) if isinstance(value, dict) else value
                for key, value in data.items()
            }

        # Compute the new content before clearing so the same dict object is reused.
        transformed_record = _transform(record)
        record.clear()
        record.update(transformed_record)

Transformation that replaces a substring in the key names of a record.

Example usage:

  • type: KeysReplace
    old: " "
    new: "_"

Result:
  from: {"created time": ..., "customer id": ..., "user id": ...}
  to: {"created_time": ..., "customer_id": ..., "user_id": ...}
KeysReplaceTransformation( old: str, new: str, parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
old: str
new: str
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def transform( self, record: Dict[str, Any], config: Optional[Mapping[str, Any]] = None, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> None:
36    def transform(
37        self,
38        record: Dict[str, Any],
39        config: Optional[Config] = None,
40        stream_state: Optional[StreamState] = None,
41        stream_slice: Optional[StreamSlice] = None,
42    ) -> None:
43        if config is None:
44            config = {}
45
46        kwargs = {"record": record, "stream_state": stream_state, "stream_slice": stream_slice}
47        old_key = str(self._old.eval(config, **kwargs))
48        new_key = str(self._new.eval(config, **kwargs))
49
50        def _transform(data: Dict[str, Any]) -> Dict[str, Any]:
51            result = {}
52            for key, value in data.items():
53                updated_key = key.replace(old_key, new_key)
54                if isinstance(value, dict):
55                    result[updated_key] = _transform(value)
56                else:
57                    result[updated_key] = value
58            return result
59
60        transformed_record = _transform(record)
61        record.clear()
62        record.update(transformed_record)

Transform a record by adding, deleting, or mutating fields directly on the record reference passed in as an argument.

Parameters
  • record: The input record to be transformed
  • config: The user-provided configuration as specified by the source's spec
  • stream_state: The stream state
  • stream_slice: The stream slice
Returns

None. The record is transformed in place through the reference passed in; no new record is returned.