airbyte_cdk.sources.declarative.requesters.query_properties.strategies
1# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 2 3from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.group_by_key import ( 4 GroupByKey, 5) 6from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.merge_strategy import ( 7 RecordMergeStrategy, 8) 9 10__all__ = ["GroupByKey", "RecordMergeStrategy"]
@dataclass
class GroupByKey(RecordMergeStrategy):
    """
    Record merge strategy that groups records according to the values they hold for one or more keys.

    Attributes:
        key: Name of the record field to group by, or a list of field names when the group key
            is built from several fields.
        parameters: Init-only value handed to ``__post_init__``; it is not read by this class.
        config: Stored on the instance; not read by the methods defined here.
    """

    key: Union[str, List[str]]
    parameters: InitVar[Mapping[str, Any]]
    config: Config

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Normalize to a list so a single key and several keys share one code path.
        self._keys = [self.key] if isinstance(self.key, str) else self.key

    def get_group_key(self, record: Record) -> Optional[str]:
        """
        Build the group key for ``record`` from the values of the configured key fields.

        Args:
            record: Record whose ``data`` mapping is looked up for every configured key.

        Returns:
            The key values, converted to strings and joined with ``","`` in the configured
            order, or ``None`` as soon as one key is absent, ``None`` or an empty string.
        """
        resolved_keys = []
        for key in self._keys:
            key_value = record.data.get(key)
            # Only None / "" count as "missing": a plain truthiness test would also reject
            # legitimate values such as the integer id 0.
            if key_value is None or key_value == "":
                return None
            # str.join only accepts strings, so non-string values (e.g. numeric ids) must be
            # converted first; for values that are already str this is a no-op.
            resolved_keys.append(str(key_value))
        return ",".join(resolved_keys)
Record merge strategy that groups records according to the values they hold for one or more keys.
GroupByKey( key: Union[str, List[str]], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], config: Mapping[str, Any])
@dataclass
class RecordMergeStrategy(ABC):
    """
    Defines the interface for merging records back into a single record when multiple requests
    were required to retrieve the complete set of fields.
    """

    @abstractmethod
    def get_group_key(self, record: Record) -> Optional[str]:
        """
        Return the key identifying the group that ``record`` belongs to.

        Implementations may return ``None``; presumably such a record is not merged with
        others (confirm against the caller).
        """
        return None
Defines the interface for merging records back into a single record when multiple requests were required to retrieve the complete set of fields.