airbyte_cdk.sources.declarative.transformations.flatten_fields
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass
from typing import Any, Dict, Optional

from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState


@dataclass
class FlattenFields(RecordTransformation):
    """Record transformation that flattens nested dicts (and optionally lists) into one level."""

    # When True, list items are flattened into "<parent>.<index>" entries;
    # when False, lists are kept as ordinary values.
    flatten_lists: bool = True

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """Replace the contents of ``record`` with its flattened form, in place.

        ``config``, ``stream_state`` and ``stream_slice`` are not used.
        """
        transformed_record = self.flatten_record(record)
        # Mutate the caller's dict (nothing is returned).
        record.clear()
        record.update(transformed_record)

    def flatten_record(self, record: Dict[str, Any]) -> Dict[str, Any]:
        """Return a single-level copy of ``record``; ``record`` itself is not modified.

        Naming rules:

        * A nested dict key keeps its bare name unless that name is already
          claimed by another leaf value (emitted or still waiting to be emitted),
          or the key sits anywhere below a list item; then it is named
          ``<parent>.<key>``.
        * When ``self.flatten_lists`` is true, list items are named
          ``<parent>.<index>``; otherwise lists are kept as ordinary values.
        * Empty dicts, and empty lists when ``flatten_lists`` is true,
          contribute no keys.

        :param record: the record to flatten.
        :return: a new dict whose values are neither dicts nor (when
            ``flatten_lists`` is true) lists.
        """
        # Stack entries are (value, name, inside_list). "_" is only a placeholder
        # name for the root; top-level keys are never prefixed with it because
        # dict keys are unique and the root is not inside a list.
        stack: list[tuple[Any, str, bool]] = [(record, "_", False)]
        transformed_record: Dict[str, Any] = {}
        # Names reserved by leaf values, including leaves still pending on the
        # stack. Checking only transformed_record missed pending leaves, so
        # {"id": 1, "address": {"id": 2}} used to lose one of the two values.
        claimed_keys: set[str] = set()

        def push(value: Any, name: str, inside_list: bool) -> None:
            # Reserve the name as soon as we know the value is emitted as-is.
            is_container = isinstance(value, dict) or (
                isinstance(value, list) and self.flatten_lists
            )
            if not is_container:
                claimed_keys.add(name)
            stack.append((value, name, inside_list))

        while stack:
            current_record, parent_key, inside_list = stack.pop()

            if isinstance(current_record, dict):
                for current_key, value in current_record.items():
                    # inside_list is tracked per entry: only descendants of a list
                    # item are forced to carry their parent's name. (A single flag
                    # for the whole record made naming depend on field order and on
                    # whether an unrelated list was empty.) A prefixed name is not
                    # re-checked, so keys that literally contain "." can still clash.
                    new_key = (
                        f"{parent_key}.{current_key}"
                        if (current_key in claimed_keys or inside_list)
                        else current_key
                    )
                    push(value, new_key, inside_list)

            elif isinstance(current_record, list) and self.flatten_lists:
                for i, item in enumerate(current_record):
                    push(item, f"{parent_key}.{i}", True)

            else:
                transformed_record[parent_key] = current_record

        return transformed_record
@dataclass
class FlattenFields(RecordTransformation):
    """Record transformation that flattens nested dicts (and optionally lists) into one level."""

    # When True, list items are flattened into "<parent>.<index>" entries;
    # when False, lists are kept as ordinary values.
    flatten_lists: bool = True

    def transform(
        self,
        record: Dict[str, Any],
        config: Optional[Config] = None,
        stream_state: Optional[StreamState] = None,
        stream_slice: Optional[StreamSlice] = None,
    ) -> None:
        """Replace the contents of ``record`` with its flattened form, in place.

        ``config``, ``stream_state`` and ``stream_slice`` are not used.
        """
        transformed_record = self.flatten_record(record)
        # Mutate the caller's dict (nothing is returned).
        record.clear()
        record.update(transformed_record)

    def flatten_record(self, record: Dict[str, Any]) -> Dict[str, Any]:
        """Return a single-level copy of ``record``; ``record`` itself is not modified.

        Naming rules:

        * A nested dict key keeps its bare name unless that name is already
          claimed by another leaf value (emitted or still waiting to be emitted),
          or the key sits anywhere below a list item; then it is named
          ``<parent>.<key>``.
        * When ``self.flatten_lists`` is true, list items are named
          ``<parent>.<index>``; otherwise lists are kept as ordinary values.
        * Empty dicts, and empty lists when ``flatten_lists`` is true,
          contribute no keys.

        :param record: the record to flatten.
        :return: a new dict whose values are neither dicts nor (when
            ``flatten_lists`` is true) lists.
        """
        # Stack entries are (value, name, inside_list). "_" is only a placeholder
        # name for the root; top-level keys are never prefixed with it because
        # dict keys are unique and the root is not inside a list.
        stack: list[tuple[Any, str, bool]] = [(record, "_", False)]
        transformed_record: Dict[str, Any] = {}
        # Names reserved by leaf values, including leaves still pending on the
        # stack. Checking only transformed_record missed pending leaves, so
        # {"id": 1, "address": {"id": 2}} used to lose one of the two values.
        claimed_keys: set[str] = set()

        def push(value: Any, name: str, inside_list: bool) -> None:
            # Reserve the name as soon as we know the value is emitted as-is.
            is_container = isinstance(value, dict) or (
                isinstance(value, list) and self.flatten_lists
            )
            if not is_container:
                claimed_keys.add(name)
            stack.append((value, name, inside_list))

        while stack:
            current_record, parent_key, inside_list = stack.pop()

            if isinstance(current_record, dict):
                for current_key, value in current_record.items():
                    # inside_list is tracked per entry: only descendants of a list
                    # item are forced to carry their parent's name. (A single flag
                    # for the whole record made naming depend on field order and on
                    # whether an unrelated list was empty.) A prefixed name is not
                    # re-checked, so keys that literally contain "." can still clash.
                    new_key = (
                        f"{parent_key}.{current_key}"
                        if (current_key in claimed_keys or inside_list)
                        else current_key
                    )
                    push(value, new_key, inside_list)

            elif isinstance(current_record, list) and self.flatten_lists:
                for i, item in enumerate(current_record):
                    push(item, f"{parent_key}.{i}", True)

            else:
                transformed_record[parent_key] = current_record

        return transformed_record
def transform(
    self,
    record: Dict[str, Any],
    config: Optional[Config] = None,
    stream_state: Optional[StreamState] = None,
    stream_slice: Optional[StreamSlice] = None,
) -> None:
    """Flatten ``record`` in place.

    The flattened form is computed first via ``self.flatten_record``; only then
    is the original mapping emptied and refilled, so any caller holding a
    reference to ``record`` sees the flattened contents. ``config``,
    ``stream_state`` and ``stream_slice`` are accepted but not used.
    """
    flattened = self.flatten_record(record)
    # Keep the same dict object: empty it, then refill with the flat entries.
    record.clear()
    record.update(flattened)
Transform a record by adding, deleting, or mutating fields directly on the record reference passed as an argument.
Parameters
- record: The input record to be transformed
- config: The user-provided configuration as specified by the source's spec
- stream_state: The stream state
- stream_slice: The stream slice
Returns
None. The record passed in is modified in place.
def flatten_record(self, record: Dict[str, Any]) -> Dict[str, Any]:
    """Return a single-level copy of ``record``; ``record`` itself is not modified.

    Naming rules:

    * A nested dict key keeps its bare name unless that name is already claimed
      by another leaf value (emitted or still waiting to be emitted), or the key
      sits anywhere below a list item; then it is named ``<parent>.<key>``.
    * When ``self.flatten_lists`` is true, list items are named
      ``<parent>.<index>``; otherwise lists are kept as ordinary values.
    * Empty dicts, and empty lists when ``flatten_lists`` is true, contribute
      no keys.

    :param record: the record to flatten.
    :return: a new dict whose values are neither dicts nor (when
        ``flatten_lists`` is true) lists.
    """
    # Stack entries are (value, name, inside_list). "_" is only a placeholder
    # name for the root; top-level keys are never prefixed with it because dict
    # keys are unique and the root is not inside a list.
    stack: list[tuple[Any, str, bool]] = [(record, "_", False)]
    transformed_record: Dict[str, Any] = {}
    # Names reserved by leaf values, including leaves still pending on the stack.
    # Checking only transformed_record missed pending leaves, so
    # {"id": 1, "address": {"id": 2}} used to lose one of the two values.
    claimed_keys: set[str] = set()

    def push(value: Any, name: str, inside_list: bool) -> None:
        # Reserve the name as soon as we know the value is emitted as-is.
        is_container = isinstance(value, dict) or (
            isinstance(value, list) and self.flatten_lists
        )
        if not is_container:
            claimed_keys.add(name)
        stack.append((value, name, inside_list))

    while stack:
        current_record, parent_key, inside_list = stack.pop()

        if isinstance(current_record, dict):
            for current_key, value in current_record.items():
                # inside_list is tracked per entry: only descendants of a list item
                # are forced to carry their parent's name. (A single flag for the
                # whole record made naming depend on field order and on whether an
                # unrelated list was empty.) A prefixed name is not re-checked, so
                # keys that literally contain "." can still clash.
                new_key = (
                    f"{parent_key}.{current_key}"
                    if (current_key in claimed_keys or inside_list)
                    else current_key
                )
                push(value, new_key, inside_list)

        elif isinstance(current_record, list) and self.flatten_lists:
            for i, item in enumerate(current_record):
                push(item, f"{parent_key}.{i}", True)

        else:
            transformed_record[parent_key] = current_record

    return transformed_record