airbyte_cdk.sources.declarative.requesters.query_properties

 1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
 2
 3from airbyte_cdk.sources.declarative.requesters.query_properties.properties_from_endpoint import (
 4    PropertiesFromEndpoint,
 5)
 6from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import (
 7    PropertyChunking,
 8)
 9from airbyte_cdk.sources.declarative.requesters.query_properties.query_properties import (
10    QueryProperties,
11)
12
13__all__ = ["PropertiesFromEndpoint", "PropertyChunking", "QueryProperties"]
@dataclass
class PropertiesFromEndpoint:
@dataclass
class PropertiesFromEndpoint:
    """
    Component that defines the behavior around how to dynamically retrieve a set of request properties from an
    API endpoint. The set retrieved can then be injected into the requests to extract records from an API source.

    Attributes:
        property_field_path: Path segments locating the property value inside each record returned by the
            retriever. Every segment is wrapped in an InterpolatedString at construction time.
        retriever: Retriever whose ``read_records`` output is scanned for property values.
        config: Connector configuration handed to each path segment when it is evaluated.
        parameters: Init-only value forwarded to every InterpolatedString built from the path segments.
    """

    property_field_path: List[str]
    retriever: Retriever
    config: Config
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Wrap each raw path segment so it can be evaluated against the config when properties are read.
        self._property_field_path = [
            InterpolatedString(string=property_field, parameters=parameters)
            for property_field in self.property_field_path
        ]

    def get_properties_from_endpoint(self, stream_slice: Optional[StreamSlice]) -> Iterable[str]:
        """
        Reads records through the retriever and yields the value found at the property field path of each one.

        :param stream_slice: The slice forwarded unchanged to ``retriever.read_records``
        :return: One value per record read; ``[]`` (the ``dpath.get`` default) is yielded for a record
            that does not contain the path
        """
        response_properties = self.retriever.read_records(
            records_schema={}, stream_slice=stream_slice
        )
        # Each segment is evaluated with the config only, so the resolved path is the same for every
        # record: resolve it once instead of once per record.
        path = [
            node.eval(self.config) if not isinstance(node, str) else node
            for node in self._property_field_path
        ]
        for property_obj in response_properties:
            yield dpath.get(property_obj, path, default=[])  # type: ignore # extracted will be a MutableMapping, given input data structure

Component that defines the behavior around how to dynamically retrieve a set of request properties from an API endpoint. The set retrieved can then be injected into the requests to extract records from an API source.

PropertiesFromEndpoint( property_field_path: List[str], retriever: airbyte_cdk.sources.declarative.retrievers.Retriever, config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
property_field_path: List[str]
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def get_properties_from_endpoint( self, stream_slice: Optional[airbyte_cdk.StreamSlice]) -> Iterable[str]:
def get_properties_from_endpoint(self, stream_slice: Optional[StreamSlice]) -> Iterable[str]:
    """
    Reads records through the retriever and yields the value found at the property field path of each one.

    :param stream_slice: The slice forwarded unchanged to ``retriever.read_records``
    :return: One value per record read; ``[]`` (the ``dpath.get`` default) is yielded for a record
        that does not contain the path
    """
    response_properties = self.retriever.read_records(
        records_schema={}, stream_slice=stream_slice
    )
    # Each segment is evaluated with the config only, so the resolved path is the same for every
    # record: resolve it once instead of once per record.
    path = [
        node.eval(self.config) if not isinstance(node, str) else node
        for node in self._property_field_path
    ]
    for property_obj in response_properties:
        yield dpath.get(property_obj, path, default=[])  # type: ignore # extracted will be a MutableMapping, given input data structure
@dataclass
class PropertyChunking:
@dataclass
class PropertyChunking:
    """
    Defines the behavior for how the complete list of properties to query for is broken down into smaller groups
    that will be used for multiple requests to the target API.

    Attributes:
        property_limit_type: Selects how a property is sized: ``PropertyLimitType.characters`` counts
            characters, anything else counts each property as 1.
        property_limit: Maximum size of a chunk. A falsy value (``None`` or ``0``) disables chunking.
        record_merge_strategy: Strategy used by ``get_merge_key``; defaults to ``GroupByKey`` on ``"id"``.
        parameters: Init-only value forwarded to the default ``GroupByKey``.
        config: Connector configuration forwarded to the default ``GroupByKey``.
    """

    property_limit_type: PropertyLimitType
    property_limit: Optional[int]
    record_merge_strategy: Optional[RecordMergeStrategy]
    parameters: InitVar[Mapping[str, Any]]
    config: Config

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Fall back to grouping on the "id" key when no merge strategy was supplied.
        self._record_merge_strategy = self.record_merge_strategy or GroupByKey(
            key="id", config=self.config, parameters=parameters
        )

    def get_request_property_chunks(
        self, property_fields: Iterable[str], always_include_properties: Optional[List[str]]
    ) -> Iterable[List[str]]:
        """
        Splits the properties into chunks whose size does not exceed ``property_limit``.

        Without a limit, a single chunk holding every property followed by the always-included properties
        is yielded. With a limit, every chunk starts with the always-included properties, which do not
        count toward the chunk size. A single property larger than the limit is emitted in a chunk of
        its own rather than being preceded by a chunk that carries no requested property.

        :param property_fields: The properties to distribute across chunks
        :param always_include_properties: Properties placed in every chunk, if any
        :return: The chunks of properties, in input order
        """
        if not self.property_limit:
            single_property_chunk = list(property_fields)
            if always_include_properties:
                single_property_chunk.extend(always_include_properties)
            yield single_property_chunk
            return
        current_chunk = list(always_include_properties) if always_include_properties else []
        chunk_size = 0
        for property_field in property_fields:
            # If property_limit_type is not defined, we default to property_count which is just an incrementing count
            # todo: Add ability to specify parameter delimiter representation and take into account in property_field_size
            property_field_size = (
                len(property_field)
                + 3  # The +3 represents the extra characters for encoding the delimiter in between properties
                if self.property_limit_type == PropertyLimitType.characters
                else 1
            )
            # chunk_size is only non-zero once a property has been added to the current chunk. Skipping
            # the flush while it is zero avoids yielding a chunk with no requested property when a
            # single property is larger than the limit.
            if chunk_size and chunk_size + property_field_size > self.property_limit:
                yield current_chunk
                current_chunk = list(always_include_properties) if always_include_properties else []
                chunk_size = 0
            current_chunk.append(property_field)
            chunk_size += property_field_size
        yield current_chunk

    def get_merge_key(self, record: Record) -> Optional[str]:
        """Returns the group key the configured merge strategy computes for the record."""
        return self._record_merge_strategy.get_group_key(record=record)

Defines the behavior for how the complete list of properties to query for is broken down into smaller groups that will be used for multiple requests to the target API.

PropertyChunking( property_limit_type: airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking.PropertyLimitType, property_limit: Optional[int], record_merge_strategy: Optional[airbyte_cdk.sources.declarative.requesters.query_properties.strategies.RecordMergeStrategy], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], config: Mapping[str, Any])
property_limit: Optional[int]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
config: Mapping[str, Any]
def get_request_property_chunks( self, property_fields: Iterable[str], always_include_properties: Optional[List[str]]) -> Iterable[List[str]]:
def get_request_property_chunks(
    self, property_fields: Iterable[str], always_include_properties: Optional[List[str]]
) -> Iterable[List[str]]:
    """
    Splits the properties into chunks whose size does not exceed ``property_limit``.

    Without a limit, a single chunk holding every property followed by the always-included properties
    is yielded. With a limit, every chunk starts with the always-included properties, which do not
    count toward the chunk size. A single property larger than the limit is emitted in a chunk of
    its own rather than being preceded by a chunk that carries no requested property.

    :param property_fields: The properties to distribute across chunks
    :param always_include_properties: Properties placed in every chunk, if any
    :return: The chunks of properties, in input order
    """
    if not self.property_limit:
        single_property_chunk = list(property_fields)
        if always_include_properties:
            single_property_chunk.extend(always_include_properties)
        yield single_property_chunk
        return
    current_chunk = list(always_include_properties) if always_include_properties else []
    chunk_size = 0
    for property_field in property_fields:
        # If property_limit_type is not defined, we default to property_count which is just an incrementing count
        # todo: Add ability to specify parameter delimiter representation and take into account in property_field_size
        property_field_size = (
            len(property_field)
            + 3  # The +3 represents the extra characters for encoding the delimiter in between properties
            if self.property_limit_type == PropertyLimitType.characters
            else 1
        )
        # chunk_size is only non-zero once a property has been added to the current chunk. Skipping
        # the flush while it is zero avoids yielding a chunk with no requested property when a
        # single property is larger than the limit.
        if chunk_size and chunk_size + property_field_size > self.property_limit:
            yield current_chunk
            current_chunk = list(always_include_properties) if always_include_properties else []
            chunk_size = 0
        current_chunk.append(property_field)
        chunk_size += property_field_size
    yield current_chunk
def get_merge_key(self, record: airbyte_cdk.Record) -> Optional[str]:
def get_merge_key(self, record: Record) -> Optional[str]:
    """Returns the group key the configured merge strategy computes for the record."""
    merge_strategy = self._record_merge_strategy
    return merge_strategy.get_group_key(record=record)
@dataclass
class QueryProperties:
@dataclass
class QueryProperties:
    """
    Low-code component responsible for injecting additional property values into outbound API requests.
    The properties are either listed statically in the manifest or fetched dynamically from a partner API
    endpoint. The full set of properties can optionally be split into smaller chunks to satisfy API
    restrictions around the total amount of data retrieved.
    """

    property_list: Optional[Union[List[str], PropertiesFromEndpoint]]
    always_include_properties: Optional[List[str]]
    property_chunking: Optional[PropertyChunking]
    config: Config
    parameters: InitVar[Mapping[str, Any]]

    def get_request_property_chunks(
        self, stream_slice: Optional[StreamSlice] = None
    ) -> Iterable[List[str]]:
        """
        Resolves the complete set of properties, either from the static list or from the endpoint, and
        yields it as one chunk or, when property chunking is configured, as the chunks it produces.
        :param stream_slice: The StreamSlice of the current partition being processed during the sync. This is included
        because subcomponents of QueryProperties can make use of interpolation of the top-level StreamSlice object
        """
        resolved_fields: Union[Iterable[str], List[str]]
        if isinstance(self.property_list, PropertiesFromEndpoint):
            resolved_fields = self.property_list.get_properties_from_endpoint(
                stream_slice=stream_slice
            )
        elif self.property_list:
            resolved_fields = self.property_list
        else:
            resolved_fields = []

        # Without chunking, everything goes out as a single chunk.
        if not self.property_chunking:
            yield list(resolved_fields)
            return

        yield from self.property_chunking.get_request_property_chunks(
            property_fields=resolved_fields,
            always_include_properties=self.always_include_properties,
        )

    # delete later, but leaving this to keep the discussion thread on the PR from getting hidden
    def has_multiple_chunks(self, stream_slice: Optional[StreamSlice]) -> bool:
        """Returns True when at least two property chunks are produced for the slice."""
        chunk_iterator = iter(self.get_request_property_chunks(stream_slice=stream_slice))
        exhausted = object()
        # Pull at most two chunks; hitting the sentinel on either pull means fewer than two exist.
        if next(chunk_iterator, exhausted) is exhausted:
            return False
        return next(chunk_iterator, exhausted) is not exhausted

Low-code component that encompasses the behavior to inject additional property values into the outbound API requests. Property values can be defined statically within the manifest or dynamically by making requests to a partner API to retrieve the properties. Query properties also allow for splitting of the total set of properties into smaller chunks to satisfy API restrictions around the total amount of data retrieved.

QueryProperties( property_list: Union[List[str], PropertiesFromEndpoint, NoneType], always_include_properties: Optional[List[str]], property_chunking: Optional[PropertyChunking], config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
property_list: Union[List[str], PropertiesFromEndpoint, NoneType]
always_include_properties: Optional[List[str]]
property_chunking: Optional[PropertyChunking]
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def get_request_property_chunks( self, stream_slice: Optional[airbyte_cdk.StreamSlice] = None) -> Iterable[List[str]]:
def get_request_property_chunks(
    self, stream_slice: Optional[StreamSlice] = None
) -> Iterable[List[str]]:
    """
    Resolves the complete set of properties, either from the static list or from the endpoint, and
    yields it as one chunk or, when property chunking is configured, as the chunks it produces.
    :param stream_slice: The StreamSlice of the current partition being processed during the sync. This is included
    because subcomponents of QueryProperties can make use of interpolation of the top-level StreamSlice object
    """
    resolved_fields: Union[Iterable[str], List[str]]
    if isinstance(self.property_list, PropertiesFromEndpoint):
        resolved_fields = self.property_list.get_properties_from_endpoint(
            stream_slice=stream_slice
        )
    elif self.property_list:
        resolved_fields = self.property_list
    else:
        resolved_fields = []

    # Without chunking, everything goes out as a single chunk.
    if not self.property_chunking:
        yield list(resolved_fields)
        return

    yield from self.property_chunking.get_request_property_chunks(
        property_fields=resolved_fields,
        always_include_properties=self.always_include_properties,
    )

Uses the defined property_list to fetch the total set of properties dynamically or from a static list and based on the resulting properties, performs property chunking if applicable.

Parameters
  • stream_slice: The StreamSlice of the current partition being processed during the sync. This is included because subcomponents of QueryProperties can make use of interpolation of the top-level StreamSlice object
def has_multiple_chunks( self, stream_slice: Optional[airbyte_cdk.StreamSlice]) -> bool:
def has_multiple_chunks(self, stream_slice: Optional[StreamSlice]) -> bool:
    """Returns True when at least two property chunks are produced for the slice."""
    chunk_iterator = iter(self.get_request_property_chunks(stream_slice=stream_slice))
    exhausted = object()
    # Pull at most two chunks; hitting the sentinel on either pull means fewer than two exist.
    if next(chunk_iterator, exhausted) is exhausted:
        return False
    return next(chunk_iterator, exhausted) is not exhausted