airbyte_cdk.sources.declarative.transformations.keys_to_snake_transformation

#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

import anyascii

from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState


@dataclass
class KeysToSnakeCaseTransformation(RecordTransformation):
    # Matches runs of letters or digits; anything else falls into the
    # NoToken group and is treated as a separator.
    token_pattern: re.Pattern[str] = re.compile(
        r"[A-Z]+[a-z]*|[a-z]+|\d+|(?P<NoToken>[^a-zA-Z\d]+)"
    )

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        # Mutate the record in place so callers keep the same dict object.
        transformed_record = self._transform_record(record)
        record.clear()
        record.update(transformed_record)

    def _transform_record(self, record: Dict[str, Any]) -> Dict[str, Any]:
        transformed_record = {}
        for key, value in record.items():
            transformed_key = self.process_key(key)
            transformed_value = value

            # Recurse into nested dictionaries; other values pass through.
            if isinstance(value, dict):
                transformed_value = self._transform_record(value)

            transformed_record[transformed_key] = transformed_value
        return transformed_record

    def process_key(self, key: str) -> str:
        key = self.normalize_key(key)
        tokens = self.tokenize_key(key)
        tokens = self.filter_tokens(tokens)
        return self.tokens_to_snake_case(tokens)

    def normalize_key(self, key: str) -> str:
        # Transliterate Unicode to plain ASCII (e.g. "é" -> "e").
        return str(anyascii.anyascii(key))

    def tokenize_key(self, key: str) -> List[str]:
        tokens = []
        for match in self.token_pattern.finditer(key):
            # Separator runs (NoToken) become empty strings.
            token = match.group(0) if match.group("NoToken") is None else ""
            tokens.append(token)
        return tokens

    def filter_tokens(self, tokens: List[str]) -> List[str]:
        # Drop empty interior tokens; the first and last positions are kept.
        if len(tokens) >= 3:
            tokens = tokens[:1] + [t for t in tokens[1:-1] if t] + tokens[-1:]
        # Prefix an empty token so a key never starts with a digit.
        if tokens and tokens[0].isdigit():
            tokens.insert(0, "")
        return tokens

    def tokens_to_snake_case(self, tokens: List[str]) -> str:
        return "_".join(token.lower() for token in tokens)
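
A minimal usage sketch (the record contents are illustrative): the transformation renames keys in place and recurses into nested dictionaries, while values inside lists are not descended into.

from airbyte_cdk.sources.declarative.transformations.keys_to_snake_transformation import (
    KeysToSnakeCaseTransformation,
)

record = {
    "User Name": "Ada",
    "Contact-Info": {"Email Address": "ada@example.com"},
}
KeysToSnakeCaseTransformation().transform(record)
# record is now:
# {"user_name": "Ada", "contact_info": {"email_address": "ada@example.com"}}
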
@dataclass
class KeysToSnakeCaseTransformation(airbyte_cdk.sources.declarative.transformations.transformation.RecordTransformation):
KeysToSnakeCaseTransformation(token_pattern: re.Pattern[str] = re.compile('[A-Z]+[a-z]*|[a-z]+|\\d+|(?P<NoToken>[^a-zA-Z\\d]+)'))
token_pattern: re.Pattern[str] = re.compile('[A-Z]+[a-z]*|[a-z]+|\\d+|(?P<NoToken>[^a-zA-Z\\d]+)')
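
The pattern tokenizes a key into an uppercase run followed by lowercase letters, a plain lowercase run, or a digit run; everything else is captured by the NoToken group and later treated as a separator. A standalone illustration using only the pattern (the key is illustrative):

import re

token_pattern = re.compile(r"[A-Z]+[a-z]*|[a-z]+|\d+|(?P<NoToken>[^a-zA-Z\d]+)")
print([m.group(0) for m in token_pattern.finditer("UserID-42")])
# ['User', 'ID', '-', '42']   # '-' matched the NoToken group

Note that an all-caps run absorbs the lowercase letters that follow it, so a key like "HTTPCode" stays a single token and becomes "httpcode".
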
def transform(self, record: Dict[str, Any], config: Optional[Mapping[str, Any]] = None, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> None:

Transform a record by adding, deleting, or mutating fields directly on the record reference passed as an argument.

Parameters
  • record: The input record to be transformed
  • config: The user-provided configuration as specified by the source's spec
  • stream_state: The stream state
  • stream_slice: The stream slice
Returns

None. The record is transformed in place.
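
A short sketch of the in-place contract (illustrative values): the method returns None and mutates the dictionary it was given.

from airbyte_cdk.sources.declarative.transformations.keys_to_snake_transformation import (
    KeysToSnakeCaseTransformation,
)

record = {"First Name": "Ada"}
result = KeysToSnakeCaseTransformation().transform(record)
assert result is None                      # nothing is returned
assert record == {"first_name": "Ada"}     # the same dict object was rewritten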

def process_key(self, key: str) -> str:
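
Taken end to end, process_key normalizes, tokenizes, filters, and joins. Illustrative keys:

t = KeysToSnakeCaseTransformation()   # import as in the examples above
t.process_key("User-ID_42")   # 'user_id_42'
t.process_key("éclair")       # 'eclair' (ASCII-normalized first)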
def normalize_key(self, key: str) -> str:
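
normalize_key transliterates Unicode to plain ASCII via the anyascii package, for example:

t = KeysToSnakeCaseTransformation()
t.normalize_key("Café")   # 'Cafe'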
def tokenize_key(self, key: str) -> List[str]:
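
Separator runs come back as empty strings, which filter_tokens removes later. Illustrative key:

t = KeysToSnakeCaseTransformation()
t.tokenize_key("User-ID 42")   # ['User', '', 'ID', '', '42']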
def filter_tokens(self, tokens: List[str]) -> List[str]:
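
filter_tokens drops empty interior tokens (the first and last positions are kept even when empty) and prefixes an empty token when the key would otherwise start with a digit, which yields a leading underscore after joining:

t = KeysToSnakeCaseTransformation()
t.filter_tokens(['User', '', 'ID', '', '42'])   # ['User', 'ID', '42']
t.filter_tokens(['42', 'items'])                # ['', '42', 'items'] -> '_42_items'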
def tokens_to_snake_case(self, tokens: List[str]) -> str:
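
The final step lowercases every token and joins with underscores:

t = KeysToSnakeCaseTransformation()
t.tokens_to_snake_case(['User', 'ID', '42'])   # 'user_id_42'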