airbyte_cdk.sources.declarative.requesters.query_properties
1# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 2 3from airbyte_cdk.sources.declarative.requesters.query_properties.properties_from_endpoint import ( 4 PropertiesFromEndpoint, 5) 6from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import ( 7 PropertyChunking, 8) 9from airbyte_cdk.sources.declarative.requesters.query_properties.query_properties import ( 10 QueryProperties, 11) 12 13__all__ = ["PropertiesFromEndpoint", "PropertyChunking", "QueryProperties"]
@dataclass
class PropertiesFromEndpoint:
    """
    Component that defines how to dynamically retrieve a set of request properties from an API endpoint.
    The retrieved set can then be injected into the requests that extract records from an API source.

    Attributes:
        property_field_path: Path segments (interpolated against ``config``) locating the property name
            inside each record returned by ``retriever``.
        retriever: Retriever used to read the records that describe the available properties.
        config: Connector config used to evaluate the interpolated ``property_field_path`` segments.
    """

    property_field_path: List[str]
    retriever: Retriever
    config: Config
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Wrap every path segment once so that interpolation can be evaluated later against config.
        self._property_field_path = [
            InterpolatedString(string=property_field, parameters=parameters)
            for property_field in self.property_field_path
        ]

    def get_properties_from_endpoint(self, stream_slice: Optional[StreamSlice]) -> Iterable[str]:
        """
        Read the property records from the endpoint and yield the property name found in each one.

        :param stream_slice: Slice forwarded to ``retriever.read_records``.
        :return: The value located at ``property_field_path`` in each record. Records without a value at
            that path are skipped rather than contributing a placeholder to the property list.
        """
        response_properties = self.retriever.read_records(
            records_schema={}, stream_slice=stream_slice
        )
        # The path only depends on config, so evaluate the interpolation once instead of once per record.
        path = [
            node.eval(self.config) if not isinstance(node, str) else node
            for node in self._property_field_path
        ]
        for property_obj in response_properties:
            property_name = dpath.get(property_obj, path, default=None)  # type: ignore # extracted will be a MutableMapping, given input data structure
            if property_name is None:
                # A missing value is not a property name; emitting a placeholder (e.g. an empty list)
                # would inject a non-string entry into the requested properties.
                continue
            yield property_name
Component that defines how to dynamically retrieve a set of request properties from an API endpoint. The retrieved set can then be injected into the requests that extract records from an API source.
def get_properties_from_endpoint(self, stream_slice: Optional[StreamSlice]) -> Iterable[str]:
    """
    Read the property records from the endpoint and yield the property name found in each one.

    :param stream_slice: Slice forwarded to ``retriever.read_records``.
    :return: The value located at ``property_field_path`` in each record. Records without a value at
        that path are skipped rather than contributing a placeholder to the property list.
    """
    response_properties = self.retriever.read_records(
        records_schema={}, stream_slice=stream_slice
    )
    # The path only depends on config, so evaluate the interpolation once instead of once per record.
    path = [
        node.eval(self.config) if not isinstance(node, str) else node
        for node in self._property_field_path
    ]
    for property_obj in response_properties:
        property_name = dpath.get(property_obj, path, default=None)  # type: ignore # extracted will be a MutableMapping, given input data structure
        if property_name is None:
            # A missing value is not a property name; emitting a placeholder (e.g. an empty list)
            # would inject a non-string entry into the requested properties.
            continue
        yield property_name
@dataclass
class PropertyChunking:
    """
    Defines how the complete list of properties to query is broken down into smaller groups
    that are sent to the target API in separate requests.
    """

    property_limit_type: PropertyLimitType
    property_limit: Optional[int]
    record_merge_strategy: Optional[RecordMergeStrategy]
    parameters: InitVar[Mapping[str, Any]]
    config: Config

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Default to grouping records by their "id" field when no merge strategy is configured.
        self._record_merge_strategy = self.record_merge_strategy or GroupByKey(
            key="id", config=self.config, parameters=parameters
        )

    def get_request_property_chunks(
        self, property_fields: Iterable[str], always_include_properties: Optional[List[str]]
    ) -> Iterable[List[str]]:
        """
        Split ``property_fields`` into the groups of properties to request together.

        When ``property_limit`` is unset (or 0), a single chunk is produced containing every property
        followed by ``always_include_properties``. Otherwise each chunk starts with
        ``always_include_properties`` and is filled with properties until adding the next one would exceed
        ``property_limit``. A property's size is its length plus 3 (for the encoded delimiter) when
        ``property_limit_type`` is ``characters``, and 1 otherwise. The always-included properties do not
        count towards the limit. A property that is larger than the limit on its own is placed in a chunk by
        itself. If ``property_fields`` is empty, one chunk holding only the always-included properties
        (possibly empty) is produced.

        :param property_fields: The properties to split up.
        :param always_include_properties: Properties added to every chunk.
        :return: The chunks, in order.
        """
        if not self.property_limit:
            single_property_chunk = list(property_fields)
            if always_include_properties:
                single_property_chunk.extend(always_include_properties)
            yield single_property_chunk
            return
        current_chunk = list(always_include_properties) if always_include_properties else []
        chunk_size = 0
        for property_field in property_fields:
            # If property_limit_type is not defined, we default to property_count which is just an incrementing count
            # todo: Add ability to specify parameter delimiter representation and take into account in property_field_size
            property_field_size = (
                len(property_field)
                + 3  # The +3 represents the extra characters for encoding the delimiter in between properties
                if self.property_limit_type == PropertyLimitType.characters
                else 1
            )
            # Only flush once the chunk holds at least one property. Otherwise a first property that alone
            # exceeds the limit would produce a chunk with no requested properties (only the always-included
            # ones), costing an extra, useless request.
            if chunk_size > 0 and chunk_size + property_field_size > self.property_limit:
                yield current_chunk
                current_chunk = list(always_include_properties) if always_include_properties else []
                chunk_size = 0
            current_chunk.append(property_field)
            chunk_size += property_field_size
        yield current_chunk

    def get_merge_key(self, record: Record) -> Optional[str]:
        """Return the group key that the configured record merge strategy assigns to ``record``."""
        return self._record_merge_strategy.get_group_key(record=record)
Defines how the complete list of properties to query is broken down into smaller groups that are sent to the target API in separate requests.
def get_request_property_chunks(
    self, property_fields: Iterable[str], always_include_properties: Optional[List[str]]
) -> Iterable[List[str]]:
    """
    Split ``property_fields`` into the groups of properties to request together.

    When ``property_limit`` is unset (or 0), a single chunk is produced containing every property
    followed by ``always_include_properties``. Otherwise each chunk starts with
    ``always_include_properties`` and is filled with properties until adding the next one would exceed
    ``property_limit``. A property's size is its length plus 3 (for the encoded delimiter) when
    ``property_limit_type`` is ``characters``, and 1 otherwise. The always-included properties do not
    count towards the limit. A property that is larger than the limit on its own is placed in a chunk by
    itself.

    :param property_fields: The properties to split up.
    :param always_include_properties: Properties added to every chunk.
    :return: The chunks, in order.
    """
    if not self.property_limit:
        single_property_chunk = list(property_fields)
        if always_include_properties:
            single_property_chunk.extend(always_include_properties)
        yield single_property_chunk
        return
    current_chunk = list(always_include_properties) if always_include_properties else []
    chunk_size = 0
    for property_field in property_fields:
        # If property_limit_type is not defined, we default to property_count which is just an incrementing count
        # todo: Add ability to specify parameter delimiter representation and take into account in property_field_size
        property_field_size = (
            len(property_field)
            + 3  # The +3 represents the extra characters for encoding the delimiter in between properties
            if self.property_limit_type == PropertyLimitType.characters
            else 1
        )
        # Only flush once the chunk holds at least one property. Otherwise a first property that alone
        # exceeds the limit would produce a chunk with no requested properties (only the always-included
        # ones), costing an extra, useless request.
        if chunk_size > 0 and chunk_size + property_field_size > self.property_limit:
            yield current_chunk
            current_chunk = list(always_include_properties) if always_include_properties else []
            chunk_size = 0
        current_chunk.append(property_field)
        chunk_size += property_field_size
    yield current_chunk
@dataclass
class QueryProperties:
    """
    Low-code component that injects additional property values into outbound API requests.

    The properties come either from a static list defined in the manifest or from a
    ``PropertiesFromEndpoint`` that fetches them from a partner API. When ``property_chunking`` is
    configured, the full set is split into smaller chunks to satisfy API restrictions on the total amount
    of data retrieved.
    """

    property_list: Optional[Union[List[str], PropertiesFromEndpoint]]
    always_include_properties: Optional[List[str]]
    property_chunking: Optional[PropertyChunking]
    config: Config
    parameters: InitVar[Mapping[str, Any]]

    def get_request_property_chunks(
        self, stream_slice: Optional[StreamSlice] = None
    ) -> Iterable[List[str]]:
        """
        Resolve the full set of properties and yield the chunks that should be requested.

        Properties are fetched from the endpoint when ``property_list`` is a ``PropertiesFromEndpoint``,
        taken as-is when it is a list, and treated as empty when it is unset. With ``property_chunking``
        configured, chunking (including ``always_include_properties``) is delegated to it; otherwise exactly
        one chunk holding every property is yielded and ``always_include_properties`` is not added.

        :param stream_slice: The StreamSlice of the partition currently being synced. It is forwarded because
            subcomponents of QueryProperties can interpolate values from the top-level StreamSlice object.
        """
        source = self.property_list
        properties: Union[Iterable[str], List[str]]
        if isinstance(source, PropertiesFromEndpoint):
            properties = source.get_properties_from_endpoint(stream_slice=stream_slice)
        else:
            properties = source or []

        chunker = self.property_chunking
        if not chunker:
            # No chunking strategy: every property goes into a single chunk.
            yield list(properties)
            return

        yield from chunker.get_request_property_chunks(
            property_fields=properties, always_include_properties=self.always_include_properties
        )

    def has_multiple_chunks(self, stream_slice: Optional[StreamSlice]) -> bool:
        """
        Return whether ``get_request_property_chunks`` yields at least two chunks for ``stream_slice``.

        Only the first two chunks are pulled. This still re-resolves the property list, which reads from
        the endpoint's retriever when ``property_list`` is a ``PropertiesFromEndpoint``.
        """
        # NOTE: the original author marked this method for later deletion; avoid adding new callers.
        exhausted = object()
        chunk_iter = iter(self.get_request_property_chunks(stream_slice=stream_slice))
        return next(chunk_iter, exhausted) is not exhausted and next(chunk_iter, exhausted) is not exhausted
Low-code component that injects additional property values into outbound API requests. Property values can be defined statically within the manifest or retrieved dynamically by making requests to a partner API. Query properties also allow the total set of properties to be split into smaller chunks to satisfy API restrictions on the total amount of data retrieved.
def get_request_property_chunks(
    self, stream_slice: Optional[StreamSlice] = None
) -> Iterable[List[str]]:
    """
    Resolve the full set of properties and yield the chunks that should be requested.

    Properties are fetched from the endpoint when ``property_list`` is a ``PropertiesFromEndpoint``,
    taken as-is when it is a list, and treated as empty when it is unset. With ``property_chunking``
    configured, chunking (including ``always_include_properties``) is delegated to it; otherwise exactly
    one chunk holding every property is yielded and ``always_include_properties`` is not added.

    :param stream_slice: The StreamSlice of the partition currently being synced. It is forwarded because
        subcomponents of QueryProperties can interpolate values from the top-level StreamSlice object.
    """
    source = self.property_list
    properties: Union[Iterable[str], List[str]]
    if isinstance(source, PropertiesFromEndpoint):
        properties = source.get_properties_from_endpoint(stream_slice=stream_slice)
    else:
        properties = source or []

    chunker = self.property_chunking
    if not chunker:
        # No chunking strategy: every property goes into a single chunk.
        yield list(properties)
        return

    yield from chunker.get_request_property_chunks(
        property_fields=properties, always_include_properties=self.always_include_properties
    )
Uses the defined property_list to obtain the total set of properties, either dynamically or from a static list, and, based on the resulting properties, performs property chunking if applicable.
Parameters
- stream_slice: The StreamSlice of the current partition being processed during the sync. It is included because subcomponents of QueryProperties can interpolate values from the top-level StreamSlice object.