airbyte_cdk.sources.declarative.requesters.query_properties.strategies

 1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
 2
 3from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.group_by_key import (
 4    GroupByKey,
 5)
 6from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.merge_strategy import (
 7    RecordMergeStrategy,
 8)
 9
10__all__ = ["GroupByKey", "RecordMergeStrategy"]
@dataclass
class GroupByKey(RecordMergeStrategy):
    """
    Record merge strategy that combines records together according to values on the record for one or many keys.

    The group key of a record is built from the values found in ``record.data`` under each configured
    key, rendered as strings and joined with commas in the order the keys were configured.
    """

    # Name of a single record field, or a list of field names, whose values make up the group key.
    key: Union[str, List[str]]
    # Init-only value; it is accepted by __post_init__ but not read there.
    parameters: InitVar[Mapping[str, Any]]
    config: Config

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Normalize to a list so get_group_key can iterate uniformly over one or many keys.
        self._keys = [self.key] if isinstance(self.key, str) else self.key

    def get_group_key(self, record: Record) -> Optional[str]:
        """
        Build the group key for ``record`` from the configured key fields.

        :param record: record whose ``data`` mapping is looked up for every configured key
        :return: the comma-joined string form of the key values, or None as soon as one key is
            absent from ``record.data`` or holds a falsy value (None, "", 0, ...)
        """
        resolved_keys = []
        for key in self._keys:
            key_value = record.data.get(key)
            if not key_value:
                # A missing or falsy key value means no group key can be produced for this record.
                return None
            # str.join only accepts strings; convert so non-string values such as integer ids
            # do not raise TypeError. Values that are already strings are left unchanged.
            resolved_keys.append(str(key_value))
        return ",".join(resolved_keys)

Record merge strategy that combines records together according to values on the record for one or many keys.

GroupByKey( key: Union[str, List[str]], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], config: Mapping[str, Any])
key: Union[str, List[str]]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
config: Mapping[str, Any]
def get_group_key(self, record: airbyte_cdk.Record) -> Optional[str]:
26    def get_group_key(self, record: Record) -> Optional[str]:
27        resolved_keys = []
28        for key in self._keys:
29            key_value = record.data.get(key)
30            if key_value:
31                resolved_keys.append(key_value)
32            else:
33                return None
34        return ",".join(resolved_keys)
@dataclass
class RecordMergeStrategy(abc.ABC):
@dataclass
class RecordMergeStrategy(ABC):
    """
    Interface describing how records whose complete set of fields had to be fetched across
    multiple requests should be merged back into a single record.
    """

    @abstractmethod
    def get_group_key(self, record: Record) -> Optional[str]:
        """
        Abstract hook that concrete strategies must override.

        The base body performs no work and returns None.
        """
        return None

Describes the interface for merging records whose complete set of fields had to be fetched across multiple requests back into a single record.

@abstractmethod
def get_group_key(self, record: airbyte_cdk.Record) -> Optional[str]:
18    @abstractmethod
19    def get_group_key(self, record: Record) -> Optional[str]:
20        pass