airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking

 1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
 2
 3from dataclasses import InitVar, dataclass
 4from enum import Enum
 5from typing import Any, Iterable, List, Mapping, Optional
 6
 7from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import GroupByKey
 8from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.merge_strategy import (
 9    RecordMergeStrategy,
10)
11from airbyte_cdk.sources.types import Config, Record
12
13
class PropertyLimitType(Enum):
    """
    Unit in which the size of a chunk of properties is measured when deciding whether the
    current chunk is full and a new one should be started.
    """

    # Chunk size is measured from the length of the property names.
    characters = "characters"
    # Chunk size is measured as the number of properties in the chunk.
    property_count = "property_count"
22
23
@dataclass
class PropertyChunking:
    """
    Defines how the complete list of properties to query for is broken down into smaller groups
    that will be used for multiple requests to the target API.

    When no ``record_merge_strategy`` is supplied, records are grouped with a ``GroupByKey``
    strategy keyed on ``"id"``.
    """

    # Unit used to measure a chunk: total characters or number of properties.
    property_limit_type: PropertyLimitType
    # Maximum size of a single chunk; a falsy value (None or 0) disables chunking.
    property_limit: Optional[int]
    # Strategy used to compute the merge key of a record; defaults to GroupByKey on "id".
    record_merge_strategy: Optional[RecordMergeStrategy]
    parameters: InitVar[Mapping[str, Any]]
    config: Config

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Resolve the effective merge strategy, falling back to grouping by the ``id`` key."""
        self._record_merge_strategy = self.record_merge_strategy or GroupByKey(
            key="id", config=self.config, parameters=parameters
        )

    def get_request_property_chunks(
        self, property_fields: Iterable[str], always_include_properties: Optional[List[str]]
    ) -> Iterable[List[str]]:
        """
        Split ``property_fields`` into chunks that each stay within ``property_limit``.

        :param property_fields: the properties to distribute across chunks
        :param always_include_properties: properties added to every chunk; they do not count
            toward the limit
        :return: a lazily generated sequence of property lists. Without a limit, a single chunk
            holding every property followed by the always-included ones is produced. With a
            limit, each chunk starts with the always-included properties. A property that is
            larger than the limit on its own is emitted in a chunk by itself rather than being
            preceded by a chunk that holds no requested property.
        """
        required_properties = list(always_include_properties or [])
        if not self.property_limit:
            yield [*property_fields, *required_properties]
            return

        # If property_limit_type is not characters, we default to property_count which is just an incrementing count
        count_characters = self.property_limit_type == PropertyLimitType.characters
        current_chunk = list(required_properties)
        chunk_size = 0
        for property_field in property_fields:
            # todo: Add ability to specify parameter delimiter representation and take into account in property_field_size
            property_field_size = (
                len(property_field)
                + 3  # The +3 represents the extra characters for encoding the delimiter in between properties
                if count_characters
                else 1
            )
            # Only flush a chunk that already holds at least one requested property (chunk_size > 0);
            # otherwise an oversized property would trigger a request carrying no requested property.
            if chunk_size and chunk_size + property_field_size > self.property_limit:
                yield current_chunk
                current_chunk = list(required_properties)
                chunk_size = 0
            current_chunk.append(property_field)
            chunk_size += property_field_size
        yield current_chunk

    def get_merge_key(self, record: Record) -> Optional[str]:
        """Return the group key that the effective merge strategy computes for ``record``."""
        return self._record_merge_strategy.get_group_key(record=record)
class PropertyLimitType(enum.Enum):
class PropertyLimitType(Enum):
    """
    Unit in which the size of a chunk of properties is measured when deciding whether the
    current chunk is full and a new one should be started.
    """

    # Chunk size is measured from the length of the property names.
    characters = "characters"
    # Chunk size is measured as the number of properties in the chunk.
    property_count = "property_count"

The heuristic that determines when the current chunk of properties has reached its maximum size and a new one should be started.

characters = <PropertyLimitType.characters: 'characters'>
property_count = <PropertyLimitType.property_count: 'property_count'>
@dataclass
class PropertyChunking:
@dataclass
class PropertyChunking:
    """
    Defines how the complete list of properties to query for is broken down into smaller groups
    that will be used for multiple requests to the target API.

    When no ``record_merge_strategy`` is supplied, records are grouped with a ``GroupByKey``
    strategy keyed on ``"id"``.
    """

    # Unit used to measure a chunk: total characters or number of properties.
    property_limit_type: PropertyLimitType
    # Maximum size of a single chunk; a falsy value (None or 0) disables chunking.
    property_limit: Optional[int]
    # Strategy used to compute the merge key of a record; defaults to GroupByKey on "id".
    record_merge_strategy: Optional[RecordMergeStrategy]
    parameters: InitVar[Mapping[str, Any]]
    config: Config

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Resolve the effective merge strategy, falling back to grouping by the ``id`` key."""
        self._record_merge_strategy = self.record_merge_strategy or GroupByKey(
            key="id", config=self.config, parameters=parameters
        )

    def get_request_property_chunks(
        self, property_fields: Iterable[str], always_include_properties: Optional[List[str]]
    ) -> Iterable[List[str]]:
        """
        Split ``property_fields`` into chunks that each stay within ``property_limit``.

        :param property_fields: the properties to distribute across chunks
        :param always_include_properties: properties added to every chunk; they do not count
            toward the limit
        :return: a lazily generated sequence of property lists. Without a limit, a single chunk
            holding every property followed by the always-included ones is produced. With a
            limit, each chunk starts with the always-included properties. A property that is
            larger than the limit on its own is emitted in a chunk by itself rather than being
            preceded by a chunk that holds no requested property.
        """
        required_properties = list(always_include_properties or [])
        if not self.property_limit:
            yield [*property_fields, *required_properties]
            return

        # If property_limit_type is not characters, we default to property_count which is just an incrementing count
        count_characters = self.property_limit_type == PropertyLimitType.characters
        current_chunk = list(required_properties)
        chunk_size = 0
        for property_field in property_fields:
            # todo: Add ability to specify parameter delimiter representation and take into account in property_field_size
            property_field_size = (
                len(property_field)
                + 3  # The +3 represents the extra characters for encoding the delimiter in between properties
                if count_characters
                else 1
            )
            # Only flush a chunk that already holds at least one requested property (chunk_size > 0);
            # otherwise an oversized property would trigger a request carrying no requested property.
            if chunk_size and chunk_size + property_field_size > self.property_limit:
                yield current_chunk
                current_chunk = list(required_properties)
                chunk_size = 0
            current_chunk.append(property_field)
            chunk_size += property_field_size
        yield current_chunk

    def get_merge_key(self, record: Record) -> Optional[str]:
        """Return the group key that the effective merge strategy computes for ``record``."""
        return self._record_merge_strategy.get_group_key(record=record)

Defines how the complete list of properties to query for is broken down into smaller groups that will be used for multiple requests to the target API.

PropertyChunking( property_limit_type: PropertyLimitType, property_limit: Optional[int], record_merge_strategy: Optional[airbyte_cdk.sources.declarative.requesters.query_properties.strategies.RecordMergeStrategy], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], config: Mapping[str, Any])
property_limit_type: PropertyLimitType
property_limit: Optional[int]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
config: Mapping[str, Any]
def get_request_property_chunks( self, property_fields: Iterable[str], always_include_properties: Optional[List[str]]) -> Iterable[List[str]]:
43    def get_request_property_chunks(
44        self, property_fields: Iterable[str], always_include_properties: Optional[List[str]]
45    ) -> Iterable[List[str]]:
46        if not self.property_limit:
47            single_property_chunk = list(property_fields)
48            if always_include_properties:
49                single_property_chunk.extend(always_include_properties)
50            yield single_property_chunk
51            return
52        current_chunk = list(always_include_properties) if always_include_properties else []
53        chunk_size = 0
54        for property_field in property_fields:
55            # If property_limit_type is not defined, we default to property_count which is just an incrementing count
56            # todo: Add ability to specify parameter delimiter representation and take into account in property_field_size
57            property_field_size = (
58                len(property_field)
59                + 3  # The +3 represents the extra characters for encoding the delimiter in between properties
60                if self.property_limit_type == PropertyLimitType.characters
61                else 1
62            )
63            if chunk_size + property_field_size > self.property_limit:
64                yield current_chunk
65                current_chunk = list(always_include_properties) if always_include_properties else []
66                chunk_size = 0
67            current_chunk.append(property_field)
68            chunk_size += property_field_size
69        yield current_chunk
def get_merge_key(self, record: airbyte_cdk.Record) -> Optional[str]:
71    def get_merge_key(self, record: Record) -> Optional[str]:
72        return self._record_merge_strategy.get_group_key(record=record)