airbyte_cdk.sources.declarative.requesters.query_properties

 1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
 2
 3from airbyte_cdk.sources.declarative.requesters.query_properties.properties_from_endpoint import (
 4    PropertiesFromEndpoint,
 5)
 6from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import (
 7    PropertyChunking,
 8)
 9from airbyte_cdk.sources.declarative.requesters.query_properties.query_properties import (
10    QueryProperties,
11)
12
13__all__ = ["PropertiesFromEndpoint", "PropertyChunking", "QueryProperties"]
@dataclass
class PropertiesFromEndpoint:
@dataclass
class PropertiesFromEndpoint:
    """
    Component that dynamically retrieves a set of request properties from an API endpoint so that they can be
    injected into the requests used to extract records from an API source.

    Each record read from `retriever` contributes one property: the value found at `property_field_path` inside
    that record, converted to a string. The resulting list is built on the first call and reused afterwards.
    """

    property_field_path: List[str]
    retriever: Retriever
    config: Config
    parameters: InitVar[Mapping[str, Any]]

    _cached_properties: Optional[List[str]] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Wrap every path segment so it can be evaluated against the config when a property is extracted.
        interpolated_path = []
        for path_segment in self.property_field_path:
            interpolated_path.append(InterpolatedString(string=path_segment, parameters=parameters))
        self._property_field_path = interpolated_path

    def get_properties_from_endpoint(self) -> List[str]:
        """
        Return the properties extracted from the records of the retriever.

        The retriever is only read when no cached result exists yet; later calls return the cached list.
        """
        if self._cached_properties is not None:
            return self._cached_properties

        records = self.retriever.read_records(records_schema={}, stream_slice=None)
        # SimpleRetriever and AsyncRetriever only returns Record. Should we change the return type of Retriever.read_records?
        self._cached_properties = [self._get_property(record) for record in records]  # type: ignore
        return self._cached_properties

    def _get_property(self, property_obj: Mapping[str, Any]) -> str:
        """Return the value located at the configured field path of `property_obj`, converted to a string."""
        resolved_path = []
        for path_segment in self._property_field_path:
            if isinstance(path_segment, str):
                resolved_path.append(path_segment)
            else:
                resolved_path.append(path_segment.eval(self.config))
        # NOTE(review): when the path is absent the `[]` default is returned and stringified to "[]" - confirm intended
        return str(dpath.get(property_obj, resolved_path, default=[]))  # type: ignore # extracted will be a MutableMapping, given input data structure

Component that defines the behavior around how to dynamically retrieve a set of request properties from an API endpoint. The set retrieved can then be injected into the requests to extract records from an API source.

PropertiesFromEndpoint( property_field_path: List[str], retriever: airbyte_cdk.sources.declarative.retrievers.Retriever, config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], _cached_properties: Optional[List[str]] = None)
property_field_path: List[str]
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def get_properties_from_endpoint(self) -> List[str]:
34    def get_properties_from_endpoint(self) -> List[str]:
35        if self._cached_properties is None:
36            self._cached_properties = list(
37                map(
38                    self._get_property,  # type: ignore # SimpleRetriever and AsyncRetriever only returns Record. Should we change the return type of Retriever.read_records?
39                    self.retriever.read_records(records_schema={}, stream_slice=None),
40                )
41            )
42        return self._cached_properties
@dataclass
class PropertyChunking:
@dataclass
class PropertyChunking:
    """
    Defines the behavior for how the complete list of properties to query for is broken down into smaller groups
    that will be used for multiple requests to the target API.
    """

    property_limit_type: PropertyLimitType
    property_limit: Optional[int]
    record_merge_strategy: Optional[RecordMergeStrategy]
    parameters: InitVar[Mapping[str, Any]]
    config: Config

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Fall back to grouping records by their "id" key when no merge strategy is supplied.
        self._record_merge_strategy = self.record_merge_strategy or GroupByKey(
            key="id", config=self.config, parameters=parameters
        )

    def get_request_property_chunks(
        self,
        property_fields: List[str],
        always_include_properties: Optional[List[str]],
        configured_properties: Optional[Set[str]],
    ) -> Iterable[List[str]]:
        """
        Split `property_fields` into the groups of properties that should each be sent in a separate request.

        :param property_fields: every property that could be queried
        :param always_include_properties: properties added to every chunk; they do not count toward the limit
        :param configured_properties: when not None, only properties contained in this set are queried
        :return: the chunks of properties; at least one chunk is always produced

        When `property_limit` is unset (or 0) a single chunk is produced containing the selected properties
        followed by `always_include_properties`. Otherwise each chunk starts with `always_include_properties`
        and is filled with properties until adding the next one would exceed `property_limit`.
        """
        # Apply the selection up front so that both the unlimited and the chunked path honor it.
        selected_fields = [
            property_field
            for property_field in property_fields
            if configured_properties is None or property_field in configured_properties
        ]

        if not self.property_limit:
            single_property_chunk = selected_fields
            if always_include_properties:
                single_property_chunk.extend(always_include_properties)
            yield single_property_chunk
            return

        current_chunk = list(always_include_properties) if always_include_properties else []
        chunk_size = 0
        for property_field in selected_fields:
            # If property_limit_type is not defined, we default to property_count which is just an incrementing count
            # todo: Add ability to specify parameter delimiter representation and take into account in property_field_size
            property_field_size = (
                len(property_field)
                + 3  # The +3 represents the extra characters for encoding the delimiter in between properties
                if self.property_limit_type == PropertyLimitType.characters
                else 1
            )
            # Only close a chunk that already holds a queried property: a single property that is larger than
            # the limit gets a chunk of its own instead of being preceded by a chunk without any queried property.
            if chunk_size and chunk_size + property_field_size > self.property_limit:
                yield current_chunk
                current_chunk = list(always_include_properties) if always_include_properties else []
                chunk_size = 0
            current_chunk.append(property_field)
            chunk_size += property_field_size
        yield current_chunk

    def get_merge_key(self, record: Record) -> Optional[str]:
        """Return the group key that the record merge strategy computes for `record`."""
        return self._record_merge_strategy.get_group_key(record=record)

Defines the behavior for how the complete list of properties to query for is broken down into smaller groups that will be used for multiple requests to the target API.

PropertyChunking( property_limit_type: airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking.PropertyLimitType, property_limit: Optional[int], record_merge_strategy: Optional[airbyte_cdk.sources.declarative.requesters.query_properties.strategies.RecordMergeStrategy], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], config: Mapping[str, Any])
property_limit: Optional[int]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
config: Mapping[str, Any]
def get_request_property_chunks( self, property_fields: List[str], always_include_properties: Optional[List[str]], configured_properties: Optional[Set[str]]) -> Iterable[List[str]]:
43    def get_request_property_chunks(
44        self,
45        property_fields: List[str],
46        always_include_properties: Optional[List[str]],
47        configured_properties: Optional[Set[str]],
48    ) -> Iterable[List[str]]:
49        if not self.property_limit:
50            single_property_chunk = list(property_fields)
51            if always_include_properties:
52                single_property_chunk.extend(always_include_properties)
53            yield single_property_chunk
54            return
55        current_chunk = list(always_include_properties) if always_include_properties else []
56        chunk_size = 0
57        for property_field in property_fields:
58            # If property_limit_type is not defined, we default to property_count which is just an incrementing count
59            # todo: Add ability to specify parameter delimiter representation and take into account in property_field_size
60            if configured_properties is not None and property_field not in configured_properties:
61                continue
62            property_field_size = (
63                len(property_field)
64                + 3  # The +3 represents the extra characters for encoding the delimiter in between properties
65                if self.property_limit_type == PropertyLimitType.characters
66                else 1
67            )
68            if chunk_size + property_field_size > self.property_limit:
69                yield current_chunk
70                current_chunk = list(always_include_properties) if always_include_properties else []
71                chunk_size = 0
72            current_chunk.append(property_field)
73            chunk_size += property_field_size
74        yield current_chunk
def get_merge_key(self, record: airbyte_cdk.Record) -> Optional[str]:
76    def get_merge_key(self, record: Record) -> Optional[str]:
77        return self._record_merge_strategy.get_group_key(record=record)
@dataclass
class QueryProperties:
@dataclass
class QueryProperties:
    """
    Low-code component that encompasses the behavior to inject additional property values into the outbound API
    requests. Property values can be defined statically within the manifest or dynamically by making requests
    to a partner API to retrieve the properties. Query properties also allow for splitting of the total set of
    properties into smaller chunks to satisfy API restrictions around the total amount of data retrieved.
    """

    property_list: Optional[Union[List[str], PropertiesFromEndpoint]]
    always_include_properties: Optional[List[str]]
    property_chunking: Optional[PropertyChunking]
    property_selector: Optional[PropertySelector]
    config: Config
    parameters: InitVar[Mapping[str, Any]]

    def get_request_property_chunks(self) -> Iterable[List[str]]:
        """
        Uses the defined property_list to fetch the total set of properties dynamically or from a static list
        and based on the resulting properties, performs property chunking if applicable.

        When no property_chunking is defined, a single chunk is yielded: always_include_properties (if any)
        followed by the properties, restricted to the ones returned by the property_selector when one is defined.
        """
        fields: List[str]
        configured_properties = self.property_selector.select() if self.property_selector else None

        if isinstance(self.property_list, PropertiesFromEndpoint):
            fields = self.property_list.get_properties_from_endpoint()
        else:
            fields = self.property_list if self.property_list else []

        if self.property_chunking:
            # Chunking and filtering by the configured properties are both delegated to the chunking component.
            yield from self.property_chunking.get_request_property_chunks(
                property_fields=fields,
                always_include_properties=self.always_include_properties,
                configured_properties=configured_properties,
            )
            return

        if configured_properties is not None:
            all_fields = [
                property_field for property_field in fields if property_field in configured_properties
            ]
        else:
            # Copy so that the caller never receives the underlying property list itself.
            all_fields = list(fields)

        if self.always_include_properties:
            all_fields = list(self.always_include_properties) + all_fields

        yield all_fields

Low-code component that encompasses the behavior to inject additional property values into the outbound API requests. Property values can be defined statically within the manifest or dynamically by making requests to a partner API to retrieve the properties. Query properties also allow for splitting of the total set of properties into smaller chunks to satisfy API restrictions around the total amount of data retrieved.

QueryProperties( property_list: Union[List[str], PropertiesFromEndpoint, NoneType], always_include_properties: Optional[List[str]], property_chunking: Optional[PropertyChunking], property_selector: Optional[airbyte_cdk.sources.declarative.requesters.query_properties.property_selector.PropertySelector], config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
property_list: Union[List[str], PropertiesFromEndpoint, NoneType]
always_include_properties: Optional[List[str]]
property_chunking: Optional[PropertyChunking]
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def get_request_property_chunks(self) -> Iterable[List[str]]:
34    def get_request_property_chunks(self) -> Iterable[List[str]]:
35        """
36        Uses the defined property_list to fetch the total set of properties dynamically or from a static list
37        and based on the resulting properties, performs property chunking if applicable.
38        """
39        fields: List[str]
40        configured_properties = self.property_selector.select() if self.property_selector else None
41
42        if isinstance(self.property_list, PropertiesFromEndpoint):
43            fields = self.property_list.get_properties_from_endpoint()
44        else:
45            fields = self.property_list if self.property_list else []
46
47        if self.property_chunking:
48            yield from self.property_chunking.get_request_property_chunks(
49                property_fields=fields,
50                always_include_properties=self.always_include_properties,
51                configured_properties=configured_properties,
52            )
53        else:
54            if configured_properties is not None:
55                all_fields = (
56                    [field for field in fields if field in configured_properties]
57                    if configured_properties is not None
58                    else list(fields)
59                )
60            else:
61                all_fields = list(fields)
62
63            if self.always_include_properties:
64                all_fields = list(self.always_include_properties) + all_fields
65
66            yield all_fields

Uses the defined property_list to fetch the total set of properties dynamically or from a static list and based on the resulting properties, performs property chunking if applicable.