airbyte_cdk.sources.declarative.transformations.keys_replace_transformation
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

from dataclasses import InitVar, dataclass
from typing import Any, Dict, Mapping, Optional

from airbyte_cdk import InterpolatedString
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState


@dataclass
class KeysReplaceTransformation(RecordTransformation):
    """
    Transformation that renames record keys by substring replacement.

    Every occurrence of ``old`` inside a key is replaced with ``new``. The
    replacement is applied to the record's own keys and, recursively, to the
    keys of any dictionary stored directly as a value.

    Example usage:
    - type: KeysReplace
      old: " "
      new: "_"
    Result:
      from: {"created time": ..., "customer id": ..., "user id": ...}
      to: {"created_time": ..., "customer_id": ..., "user_id": ...}
    """

    old: str
    new: str
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Wrap ``old`` and ``new`` as interpolated strings for later evaluation."""
        self._old, self._new = (
            InterpolatedString.create(template, parameters=parameters)
            for template in (self.old, self.new)
        )

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """
        Rename the keys of ``record`` in place.

        :param record: the record whose keys are rewritten; mutated in place
        :param config: configuration passed to the interpolation; ``{}`` when omitted
        :param stream_state: forwarded to the interpolation context
        :param stream_slice: forwarded to the interpolation context
        :return: None; the result is written back into ``record``
        """
        effective_config = {} if config is None else config
        eval_context = {
            "record": record,
            "stream_state": stream_state,
            "stream_slice": stream_slice,
        }
        needle = str(self._old.eval(effective_config, **eval_context))
        replacement = str(self._new.eval(effective_config, **eval_context))

        def _rename(mapping: Dict[str, Any]) -> Dict[str, Any]:
            # Only dict values are descended into; every other value is copied as-is.
            return {
                name.replace(needle, replacement): (
                    _rename(item) if isinstance(item, dict) else item
                )
                for name, item in mapping.items()
            }

        renamed = _rename(record)
        # Swap the contents rather than rebinding so the caller's reference sees the change.
        record.clear()
        record.update(renamed)
@dataclass
class KeysReplaceTransformation(RecordTransformation):
    """
    Transformation that renames record keys by substring replacement.

    Every occurrence of ``old`` inside a key is replaced with ``new``. The
    replacement is applied to the record's own keys and, recursively, to the
    keys of any dictionary stored directly as a value.

    Example usage:
    - type: KeysReplace
      old: " "
      new: "_"
    Result:
      from: {"created time": ..., "customer id": ..., "user id": ...}
      to: {"created_time": ..., "customer_id": ..., "user_id": ...}
    """

    old: str
    new: str
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Wrap ``old`` and ``new`` as interpolated strings for later evaluation."""
        self._old, self._new = (
            InterpolatedString.create(template, parameters=parameters)
            for template in (self.old, self.new)
        )

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """
        Rename the keys of ``record`` in place.

        :param record: the record whose keys are rewritten; mutated in place
        :param config: configuration passed to the interpolation; ``{}`` when omitted
        :param stream_state: forwarded to the interpolation context
        :param stream_slice: forwarded to the interpolation context
        :return: None; the result is written back into ``record``
        """
        effective_config = {} if config is None else config
        eval_context = {
            "record": record,
            "stream_state": stream_state,
            "stream_slice": stream_slice,
        }
        needle = str(self._old.eval(effective_config, **eval_context))
        replacement = str(self._new.eval(effective_config, **eval_context))

        def _rename(mapping: Dict[str, Any]) -> Dict[str, Any]:
            # Only dict values are descended into; every other value is copied as-is.
            return {
                name.replace(needle, replacement): (
                    _rename(item) if isinstance(item, dict) else item
                )
                for name, item in mapping.items()
            }

        renamed = _rename(record)
        # Swap the contents rather than rebinding so the caller's reference sees the change.
        record.clear()
        record.update(renamed)
Transformation that replaces a substring in the names of record keys.
Example usage:
- type: KeysReplace
  old: " "
  new: "_"
Result:
  from: {"created time": ..., "customer id": ..., "user id": ...}
  to: {"created_time": ..., "customer_id": ..., "user_id": ...}
KeysReplaceTransformation(old: str, new: str, parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
def transform(
    self,
    record: Dict[str, Any],
    config: Optional[Config] = None,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
) -> None:
    """
    Rename the keys of ``record`` in place.

    Each occurrence of the evaluated ``old`` string inside a key is replaced
    with the evaluated ``new`` string. Dictionaries stored directly as values
    are processed recursively; all other values are kept unchanged.

    :param record: the record whose keys are rewritten; mutated in place
    :param config: configuration passed to the interpolation; ``{}`` when omitted
    :param stream_state: forwarded to the interpolation context
    :param stream_slice: forwarded to the interpolation context
    :return: None; the result is written back into ``record``
    """
    effective_config = {} if config is None else config
    eval_context = {
        "record": record,
        "stream_state": stream_state,
        "stream_slice": stream_slice,
    }
    needle = str(self._old.eval(effective_config, **eval_context))
    replacement = str(self._new.eval(effective_config, **eval_context))

    def _rename(mapping: Dict[str, Any]) -> Dict[str, Any]:
        # Only dict values are descended into; every other value is copied as-is.
        return {
            name.replace(needle, replacement): (
                _rename(item) if isinstance(item, dict) else item
            )
            for name, item in mapping.items()
        }

    renamed = _rename(record)
    # Swap the contents rather than rebinding so the caller's reference sees the change.
    record.clear()
    record.update(renamed)
Transform a record by adding, deleting, or mutating fields directly on the record reference passed as an argument.
Parameters
- record: The input record to be transformed
- config: The user-provided configuration as specified by the source's spec
- stream_state: The stream state
- stream_slice: The stream slice
Returns
None. The record is modified in place, so the transformed record is the same object that was passed in.