airbyte_cdk.sources.declarative.requesters.query_properties.property_selector
1# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 2 3from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector.json_schema_property_selector import ( 4 JsonSchemaPropertySelector, 5) 6from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector.property_selector import ( 7 PropertySelector, 8) 9 10__all__ = ["JsonSchemaPropertySelector", "PropertySelector"]
16@dataclass 17class JsonSchemaPropertySelector(PropertySelector): 18 """ 19 A class that contains a list of transformations to apply to properties. 20 """ 21 22 config: Config 23 parameters: InitVar[Mapping[str, Any]] 24 # For other non-read operations, there is no configured catalog and therefore no schema selection 25 configured_stream: Optional[ConfiguredAirbyteStream] = None 26 properties_transformations: List[RecordTransformation] = field(default_factory=lambda: []) 27 28 def __post_init__(self, parameters: Mapping[str, Any]) -> None: 29 self._parameters = parameters 30 31 def select(self) -> Optional[Set[str]]: 32 """ 33 Returns the set of properties that have been selected for the configured stream. The intent being that 34 we should only query for selected properties not all since disabled properties are discarded. 35 36 When configured_stream is None, then there was no incoming catalog and all fields should be retrieved. 37 This is different from the empty set where the json_schema was empty and no schema fields were selected. 38 """ 39 40 # For CHECK/DISCOVER operations, there is no catalog and therefore no configured stream or selected 41 # columns. In this case we return None which is interpreted by the QueryProperties component to not 42 # perform any filtering of schema properties and fetch all of them 43 if self.configured_stream is None: 44 return None 45 46 schema_properties = copy.deepcopy( 47 self.configured_stream.stream.json_schema.get("properties", {}) 48 ) 49 if self.properties_transformations: 50 for transformation in self.properties_transformations: 51 transformation.transform( 52 record=schema_properties, 53 config=self.config, 54 ) 55 return set(schema_properties.keys())
A class that contains a list of transformations to apply to properties.
31 def select(self) -> Optional[Set[str]]: 32 """ 33 Returns the set of properties that have been selected for the configured stream. The intent being that 34 we should only query for selected properties not all since disabled properties are discarded. 35 36 When configured_stream is None, then there was no incoming catalog and all fields should be retrieved. 37 This is different from the empty set where the json_schema was empty and no schema fields were selected. 38 """ 39 40 # For CHECK/DISCOVER operations, there is no catalog and therefore no configured stream or selected 41 # columns. In this case we return None which is interpreted by the QueryProperties component to not 42 # perform any filtering of schema properties and fetch all of them 43 if self.configured_stream is None: 44 return None 45 46 schema_properties = copy.deepcopy( 47 self.configured_stream.stream.json_schema.get("properties", {}) 48 ) 49 if self.properties_transformations: 50 for transformation in self.properties_transformations: 51 transformation.transform( 52 record=schema_properties, 53 config=self.config, 54 ) 55 return set(schema_properties.keys())
Returns the set of properties that have been selected for the configured stream. The intent being that we should only query for selected properties not all since disabled properties are discarded.
When configured_stream is None, then there was no incoming catalog and all fields should be retrieved. This is different from the empty set where the json_schema was empty and no schema fields were selected.
9@dataclass 10class PropertySelector(ABC): 11 """ 12 Describes the interface for selecting and transforming properties from a configured stream's schema 13 to determine which properties should be queried from the API. 14 """ 15 16 @abstractmethod 17 def select(self) -> Optional[Set[str]]: 18 """ 19 Selects and returns the set of properties that should be queried from the API based on the 20 configured stream's schema and any applicable transformations. 21 22 Returns: 23 Set[str]: The set of property names to query 24 """ 25 pass
Describes the interface for selecting and transforming properties from a configured stream's schema to determine which properties should be queried from the API.
16 @abstractmethod 17 def select(self) -> Optional[Set[str]]: 18 """ 19 Selects and returns the set of properties that should be queried from the API based on the 20 configured stream's schema and any applicable transformations. 21 22 Returns: 23 Set[str]: The set of property names to query 24 """ 25 pass
Selects and returns the set of properties that should be queried from the API based on the configured stream's schema and any applicable transformations.
Returns:
Set[str]: The set of property names to query