airbyte_cdk.sources.declarative.transformations.flatten_fields

 1#
 2# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 3#
 4
 5from dataclasses import dataclass
 6from typing import Any, Dict, Optional
 7
 8from airbyte_cdk.sources.declarative.transformations import RecordTransformation
 9from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
10
11
@dataclass
class FlattenFields(RecordTransformation):
    """Record transformation that collapses nested dicts (and optionally lists) into one level.

    Scalar values keep their own key when it is free; otherwise they are stored
    under ``"<parent_key>.<key>"``. List items, when ``flatten_lists`` is true,
    are stored under ``"<parent_key>.<index>"``.
    """

    # When True, list items are expanded into "<parent_key>.<index>" entries;
    # when False, lists are kept whole as values.
    flatten_lists: bool = True

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """Flatten ``record`` in place.

        The flattened mapping is computed first, then the original dict object
        is emptied and refilled, so every existing reference to ``record`` sees
        the flattened content.

        Args:
            record: The record to flatten; mutated in place.
            config: Not used by this transformation.
            stream_state: Not used by this transformation.
            stream_slice: Not used by this transformation.

        Returns:
            None.
        """
        transformed_record = self.flatten_record(record)
        record.clear()
        record.update(transformed_record)

    def flatten_record(self, record: Dict[str, Any]) -> Dict[str, Any]:
        """Return a new single-level dict built from the scalar values of ``record``.

        Nested containers are walked with an explicit stack (last pushed, first
        processed). A value keeps its own key unless that key is already taken,
        in which case it is stored as ``"<parent_key>.<key>"``. When
        ``self.flatten_lists`` is true, list items are walked too and stored
        under ``"<parent_key>.<index>"``; once any list has been expanded, every
        key built afterwards is prefixed with its parent key. Empty dicts (and
        empty lists while lists are being flattened) contribute no entries.

        A parent-qualified key is not itself re-checked for uniqueness, so two
        values that share both their key and their parent key can still collide.

        Args:
            record: The mapping to flatten. It is only read, never modified.

        Returns:
            A new flat dictionary.
        """
        stack = [(record, "_")]
        transformed_record: Dict[str, Any] = {}
        # Output keys already promised to a scalar that is still waiting on the
        # stack. Checking only ``transformed_record`` would miss those, letting
        # two scalars claim the same bare key and one silently overwrite the other.
        reserved_keys = set()
        force_with_parent_name = False

        while stack:
            current_record, parent_key = stack.pop()

            if isinstance(current_record, dict):
                for current_key, value in current_record.items():
                    # A scalar is anything the loop will store as-is rather than walk.
                    is_scalar = not (
                        isinstance(value, dict)
                        or (isinstance(value, list) and self.flatten_lists)
                    )
                    key_taken = current_key in transformed_record or (
                        is_scalar and current_key in reserved_keys
                    )
                    new_key = (
                        f"{parent_key}.{current_key}"
                        if (key_taken or force_with_parent_name)
                        else current_key
                    )
                    if is_scalar:
                        reserved_keys.add(new_key)
                    stack.append((value, new_key))

            elif isinstance(current_record, list) and self.flatten_lists:
                for i, item in enumerate(current_record):
                    # Never reset: after the first expanded list item, all
                    # later keys are qualified with their parent key.
                    force_with_parent_name = True
                    stack.append((item, f"{parent_key}.{i}"))

            else:
                transformed_record[parent_key] = current_record

        return transformed_record
@dataclass
class FlattenFields(RecordTransformation):
    """Record transformation that collapses nested dicts (and optionally lists) into one level.

    Scalar values keep their own key when it is free; otherwise they are stored
    under ``"<parent_key>.<key>"``. List items, when ``flatten_lists`` is true,
    are stored under ``"<parent_key>.<index>"``.
    """

    # When True, list items are expanded into "<parent_key>.<index>" entries;
    # when False, lists are kept whole as values.
    flatten_lists: bool = True

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """Flatten ``record`` in place.

        The flattened mapping is computed first, then the original dict object
        is emptied and refilled, so every existing reference to ``record`` sees
        the flattened content.

        Args:
            record: The record to flatten; mutated in place.
            config: Not used by this transformation.
            stream_state: Not used by this transformation.
            stream_slice: Not used by this transformation.

        Returns:
            None.
        """
        transformed_record = self.flatten_record(record)
        record.clear()
        record.update(transformed_record)

    def flatten_record(self, record: Dict[str, Any]) -> Dict[str, Any]:
        """Return a new single-level dict built from the scalar values of ``record``.

        Nested containers are walked with an explicit stack (last pushed, first
        processed). A value keeps its own key unless that key is already taken,
        in which case it is stored as ``"<parent_key>.<key>"``. When
        ``self.flatten_lists`` is true, list items are walked too and stored
        under ``"<parent_key>.<index>"``; once any list has been expanded, every
        key built afterwards is prefixed with its parent key. Empty dicts (and
        empty lists while lists are being flattened) contribute no entries.

        A parent-qualified key is not itself re-checked for uniqueness, so two
        values that share both their key and their parent key can still collide.

        Args:
            record: The mapping to flatten. It is only read, never modified.

        Returns:
            A new flat dictionary.
        """
        stack = [(record, "_")]
        transformed_record: Dict[str, Any] = {}
        # Output keys already promised to a scalar that is still waiting on the
        # stack. Checking only ``transformed_record`` would miss those, letting
        # two scalars claim the same bare key and one silently overwrite the other.
        reserved_keys = set()
        force_with_parent_name = False

        while stack:
            current_record, parent_key = stack.pop()

            if isinstance(current_record, dict):
                for current_key, value in current_record.items():
                    # A scalar is anything the loop will store as-is rather than walk.
                    is_scalar = not (
                        isinstance(value, dict)
                        or (isinstance(value, list) and self.flatten_lists)
                    )
                    key_taken = current_key in transformed_record or (
                        is_scalar and current_key in reserved_keys
                    )
                    new_key = (
                        f"{parent_key}.{current_key}"
                        if (key_taken or force_with_parent_name)
                        else current_key
                    )
                    if is_scalar:
                        reserved_keys.add(new_key)
                    stack.append((value, new_key))

            elif isinstance(current_record, list) and self.flatten_lists:
                for i, item in enumerate(current_record):
                    # Never reset: after the first expanded list item, all
                    # later keys are qualified with their parent key.
                    force_with_parent_name = True
                    stack.append((item, f"{parent_key}.{i}"))

            else:
                transformed_record[parent_key] = current_record

        return transformed_record
FlattenFields(flatten_lists: bool = True)
flatten_lists: bool = True
def transform( self, record: Dict[str, Any], config: Optional[Mapping[str, Any]] = None, stream_state: Optional[Mapping[str, Any]] = None, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> None:
17    def transform(
18        self,
19        record: Dict[str, Any],
20        config: Optional[Config] = None,
21        stream_state: Optional[StreamState] = None,
22        stream_slice: Optional[StreamSlice] = None,
23    ) -> None:
24        transformed_record = self.flatten_record(record)
25        record.clear()
26        record.update(transformed_record)

Transform a record by adding, deleting, or mutating fields directly on the record reference passed as an argument.

Parameters
  • record: The input record to be transformed
  • config: The user-provided configuration as specified by the source's spec
  • stream_state: The stream state
  • stream_slice: The stream slice
Returns

None. The record passed in is transformed in place.

def flatten_record(self, record: Dict[str, Any]) -> Dict[str, Any]:
28    def flatten_record(self, record: Dict[str, Any]) -> Dict[str, Any]:
29        stack = [(record, "_")]
30        transformed_record: Dict[str, Any] = {}
31        force_with_parent_name = False
32
33        while stack:
34            current_record, parent_key = stack.pop()
35
36            if isinstance(current_record, dict):
37                for current_key, value in current_record.items():
38                    new_key = (
39                        f"{parent_key}.{current_key}"
40                        if (current_key in transformed_record or force_with_parent_name)
41                        else current_key
42                    )
43                    stack.append((value, new_key))
44
45            elif isinstance(current_record, list) and self.flatten_lists:
46                for i, item in enumerate(current_record):
47                    force_with_parent_name = True
48                    stack.append((item, f"{parent_key}.{i}"))
49
50            else:
51                transformed_record[parent_key] = current_record
52
53        return transformed_record