airbyte_cdk.sources.declarative.requesters.query_properties.property_selector

 1# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
 2
 3from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector.json_schema_property_selector import (
 4    JsonSchemaPropertySelector,
 5)
 6from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector.property_selector import (
 7    PropertySelector,
 8)
 9
10__all__ = ["JsonSchemaPropertySelector", "PropertySelector"]
16@dataclass
17class JsonSchemaPropertySelector(PropertySelector):
18    """
19    A class that contains a list of transformations to apply to properties.
20    """
21
22    config: Config
23    parameters: InitVar[Mapping[str, Any]]
24    # For other non-read operations, there is no configured catalog and therefore no schema selection
25    configured_stream: Optional[ConfiguredAirbyteStream] = None
26    properties_transformations: List[RecordTransformation] = field(default_factory=lambda: [])
27
28    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
29        self._parameters = parameters
30
31    def select(self) -> Optional[Set[str]]:
32        """
33        Returns the set of properties that have been selected for the configured stream. The intent being that
34        we should only query for selected properties not all since disabled properties are discarded.
35
36        When configured_stream is None, then there was no incoming catalog and all fields should be retrieved.
37        This is different from the empty set where the json_schema was empty and no schema fields were selected.
38        """
39
40        # For CHECK/DISCOVER operations, there is no catalog and therefore no configured stream or selected
41        # columns. In this case we return None which is interpreted by the QueryProperties component to not
42        # perform any filtering of schema properties and fetch all of them
43        if self.configured_stream is None:
44            return None
45
46        schema_properties = copy.deepcopy(
47            self.configured_stream.stream.json_schema.get("properties", {})
48        )
49        if self.properties_transformations:
50            for transformation in self.properties_transformations:
51                transformation.transform(
52                    record=schema_properties,
53                    config=self.config,
54                )
55        return set(schema_properties.keys())

A class that contains a list of transformations to apply to properties.

JsonSchemaPropertySelector( config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], configured_stream: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteStream] = None, properties_transformations: List[airbyte_cdk.RecordTransformation] = <factory>)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
configured_stream: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteStream] = None
properties_transformations: List[airbyte_cdk.RecordTransformation]
def select(self) -> Optional[Set[str]]:
31    def select(self) -> Optional[Set[str]]:
32        """
33        Returns the set of properties that have been selected for the configured stream. The intent being that
34        we should only query for selected properties not all since disabled properties are discarded.
35
36        When configured_stream is None, then there was no incoming catalog and all fields should be retrieved.
37        This is different from the empty set where the json_schema was empty and no schema fields were selected.
38        """
39
40        # For CHECK/DISCOVER operations, there is no catalog and therefore no configured stream or selected
41        # columns. In this case we return None which is interpreted by the QueryProperties component to not
42        # perform any filtering of schema properties and fetch all of them
43        if self.configured_stream is None:
44            return None
45
46        schema_properties = copy.deepcopy(
47            self.configured_stream.stream.json_schema.get("properties", {})
48        )
49        if self.properties_transformations:
50            for transformation in self.properties_transformations:
51                transformation.transform(
52                    record=schema_properties,
53                    config=self.config,
54                )
55        return set(schema_properties.keys())

Returns the set of properties that have been selected for the configured stream. The intent being that we should only query for selected properties not all since disabled properties are discarded.

When configured_stream is None, then there was no incoming catalog and all fields should be retrieved. This is different from the empty set where the json_schema was empty and no schema fields were selected.

@dataclass
class PropertySelector(abc.ABC):
 9@dataclass
10class PropertySelector(ABC):
11    """
12    Describes the interface for selecting and transforming properties from a configured stream's schema
13    to determine which properties should be queried from the API.
14    """
15
16    @abstractmethod
17    def select(self) -> Optional[Set[str]]:
18        """
19        Selects and returns the set of properties that should be queried from the API based on the
20        configured stream's schema and any applicable transformations.
21
22        Returns:
23            Set[str]: The set of property names to query
24        """
25        pass

Describes the interface for selecting and transforming properties from a configured stream's schema to determine which properties should be queried from the API.

@abstractmethod
def select(self) -> Optional[Set[str]]:
16    @abstractmethod
17    def select(self) -> Optional[Set[str]]:
18        """
19        Selects and returns the set of properties that should be queried from the API based on the
20        configured stream's schema and any applicable transformations.
21
22        Returns:
23            Set[str]: The set of property names to query
24        """
25        pass

Selects and returns the set of properties that should be queried from the API based on the configured stream's schema and any applicable transformations.

Returns:

Set[str]: The set of property names to query