airbyte_cdk.sources.declarative.requesters.query_properties
1# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 2 3from airbyte_cdk.sources.declarative.requesters.query_properties.properties_from_endpoint import ( 4 PropertiesFromEndpoint, 5) 6from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import ( 7 PropertyChunking, 8) 9from airbyte_cdk.sources.declarative.requesters.query_properties.query_properties import ( 10 QueryProperties, 11) 12 13__all__ = ["PropertiesFromEndpoint", "PropertyChunking", "QueryProperties"]
@dataclass
class PropertiesFromEndpoint:
    """
    Component describing how a set of request properties is fetched dynamically from an API endpoint.
    The fetched set can then be injected into the requests used to extract records from an API source.
    """

    property_field_path: List[str]
    retriever: Retriever
    config: Config
    parameters: InitVar[Mapping[str, Any]]

    # Populated on the first call to get_properties_from_endpoint() and reused afterwards.
    _cached_properties: Optional[List[str]] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Wrap every path segment in an InterpolatedString so it can be evaluated against
        # the config each time a property is extracted.
        interpolated_path = []
        for property_field in self.property_field_path:
            interpolated_path.append(
                InterpolatedString(string=property_field, parameters=parameters)
            )
        self._property_field_path = interpolated_path

    def get_properties_from_endpoint(self) -> List[str]:
        """
        Return the property names read through the retriever.

        The retriever is read only while no result is cached; the resulting list is stored
        on the instance and returned as-is on subsequent calls.
        """
        if self._cached_properties is not None:
            return self._cached_properties

        records = self.retriever.read_records(records_schema={}, stream_slice=None)
        self._cached_properties = [
            self._get_property(record)  # type: ignore # SimpleRetriever and AsyncRetriever only returns Record. Should we change the return type of Retriever.read_records?
            for record in records
        ]
        return self._cached_properties

    def _get_property(self, property_obj: Mapping[str, Any]) -> str:
        """
        Extract a single property value from one retrieved record.

        Each path segment is evaluated against the config (plain strings are used unchanged),
        the value at that path is looked up with an empty-list default, and the result is
        converted with str().
        """
        resolved_path = []
        for node in self._property_field_path:
            resolved_path.append(node if isinstance(node, str) else node.eval(self.config))
        return str(dpath.get(property_obj, resolved_path, default=[]))  # type: ignore # extracted will be a MutableMapping, given input data structure
Component that defines the behavior around how to dynamically retrieve a set of request properties from an API endpoint. The set retrieved can then be injected into the requests to extract records from an API source.
def get_properties_from_endpoint(self) -> List[str]:
    """
    Return the property names read through the retriever.

    The retriever is read only while no result is cached; the resulting list is stored
    in ``self._cached_properties`` and returned as-is on subsequent calls.
    """
    if self._cached_properties is not None:
        return self._cached_properties

    records = self.retriever.read_records(records_schema={}, stream_slice=None)
    self._cached_properties = [
        self._get_property(record)  # type: ignore # SimpleRetriever and AsyncRetriever only returns Record. Should we change the return type of Retriever.read_records?
        for record in records
    ]
    return self._cached_properties
@dataclass
class PropertyChunking:
    """
    Defines how the complete list of properties to query for is broken down into smaller groups
    that will be used for multiple requests to the target API.
    """

    property_limit_type: PropertyLimitType
    property_limit: Optional[int]
    record_merge_strategy: Optional[RecordMergeStrategy]
    parameters: InitVar[Mapping[str, Any]]
    config: Config

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Fall back to grouping on the "id" key when no merge strategy is supplied.
        self._record_merge_strategy = self.record_merge_strategy or GroupByKey(
            key="id", config=self.config, parameters=parameters
        )

    def get_request_property_chunks(
        self,
        property_fields: List[str],
        always_include_properties: Optional[List[str]],
        configured_properties: Optional[Set[str]],
    ) -> Iterable[List[str]]:
        """
        Yield the groups of properties to send, one group per request.

        :param property_fields: every property that could be requested.
        :param always_include_properties: properties added to every yielded chunk; they do not
            count towards the limit.
        :param configured_properties: when not None, only property fields contained in it are kept.
        :return: an iterable of chunks. Without a property_limit a single chunk is yielded
            (selected fields followed by always_include_properties). With a limit, each chunk
            starts with always_include_properties followed by as many selected fields as fit.
        """
        # Apply the selection filter once, up front, so that the unlimited branch honors
        # configured_properties exactly like the limited branch does.
        selected_fields = [
            property_field
            for property_field in property_fields
            if configured_properties is None or property_field in configured_properties
        ]

        if not self.property_limit:
            single_property_chunk = list(selected_fields)
            if always_include_properties:
                single_property_chunk.extend(always_include_properties)
            yield single_property_chunk
            return

        # If property_limit_type is not defined, we default to property_count which is just an incrementing count
        # todo: Add ability to specify parameter delimiter representation and take into account in property_field_size
        count_characters = self.property_limit_type == PropertyLimitType.characters

        current_chunk = list(always_include_properties) if always_include_properties else []
        chunk_size = 0
        for property_field in selected_fields:
            property_field_size = (
                len(property_field)
                + 3  # The +3 represents the extra characters for encoding the delimiter in between properties
                if count_characters
                else 1
            )
            # Only close a chunk that already holds at least one field: a single field larger
            # than the limit is sent on its own instead of being preceded by a chunk that
            # contains no requested property at all.
            if chunk_size > 0 and chunk_size + property_field_size > self.property_limit:
                yield current_chunk
                current_chunk = list(always_include_properties) if always_include_properties else []
                chunk_size = 0
            current_chunk.append(property_field)
            chunk_size += property_field_size
        yield current_chunk

    def get_merge_key(self, record: Record) -> Optional[str]:
        """Return the group key the merge strategy computes for the given record."""
        return self._record_merge_strategy.get_group_key(record=record)
Defines how the complete list of properties to query for is broken down into smaller groups that will be used for multiple requests to the target API.
def get_request_property_chunks(
    self,
    property_fields: List[str],
    always_include_properties: Optional[List[str]],
    configured_properties: Optional[Set[str]],
) -> Iterable[List[str]]:
    """
    Yield the groups of properties to send, one group per request.

    :param property_fields: every property that could be requested.
    :param always_include_properties: properties added to every yielded chunk; they do not
        count towards the limit.
    :param configured_properties: when not None, only property fields contained in it are kept.
    :return: an iterable of chunks. Without a property_limit a single chunk is yielded
        (selected fields followed by always_include_properties). With a limit, each chunk
        starts with always_include_properties followed by as many selected fields as fit.
    """
    # Apply the selection filter once, up front, so that the unlimited branch honors
    # configured_properties exactly like the limited branch does.
    selected_fields = [
        property_field
        for property_field in property_fields
        if configured_properties is None or property_field in configured_properties
    ]

    if not self.property_limit:
        single_property_chunk = list(selected_fields)
        if always_include_properties:
            single_property_chunk.extend(always_include_properties)
        yield single_property_chunk
        return

    # If property_limit_type is not defined, we default to property_count which is just an incrementing count
    # todo: Add ability to specify parameter delimiter representation and take into account in property_field_size
    count_characters = self.property_limit_type == PropertyLimitType.characters

    current_chunk = list(always_include_properties) if always_include_properties else []
    chunk_size = 0
    for property_field in selected_fields:
        property_field_size = (
            len(property_field)
            + 3  # The +3 represents the extra characters for encoding the delimiter in between properties
            if count_characters
            else 1
        )
        # Only close a chunk that already holds at least one field: a single field larger
        # than the limit is sent on its own instead of being preceded by a chunk that
        # contains no requested property at all.
        if chunk_size > 0 and chunk_size + property_field_size > self.property_limit:
            yield current_chunk
            current_chunk = list(always_include_properties) if always_include_properties else []
            chunk_size = 0
        current_chunk.append(property_field)
        chunk_size += property_field_size
    yield current_chunk
@dataclass
class QueryProperties:
    """
    Low-code component that encompasses the behavior to inject additional property values into the outbound API
    requests. Property values can be defined statically within the manifest or dynamically by making requests
    to a partner API to retrieve the properties. Query properties also allow for splitting of the total set of
    properties into smaller chunks to satisfy API restrictions around the total amount of data retrieved.
    """

    property_list: Optional[Union[List[str], PropertiesFromEndpoint]]
    always_include_properties: Optional[List[str]]
    property_chunking: Optional[PropertyChunking]
    property_selector: Optional[PropertySelector]
    config: Config
    parameters: InitVar[Mapping[str, Any]]

    def get_request_property_chunks(self) -> Iterable[List[str]]:
        """
        Uses the defined property_list to fetch the total set of properties dynamically or from a static list
        and, based on the resulting properties, performs property chunking if applicable.

        :return: an iterable of property chunks. When property_chunking is set, the chunks it produces are
            yielded. Otherwise a single chunk is yielded: always_include_properties (if any) followed by
            the fields, restricted to the selector's result when a property_selector returned one.
        """
        fields: List[str]
        # None means no selection is applied; otherwise it is used as a membership filter on the fields.
        configured_properties = self.property_selector.select() if self.property_selector else None

        if isinstance(self.property_list, PropertiesFromEndpoint):
            fields = self.property_list.get_properties_from_endpoint()
        else:
            fields = self.property_list if self.property_list else []

        if self.property_chunking:
            yield from self.property_chunking.get_request_property_chunks(
                property_fields=fields,
                always_include_properties=self.always_include_properties,
                configured_properties=configured_properties,
            )
            return

        # No chunking configured: build the single chunk here. Both branches copy so the
        # source list (possibly the cached endpoint result) is never handed out directly.
        if configured_properties is not None:
            all_fields = [field for field in fields if field in configured_properties]
        else:
            all_fields = list(fields)

        if self.always_include_properties:
            all_fields = list(self.always_include_properties) + all_fields

        yield all_fields
Low-code component that encompasses the behavior to inject additional property values into the outbound API requests. Property values can be defined statically within the manifest or dynamically by making requests to a partner API to retrieve the properties. Query properties also allow for splitting of the total set of properties into smaller chunks to satisfy API restrictions around the total amount of data retrieved.
def get_request_property_chunks(self) -> Iterable[List[str]]:
    """
    Uses the defined property_list to fetch the total set of properties dynamically or from a static list
    and, based on the resulting properties, performs property chunking if applicable.

    :return: an iterable of property chunks. When property_chunking is set, the chunks it produces are
        yielded. Otherwise a single chunk is yielded: always_include_properties (if any) followed by
        the fields, restricted to the selector's result when a property_selector returned one.
    """
    fields: List[str]
    # None means no selection is applied; otherwise it is used as a membership filter on the fields.
    configured_properties = self.property_selector.select() if self.property_selector else None

    if isinstance(self.property_list, PropertiesFromEndpoint):
        fields = self.property_list.get_properties_from_endpoint()
    else:
        fields = self.property_list if self.property_list else []

    if self.property_chunking:
        yield from self.property_chunking.get_request_property_chunks(
            property_fields=fields,
            always_include_properties=self.always_include_properties,
            configured_properties=configured_properties,
        )
        return

    # No chunking configured: build the single chunk here. Both branches copy so the
    # source list (possibly the cached endpoint result) is never handed out directly.
    if configured_properties is not None:
        all_fields = [field for field in fields if field in configured_properties]
    else:
        all_fields = list(fields)

    if self.always_include_properties:
        all_fields = list(self.always_include_properties) + all_fields

    yield all_fields
Uses the defined property_list to fetch the total set of properties dynamically or from a static list and, based on the resulting properties, performs property chunking if applicable.