airbyte_cdk.sources.declarative.transformations.keys_to_snake_transformation
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import anyascii

from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState


@dataclass
class KeysToSnakeCaseTransformation(RecordTransformation):
    """Rename every key of a record (and of nested dicts) to snake_case.

    Each key goes through four steps (see ``process_key``):

    1. ``normalize_key`` transliterates it to ASCII with ``anyascii``.
    2. ``tokenize_key`` splits it with ``token_pattern`` into capitalised or
       upper-case words, lower-case runs and digit runs; every run of other
       characters (spaces, punctuation, ``_``) becomes an empty token.
    3. ``filter_tokens`` drops empty tokens between the first and last token,
       so only a leading/trailing separator survives, and prepends an empty
       token when the key starts with digits.
    4. ``tokens_to_snake_case`` lower-cases the tokens and joins them with ``_``.

    Examples: ``"fooBar"`` -> ``"foo_bar"``, ``"user ID"`` -> ``"user_id"``,
    ``"_id"`` -> ``"_id"``, ``"1st place"`` -> ``"_1_st_place"``.

    Notes:
        * A run of capitals followed by lower-case letters forms one token,
          e.g. ``"HTTPServer"`` -> ``"httpserver"``.
        * Only dict values are recursed into; lists (including lists of dicts)
          and all other values are kept as-is.
        * If two keys map to the same snake_case name, the one that appears
          later in the record wins.
    """

    # Alternatives are tried in order: capitalised/upper-case word, lower-case
    # run, digit run, and the named "NoToken" group for any other run of
    # characters, which tokenize_key turns into an empty separator token.
    token_pattern: re.Pattern[str] = re.compile(
        r"[A-Z]+[a-z]*|[a-z]+|\d+|(?P<NoToken>[^a-zA-Z\d]+)"
    )

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """Convert the keys of ``record`` to snake_case in place.

        The converted mapping is built first; then ``record`` is cleared and
        refilled, so the caller's dict object is updated rather than replaced.
        ``config``, ``stream_state`` and ``stream_slice`` are accepted for
        interface compatibility and are not used.
        """
        transformed_record = self._transform_record(record)
        record.clear()
        record.update(transformed_record)

    def _transform_record(self, record: Dict[str, Any]) -> Dict[str, Any]:
        """Return a new dict with snake_case keys, recursing into nested dicts."""
        # Dict comprehensions evaluate the key before the value, matching the
        # order process_key -> value transformation.
        return {
            self.process_key(key): (
                self._transform_record(value) if isinstance(value, dict) else value
            )
            for key, value in record.items()
        }

    def process_key(self, key: str) -> str:
        """Return the snake_case form of a single ``key``."""
        key = self.normalize_key(key)
        tokens = self.tokenize_key(key)
        tokens = self.filter_tokens(tokens)
        return self.tokens_to_snake_case(tokens)

    def normalize_key(self, key: str) -> str:
        """Transliterate ``key`` to ASCII using ``anyascii``, coerced to ``str``."""
        return str(anyascii.anyascii(key))

    def tokenize_key(self, key: str) -> List[str]:
        """Split ``key`` into tokens using ``token_pattern``.

        Word, lower-case and digit matches are returned verbatim; each run of
        other characters (a ``NoToken`` match) is returned as ``""``.
        """
        return [
            "" if match.group("NoToken") is not None else match.group(0)
            for match in self.token_pattern.finditer(key)
        ]

    def filter_tokens(self, tokens: List[str]) -> List[str]:
        """Clean up the token list produced by ``tokenize_key``.

        Empty (separator) tokens strictly between the first and last token are
        removed, so only a leading or trailing separator is kept. If the first
        remaining token is all digits, an empty token is prepended so the
        final name starts with ``_``.

        Always returns a new list; ``tokens`` itself is never modified.
        """
        if len(tokens) >= 3:
            # The first and last slots are kept even when they are empty.
            filtered = (
                tokens[:1] + [token for token in tokens[1:-1] if token] + tokens[-1:]
            )
        else:
            # Copy so that the insert below never mutates the caller's list.
            filtered = list(tokens)
        if filtered and filtered[0].isdigit():
            filtered.insert(0, "")
        return filtered

    def tokens_to_snake_case(self, tokens: List[str]) -> str:
        """Lower-case ``tokens`` and join them with underscores."""
        return "_".join(token.lower() for token in tokens)
@dataclass
class KeysToSnakeCaseTransformation(RecordTransformation):
    """Rename every key of a record (and of nested dicts) to snake_case.

    Each key goes through four steps (see ``process_key``):

    1. ``normalize_key`` transliterates it to ASCII with ``anyascii``.
    2. ``tokenize_key`` splits it with ``token_pattern`` into capitalised or
       upper-case words, lower-case runs and digit runs; every run of other
       characters (spaces, punctuation, ``_``) becomes an empty token.
    3. ``filter_tokens`` drops empty tokens between the first and last token,
       so only a leading/trailing separator survives, and prepends an empty
       token when the key starts with digits.
    4. ``tokens_to_snake_case`` lower-cases the tokens and joins them with ``_``.

    Examples: ``"fooBar"`` -> ``"foo_bar"``, ``"user ID"`` -> ``"user_id"``,
    ``"_id"`` -> ``"_id"``, ``"1st place"`` -> ``"_1_st_place"``.

    Notes:
        * A run of capitals followed by lower-case letters forms one token,
          e.g. ``"HTTPServer"`` -> ``"httpserver"``.
        * Only dict values are recursed into; lists (including lists of dicts)
          and all other values are kept as-is.
        * If two keys map to the same snake_case name, the one that appears
          later in the record wins.
    """

    # Alternatives are tried in order: capitalised/upper-case word, lower-case
    # run, digit run, and the named "NoToken" group for any other run of
    # characters, which tokenize_key turns into an empty separator token.
    token_pattern: re.Pattern[str] = re.compile(
        r"[A-Z]+[a-z]*|[a-z]+|\d+|(?P<NoToken>[^a-zA-Z\d]+)"
    )

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """Convert the keys of ``record`` to snake_case in place.

        The converted mapping is built first; then ``record`` is cleared and
        refilled, so the caller's dict object is updated rather than replaced.
        ``config``, ``stream_state`` and ``stream_slice`` are accepted for
        interface compatibility and are not used.
        """
        transformed_record = self._transform_record(record)
        record.clear()
        record.update(transformed_record)

    def _transform_record(self, record: Dict[str, Any]) -> Dict[str, Any]:
        """Return a new dict with snake_case keys, recursing into nested dicts."""
        # Dict comprehensions evaluate the key before the value, matching the
        # order process_key -> value transformation.
        return {
            self.process_key(key): (
                self._transform_record(value) if isinstance(value, dict) else value
            )
            for key, value in record.items()
        }

    def process_key(self, key: str) -> str:
        """Return the snake_case form of a single ``key``."""
        key = self.normalize_key(key)
        tokens = self.tokenize_key(key)
        tokens = self.filter_tokens(tokens)
        return self.tokens_to_snake_case(tokens)

    def normalize_key(self, key: str) -> str:
        """Transliterate ``key`` to ASCII using ``anyascii``, coerced to ``str``."""
        return str(anyascii.anyascii(key))

    def tokenize_key(self, key: str) -> List[str]:
        """Split ``key`` into tokens using ``token_pattern``.

        Word, lower-case and digit matches are returned verbatim; each run of
        other characters (a ``NoToken`` match) is returned as ``""``.
        """
        return [
            "" if match.group("NoToken") is not None else match.group(0)
            for match in self.token_pattern.finditer(key)
        ]

    def filter_tokens(self, tokens: List[str]) -> List[str]:
        """Clean up the token list produced by ``tokenize_key``.

        Empty (separator) tokens strictly between the first and last token are
        removed, so only a leading or trailing separator is kept. If the first
        remaining token is all digits, an empty token is prepended so the
        final name starts with ``_``.

        Always returns a new list; ``tokens`` itself is never modified.
        """
        if len(tokens) >= 3:
            # The first and last slots are kept even when they are empty.
            filtered = (
                tokens[:1] + [token for token in tokens[1:-1] if token] + tokens[-1:]
            )
        else:
            # Copy so that the insert below never mutates the caller's list.
            filtered = list(tokens)
        if filtered and filtered[0].isdigit():
            filtered.insert(0, "")
        return filtered

    def tokens_to_snake_case(self, tokens: List[str]) -> str:
        """Lower-case ``tokens`` and join them with underscores."""
        return "_".join(token.lower() for token in tokens)
KeysToSnakeCaseTransformation( token_pattern: re.Pattern[str] = re.compile('[A-Z]+[a-z]*|[a-z]+|\\d+|(?P<NoToken>[^a-zA-Z\\d]+)'))
def transform(
    self,
    record: Dict[str, Any],
    config: Optional[Config] = None,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
) -> None:
    """Rewrite ``record`` so that all of its keys are snake_case.

    The work happens in place: the caller's dict object keeps its identity
    and ends up holding the renamed keys. ``config``, ``stream_state`` and
    ``stream_slice`` are part of the transformation interface and are
    ignored here.
    """
    # Build the renamed mapping before touching ``record``, because the
    # conversion reads from it.
    snake_cased = self._transform_record(record)

    # Swap the contents of the caller's dict for the renamed ones.
    record.clear()
    record.update(snake_cased)
Transform a record by adding, deleting, or mutating fields directly on the record object passed as an argument.
Parameters
- record: The input record to be transformed
- config: The user-provided configuration as specified by the source's spec
- stream_state: The stream state
- stream_slice: The stream slice
Returns
None. The record passed in is transformed in place; nothing is returned.