airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

from dataclasses import InitVar, dataclass
from enum import Enum
from typing import Any, Iterable, List, Mapping, Optional

from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import GroupByKey
from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.merge_strategy import (
    RecordMergeStrategy,
)
from airbyte_cdk.sources.types import Config, Record


class PropertyLimitType(Enum):
    """
    The heuristic that determines when the current chunk of properties has reached its maximum size and a new
    one should be started.
    """

    # Chunk size is measured as len(property) + 3 per property.
    characters = "characters"
    # Chunk size is measured as the number of properties.
    property_count = "property_count"


@dataclass
class PropertyChunking:
    """
    Defines how the complete list of properties to query for is broken down into smaller groups
    that will be used for multiple requests to the target API.
    """

    property_limit_type: PropertyLimitType
    property_limit: Optional[int]
    record_merge_strategy: Optional[RecordMergeStrategy]
    parameters: InitVar[Mapping[str, Any]]
    config: Config

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Select the merge strategy, falling back to GroupByKey on the "id" key when none is supplied."""
        self._record_merge_strategy = self.record_merge_strategy or GroupByKey(
            key="id", config=self.config, parameters=parameters
        )

    def get_request_property_chunks(
        self, property_fields: Iterable[str], always_include_properties: Optional[List[str]]
    ) -> Iterable[List[str]]:
        """
        Yield the groups of properties to request, each group respecting `property_limit`.

        When `property_limit` is unset (None or 0), a single chunk holding every property followed by the
        always-included properties is yielded. Otherwise each chunk starts with the always-included
        properties, which do not count toward the limit, followed by as many properties as fit.

        A property that exceeds the limit on its own is emitted in a chunk by itself because it cannot be
        split any further; no chunk consisting solely of the always-included properties is emitted ahead of it.

        :param property_fields: The properties to split into chunks
        :param always_include_properties: Properties placed at the start of every chunk, if any
        :return: The chunks of properties, in the order the properties were supplied
        """
        if not self.property_limit:
            single_property_chunk = list(property_fields)
            if always_include_properties:
                single_property_chunk.extend(always_include_properties)
            yield single_property_chunk
            return

        # If property_limit_type is not defined, we default to property_count which is just an incrementing count
        count_characters = self.property_limit_type == PropertyLimitType.characters
        current_chunk = list(always_include_properties) if always_include_properties else []
        chunk_size = 0
        for property_field in property_fields:
            # todo: Add ability to specify parameter delimiter representation and take into account in property_field_size
            property_field_size = (
                len(property_field)
                + 3  # The +3 represents the extra characters for encoding the delimiter in between properties
                if count_characters
                else 1
            )
            # Only flush a chunk that already holds counted properties; flushing when chunk_size is 0 would
            # emit a chunk that is empty or contains nothing but the always-included properties.
            if chunk_size and chunk_size + property_field_size > self.property_limit:
                yield current_chunk
                current_chunk = list(always_include_properties) if always_include_properties else []
                chunk_size = 0
            current_chunk.append(property_field)
            chunk_size += property_field_size
        yield current_chunk

    def get_merge_key(self, record: Record) -> Optional[str]:
        """Return the group key that the configured merge strategy computes for `record`."""
        return self._record_merge_strategy.get_group_key(record=record)
class
PropertyLimitType(enum.Enum):
class PropertyLimitType(Enum):
    """
    The heuristic that determines when the current chunk of properties has reached its maximum size and a new
    one should be started.
    """

    # Limit expressed in characters.
    characters = "characters"
    # Limit expressed as a number of properties.
    property_count = "property_count"
The heuristic that determines when the current chunk of properties has reached its maximum size and a new one should be started.
characters =
<PropertyLimitType.characters: 'characters'>
property_count =
<PropertyLimitType.property_count: 'property_count'>
@dataclass
class
PropertyChunking:
@dataclass
class PropertyChunking:
    """
    Defines how the complete list of properties to query for is broken down into smaller groups
    that will be used for multiple requests to the target API.
    """

    property_limit_type: PropertyLimitType
    property_limit: Optional[int]
    record_merge_strategy: Optional[RecordMergeStrategy]
    parameters: InitVar[Mapping[str, Any]]
    config: Config

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Select the merge strategy, falling back to GroupByKey on the "id" key when none is supplied."""
        self._record_merge_strategy = self.record_merge_strategy or GroupByKey(
            key="id", config=self.config, parameters=parameters
        )

    def get_request_property_chunks(
        self, property_fields: Iterable[str], always_include_properties: Optional[List[str]]
    ) -> Iterable[List[str]]:
        """
        Yield the groups of properties to request, each group respecting `property_limit`.

        When `property_limit` is unset (None or 0), a single chunk holding every property followed by the
        always-included properties is yielded. Otherwise each chunk starts with the always-included
        properties, which do not count toward the limit, followed by as many properties as fit.

        A property that exceeds the limit on its own is emitted in a chunk by itself because it cannot be
        split any further; no chunk consisting solely of the always-included properties is emitted ahead of it.

        :param property_fields: The properties to split into chunks
        :param always_include_properties: Properties placed at the start of every chunk, if any
        :return: The chunks of properties, in the order the properties were supplied
        """
        if not self.property_limit:
            single_property_chunk = list(property_fields)
            if always_include_properties:
                single_property_chunk.extend(always_include_properties)
            yield single_property_chunk
            return

        # If property_limit_type is not defined, we default to property_count which is just an incrementing count
        count_characters = self.property_limit_type == PropertyLimitType.characters
        current_chunk = list(always_include_properties) if always_include_properties else []
        chunk_size = 0
        for property_field in property_fields:
            # todo: Add ability to specify parameter delimiter representation and take into account in property_field_size
            property_field_size = (
                len(property_field)
                + 3  # The +3 represents the extra characters for encoding the delimiter in between properties
                if count_characters
                else 1
            )
            # Only flush a chunk that already holds counted properties; flushing when chunk_size is 0 would
            # emit a chunk that is empty or contains nothing but the always-included properties.
            if chunk_size and chunk_size + property_field_size > self.property_limit:
                yield current_chunk
                current_chunk = list(always_include_properties) if always_include_properties else []
                chunk_size = 0
            current_chunk.append(property_field)
            chunk_size += property_field_size
        yield current_chunk

    def get_merge_key(self, record: Record) -> Optional[str]:
        """Return the group key that the configured merge strategy computes for `record`."""
        return self._record_merge_strategy.get_group_key(record=record)
Defines how the complete list of properties to query for is broken down into smaller groups that will be used for multiple requests to the target API.
PropertyChunking( property_limit_type: PropertyLimitType, property_limit: Optional[int], record_merge_strategy: Optional[airbyte_cdk.sources.declarative.requesters.query_properties.strategies.RecordMergeStrategy], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], config: Mapping[str, Any])
property_limit_type: PropertyLimitType
record_merge_strategy: Optional[airbyte_cdk.sources.declarative.requesters.query_properties.strategies.RecordMergeStrategy]
def
get_request_property_chunks( self, property_fields: Iterable[str], always_include_properties: Optional[List[str]]) -> Iterable[List[str]]:
def get_request_property_chunks(
    self, property_fields: Iterable[str], always_include_properties: Optional[List[str]]
) -> Iterable[List[str]]:
    """
    Yield the groups of properties to request, each group respecting `property_limit`.

    When `property_limit` is unset (None or 0), a single chunk holding every property followed by the
    always-included properties is yielded. Otherwise each chunk starts with the always-included
    properties, which do not count toward the limit, followed by as many properties as fit.

    A property that exceeds the limit on its own is emitted in a chunk by itself because it cannot be
    split any further; no chunk consisting solely of the always-included properties is emitted ahead of it.

    :param property_fields: The properties to split into chunks
    :param always_include_properties: Properties placed at the start of every chunk, if any
    :return: The chunks of properties, in the order the properties were supplied
    """
    if not self.property_limit:
        single_property_chunk = list(property_fields)
        if always_include_properties:
            single_property_chunk.extend(always_include_properties)
        yield single_property_chunk
        return

    # If property_limit_type is not defined, we default to property_count which is just an incrementing count
    count_characters = self.property_limit_type == PropertyLimitType.characters
    current_chunk = list(always_include_properties) if always_include_properties else []
    chunk_size = 0
    for property_field in property_fields:
        # todo: Add ability to specify parameter delimiter representation and take into account in property_field_size
        property_field_size = (
            len(property_field)
            + 3  # The +3 represents the extra characters for encoding the delimiter in between properties
            if count_characters
            else 1
        )
        # Only flush a chunk that already holds counted properties; flushing when chunk_size is 0 would
        # emit a chunk that is empty or contains nothing but the always-included properties.
        if chunk_size and chunk_size + property_field_size > self.property_limit:
            yield current_chunk
            current_chunk = list(always_include_properties) if always_include_properties else []
            chunk_size = 0
        current_chunk.append(property_field)
        chunk_size += property_field_size
    yield current_chunk