airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking

 1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
 2
 3from dataclasses import InitVar, dataclass
 4from enum import Enum
 5from typing import Any, Iterable, List, Mapping, Optional, Set
 6
 7from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import GroupByKey
 8from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.merge_strategy import (
 9    RecordMergeStrategy,
10)
11from airbyte_cdk.sources.types import Config, Record
12
13
class PropertyLimitType(Enum):
    """
    Heuristic used to measure how full the current chunk of properties is, and therefore
    when a new chunk of properties should be started.
    """

    # Chunk size is measured by the characters of the properties it contains.
    characters = "characters"
    # Chunk size is measured by the number of properties it contains.
    property_count = "property_count"
22
23
@dataclass
class PropertyChunking:
    """
    Defines how the complete list of properties to query for is broken down into smaller groups
    that are used for multiple requests to the target API.
    """

    # Heuristic used to measure a chunk's size; anything other than PropertyLimitType.characters
    # (including None) is treated as a plain property count.
    property_limit_type: PropertyLimitType
    # Maximum size of a chunk; a falsy value (None or 0) disables chunking entirely.
    property_limit: Optional[int]
    # Strategy used to compute merge keys; GroupByKey(key="id") is used when this is None.
    record_merge_strategy: Optional[RecordMergeStrategy]
    parameters: InitVar[Mapping[str, Any]]
    config: Config

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Fall back to grouping by the "id" field when no merge strategy is configured.
        self._record_merge_strategy = self.record_merge_strategy or GroupByKey(
            key="id", config=self.config, parameters=parameters
        )

    def get_request_property_chunks(
        self,
        property_fields: List[str],
        always_include_properties: Optional[List[str]],
        configured_properties: Optional[Set[str]],
    ) -> Iterable[List[str]]:
        """
        Split the requested properties into chunks that each fit within ``property_limit``.

        :param property_fields: All properties that could be requested.
        :param always_include_properties: Properties added to every chunk. They are not counted
            toward ``property_limit``.
        :param configured_properties: When not None, properties outside this set are skipped.
        :return: The property chunks. At least one chunk is always yielded, even if it is empty.
        """
        # Apply the configured-properties filter up front so it is honored whether or not a
        # property_limit is set (previously it was ignored when chunking was disabled).
        if configured_properties is not None:
            property_fields = [field for field in property_fields if field in configured_properties]

        if not self.property_limit:
            # Chunking disabled: request every property in a single chunk.
            single_property_chunk = list(property_fields)
            if always_include_properties:
                single_property_chunk.extend(always_include_properties)
            yield single_property_chunk
            return

        # If property_limit_type is not defined, we default to property_count, which is just an incrementing count.
        # todo: Add ability to specify parameter delimiter representation and take into account in property_field_size
        count_characters = self.property_limit_type == PropertyLimitType.characters

        current_chunk = list(always_include_properties) if always_include_properties else []
        chunk_size = 0
        for property_field in property_fields:
            # The +3 represents the extra characters for encoding the delimiter in between properties
            property_field_size = len(property_field) + 3 if count_characters else 1
            # Only close the current chunk when it already holds at least one property; otherwise a
            # single property larger than the limit would first yield an empty (or
            # always-include-only) chunk and trigger a redundant request.
            if chunk_size > 0 and chunk_size + property_field_size > self.property_limit:
                yield current_chunk
                current_chunk = list(always_include_properties) if always_include_properties else []
                chunk_size = 0
            current_chunk.append(property_field)
            chunk_size += property_field_size
        yield current_chunk

    def get_merge_key(self, record: Record) -> Optional[str]:
        """Return the group key that the configured merge strategy computes for ``record``."""
        return self._record_merge_strategy.get_group_key(record=record)
class PropertyLimitType(Enum):
    """
    Heuristic used to measure how full the current chunk of properties is, and therefore
    when a new chunk of properties should be started.
    """

    # Chunk size is measured by the characters of the properties it contains.
    characters = "characters"
    # Chunk size is measured by the number of properties it contains.
    property_count = "property_count"

The heuristic that determines when the current chunk of properties has reached its maximum size and a new one should be started.

characters = <PropertyLimitType.characters: 'characters'>
property_count = <PropertyLimitType.property_count: 'property_count'>
@dataclass
class PropertyChunking:
    """
    Defines how the complete list of properties to query for is broken down into smaller groups
    that are used for multiple requests to the target API.
    """

    # Heuristic used to measure a chunk's size; anything other than PropertyLimitType.characters
    # (including None) is treated as a plain property count.
    property_limit_type: PropertyLimitType
    # Maximum size of a chunk; a falsy value (None or 0) disables chunking entirely.
    property_limit: Optional[int]
    # Strategy used to compute merge keys; GroupByKey(key="id") is used when this is None.
    record_merge_strategy: Optional[RecordMergeStrategy]
    parameters: InitVar[Mapping[str, Any]]
    config: Config

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Fall back to grouping by the "id" field when no merge strategy is configured.
        self._record_merge_strategy = self.record_merge_strategy or GroupByKey(
            key="id", config=self.config, parameters=parameters
        )

    def get_request_property_chunks(
        self,
        property_fields: List[str],
        always_include_properties: Optional[List[str]],
        configured_properties: Optional[Set[str]],
    ) -> Iterable[List[str]]:
        """
        Split the requested properties into chunks that each fit within ``property_limit``.

        :param property_fields: All properties that could be requested.
        :param always_include_properties: Properties added to every chunk. They are not counted
            toward ``property_limit``.
        :param configured_properties: When not None, properties outside this set are skipped.
        :return: The property chunks. At least one chunk is always yielded, even if it is empty.
        """
        # Apply the configured-properties filter up front so it is honored whether or not a
        # property_limit is set (previously it was ignored when chunking was disabled).
        if configured_properties is not None:
            property_fields = [field for field in property_fields if field in configured_properties]

        if not self.property_limit:
            # Chunking disabled: request every property in a single chunk.
            single_property_chunk = list(property_fields)
            if always_include_properties:
                single_property_chunk.extend(always_include_properties)
            yield single_property_chunk
            return

        # If property_limit_type is not defined, we default to property_count, which is just an incrementing count.
        # todo: Add ability to specify parameter delimiter representation and take into account in property_field_size
        count_characters = self.property_limit_type == PropertyLimitType.characters

        current_chunk = list(always_include_properties) if always_include_properties else []
        chunk_size = 0
        for property_field in property_fields:
            # The +3 represents the extra characters for encoding the delimiter in between properties
            property_field_size = len(property_field) + 3 if count_characters else 1
            # Only close the current chunk when it already holds at least one property; otherwise a
            # single property larger than the limit would first yield an empty (or
            # always-include-only) chunk and trigger a redundant request.
            if chunk_size > 0 and chunk_size + property_field_size > self.property_limit:
                yield current_chunk
                current_chunk = list(always_include_properties) if always_include_properties else []
                chunk_size = 0
            current_chunk.append(property_field)
            chunk_size += property_field_size
        yield current_chunk

    def get_merge_key(self, record: Record) -> Optional[str]:
        """Return the group key that the configured merge strategy computes for ``record``."""
        return self._record_merge_strategy.get_group_key(record=record)

Defines how the complete list of properties to query for is broken down into smaller groups that will be used for multiple requests to the target API.

PropertyChunking( property_limit_type: PropertyLimitType, property_limit: Optional[int], record_merge_strategy: Optional[airbyte_cdk.sources.declarative.requesters.query_properties.strategies.RecordMergeStrategy], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], config: Mapping[str, Any])
property_limit_type: PropertyLimitType
property_limit: Optional[int]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
config: Mapping[str, Any]
def get_request_property_chunks(
    self,
    property_fields: List[str],
    always_include_properties: Optional[List[str]],
    configured_properties: Optional[Set[str]],
) -> Iterable[List[str]]:
    """
    Split the requested properties into chunks that each fit within ``property_limit``.

    :param property_fields: All properties that could be requested.
    :param always_include_properties: Properties added to every chunk. They are not counted
        toward ``property_limit``.
    :param configured_properties: When not None, properties outside this set are skipped.
    :return: The property chunks. At least one chunk is always yielded, even if it is empty.
    """
    # Apply the configured-properties filter up front so it is honored whether or not a
    # property_limit is set (previously it was ignored when chunking was disabled).
    if configured_properties is not None:
        property_fields = [field for field in property_fields if field in configured_properties]

    if not self.property_limit:
        # Chunking disabled: request every property in a single chunk.
        single_property_chunk = list(property_fields)
        if always_include_properties:
            single_property_chunk.extend(always_include_properties)
        yield single_property_chunk
        return

    # If property_limit_type is not defined, we default to property_count, which is just an incrementing count.
    # todo: Add ability to specify parameter delimiter representation and take into account in property_field_size
    count_characters = self.property_limit_type == PropertyLimitType.characters

    current_chunk = list(always_include_properties) if always_include_properties else []
    chunk_size = 0
    for property_field in property_fields:
        # The +3 represents the extra characters for encoding the delimiter in between properties
        property_field_size = len(property_field) + 3 if count_characters else 1
        # Only close the current chunk when it already holds at least one property; otherwise a
        # single property larger than the limit would first yield an empty (or
        # always-include-only) chunk and trigger a redundant request.
        if chunk_size > 0 and chunk_size + property_field_size > self.property_limit:
            yield current_chunk
            current_chunk = list(always_include_properties) if always_include_properties else []
            chunk_size = 0
        current_chunk.append(property_field)
        chunk_size += property_field_size
    yield current_chunk
def get_merge_key(self, record: Record) -> Optional[str]:
    """Return the group key that the configured merge strategy computes for ``record``."""
    strategy = self._record_merge_strategy
    group_key = strategy.get_group_key(record=record)
    return group_key