airbyte_cdk.sources.declarative.resolvers
1# 2# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 3# 4 5from typing import Mapping 6 7from pydantic.v1 import BaseModel 8 9from airbyte_cdk.sources.declarative.models import ( 10 ConfigComponentsResolver as ConfigComponentsResolverModel, 11) 12from airbyte_cdk.sources.declarative.models import ( 13 HttpComponentsResolver as HttpComponentsResolverModel, 14) 15from airbyte_cdk.sources.declarative.resolvers.components_resolver import ( 16 ComponentMappingDefinition, 17 ComponentsResolver, 18 ResolvedComponentMappingDefinition, 19) 20from airbyte_cdk.sources.declarative.resolvers.config_components_resolver import ( 21 ConfigComponentsResolver, 22 StreamConfig, 23) 24from airbyte_cdk.sources.declarative.resolvers.http_components_resolver import ( 25 HttpComponentsResolver, 26) 27 28COMPONENTS_RESOLVER_TYPE_MAPPING: Mapping[str, type[BaseModel]] = { 29 "HttpComponentsResolver": HttpComponentsResolverModel, 30 "ConfigComponentsResolver": ConfigComponentsResolverModel, 31} 32 33__all__ = [ 34 "ComponentsResolver", 35 "HttpComponentsResolver", 36 "ComponentMappingDefinition", 37 "ResolvedComponentMappingDefinition", 38 "StreamConfig", 39 "ConfigComponentsResolver", 40 "COMPONENTS_RESOLVER_TYPE_MAPPING", 41]
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class ComponentsResolver(ABC):
    """
    Interface for resolvers that turn a stream template into resolved stream configurations.

    Subclasses must implement ``resolve_components``.
    """

    @abstractmethod
    def resolve_components(
        self, stream_template_config: Dict[str, Any]
    ) -> Iterable[Dict[str, Any]]:
        """
        Map and populate values into a stream template configuration.

        :param stream_template_config: The stream template with placeholders for components.
        :return: An iterable of resolved stream configs with populated values.
        """
        ...
Abstract base class for resolving components in a stream template.
@abstractmethod
def resolve_components(
    self, stream_template_config: Dict[str, Any]
) -> Iterable[Dict[str, Any]]:
    """
    Map and populate values into a stream template configuration.

    :param stream_template_config: The stream template with placeholders for components.
    :return: An iterable of resolved stream configs with populated values.
    """
    ...
Maps and populates values into a stream template configuration.
Parameters
- stream_template_config: The stream template with placeholders for components.

Yields: The resolved stream config with populated values.
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class HttpComponentsResolver(ComponentsResolver):
    """
    Resolves and populates stream templates with components fetched via an HTTP retriever.

    Attributes:
        retriever (Retriever): The retriever used to fetch data from an API.
        config (Config): Configuration object for the resolver.
        components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
        parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
    """

    retriever: Retriever
    config: Config
    components_mapping: List[ComponentMappingDefinition]
    parameters: InitVar[Mapping[str, Any]]
    _resolved_components: List[ResolvedComponentMappingDefinition] = field(
        init=False, repr=False, default_factory=list
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Initializes and parses component mappings, converting them to resolved definitions.

        Args:
            parameters (Mapping[str, Any]): Parameters for interpolation.

        Raises:
            ValueError: If a mapping's value is neither a string nor an InterpolatedString.
        """
        for component_mapping in self.components_mapping:
            if not isinstance(component_mapping.value, (str, InterpolatedString)):
                raise ValueError(
                    f"Expected a string or InterpolatedString for value in mapping: {component_mapping}"
                )

            # Plain strings are wrapped; values that are already interpolated are reused as-is.
            interpolated_value = (
                InterpolatedString.create(component_mapping.value, parameters=parameters)
                if isinstance(component_mapping.value, str)
                else component_mapping.value
            )

            field_path = [
                InterpolatedString.create(segment, parameters=parameters)
                for segment in component_mapping.field_path
            ]

            self._resolved_components.append(
                ResolvedComponentMappingDefinition(
                    field_path=field_path,
                    value=interpolated_value,
                    value_type=component_mapping.value_type,
                    # Carry the flag over so it is not silently dropped for this resolver.
                    create_or_update=component_mapping.create_or_update,
                    parameters=parameters,
                )
            )

    def resolve_components(
        self, stream_template_config: Dict[str, Any]
    ) -> Iterable[Dict[str, Any]]:
        """
        Resolves components in the stream template configuration by populating values.

        One configuration is yielded for every record the retriever returns, across all of
        its stream slices. Each yielded configuration is a deep copy of the template, so the
        template itself is never mutated.

        Args:
            stream_template_config (Dict[str, Any]): Stream template to populate.

        Yields:
            Dict[str, Any]: Updated configurations with resolved components.
        """
        # Interpolation context shared by every mapping; the per-record entries are
        # overwritten on each iteration.
        kwargs: Dict[str, Any] = {"stream_template_config": stream_template_config}

        for stream_slice in self.retriever.stream_slices():
            for components_values in self.retriever.read_records(
                records_schema={}, stream_slice=stream_slice
            ):
                updated_config = deepcopy(stream_template_config)
                kwargs["components_values"] = components_values
                kwargs["stream_slice"] = stream_slice

                for resolved_component in self._resolved_components:
                    valid_types = (
                        (resolved_component.value_type,) if resolved_component.value_type else None
                    )
                    value = resolved_component.value.eval(
                        self.config, valid_types=valid_types, **kwargs
                    )

                    path = [
                        segment.eval(self.config, **kwargs)
                        for segment in resolved_component.field_path
                    ]
                    updated = dpath.set(updated_config, path, value)

                    # Same rule as ConfigComponentsResolver: when nothing was updated and the
                    # mapping opted in, create the path. Falsy values are never created.
                    if value and not updated and resolved_component.create_or_update:
                        dpath.new(updated_config, path, value)

                yield updated_config
Resolves and populates stream templates with components fetched via an HTTP retriever.
Attributes:
- retriever (Retriever): The retriever used to fetch data from an API.
- config (Config): Configuration object for the resolver.
- components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
- parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
def resolve_components(
    self, stream_template_config: Dict[str, Any]
) -> Iterable[Dict[str, Any]]:
    """
    Populate the stream template once per record returned by the retriever.

    Every yielded configuration is a deep copy of the template with each resolved
    component mapping written into it.

    Args:
        stream_template_config (Dict[str, Any]): Stream template to populate.

    Yields:
        Dict[str, Any]: Updated configurations with resolved components.
    """
    # Interpolation context; the per-record entries are overwritten on each iteration.
    kwargs: Dict[str, Any] = {"stream_template_config": stream_template_config}

    for stream_slice in self.retriever.stream_slices():
        records = self.retriever.read_records(records_schema={}, stream_slice=stream_slice)
        for components_values in records:
            updated_config = deepcopy(stream_template_config)
            kwargs["components_values"] = components_values
            kwargs["stream_slice"] = stream_slice

            for resolved_component in self._resolved_components:
                if resolved_component.value_type:
                    valid_types = (resolved_component.value_type,)
                else:
                    valid_types = None

                value = resolved_component.value.eval(
                    self.config, valid_types=valid_types, **kwargs
                )
                target_path = [
                    segment.eval(self.config, **kwargs)
                    for segment in resolved_component.field_path
                ]
                dpath.set(updated_config, target_path, value)

            yield updated_config
Resolves components in the stream template configuration by populating values.
Arguments:
- stream_template_config (Dict[str, Any]): Stream template to populate.
Yields:
Dict[str, Any]: Updated configurations with resolved components.
@dataclass(frozen=True)
class ComponentMappingDefinition:
    """Defines the configuration for mapping a component in a stream.

    This class specifies what field in the stream template should be updated with a value,
    supporting dynamic interpolation and type enforcement.
    """

    # Path segments locating the target field inside the stream template.
    field_path: List["InterpolatedString"]
    # Value to write at `field_path`: a raw string or an already-built InterpolatedString.
    value: Union["InterpolatedString", str]
    # Optional type; the resolvers pass it as the only valid type when evaluating `value`.
    value_type: Optional[Type[Any]]
    # Init-only interpolation parameters (declared as InitVar, so not a regular field).
    parameters: InitVar[Mapping[str, Any]]
    # When truthy, ConfigComponentsResolver creates the path if the update changed nothing.
    create_or_update: Optional[bool] = False
Defines the configuration for mapping a component in a stream. This class specifies what field in the stream template should be updated with value, supporting dynamic interpolation and type enforcement.
@dataclass(frozen=True)
class ResolvedComponentMappingDefinition:
    """Defines the resolved configuration for mapping a component in a stream.

    This class specifies what field in the stream template should be updated with a value,
    supporting dynamic interpolation and type enforcement. Unlike ComponentMappingDefinition,
    `value` is always an InterpolatedString here.
    """

    # Path segments locating the target field inside the stream template.
    field_path: List["InterpolatedString"]
    # Interpolated value that the resolvers evaluate and write at `field_path`.
    value: "InterpolatedString"
    # Optional type; the resolvers pass it as the only valid type when evaluating `value`.
    value_type: Optional[Type[Any]]
    # Init-only interpolation parameters (declared as InitVar, so not a regular field).
    parameters: InitVar[Mapping[str, Any]]
    # When truthy, ConfigComponentsResolver creates the path if the update changed nothing.
    create_or_update: Optional[bool] = False
Defines resolved configuration for mapping a component in a stream. This class specifies what field in the stream template should be updated with value, supporting dynamic interpolation and type enforcement.
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class StreamConfig:
    """
    Identifies stream config details for dynamic schema extraction and processing.

    Attributes:
        configs_pointer: Path segments pointing at the stream configuration entries.
        parameters: Init-only parameters used when interpolating ``configs_pointer``.
        default_values: Optional extra entries; defaults to ``None``.
    """

    configs_pointer: List[Union[InterpolatedString, str]]
    parameters: InitVar[Mapping[str, Any]]
    default_values: Optional[List[Any]] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Pass every ``configs_pointer`` segment through ``InterpolatedString.create``."""
        interpolated_pointer = [
            InterpolatedString.create(segment, parameters=parameters)
            for segment in self.configs_pointer
        ]
        self.configs_pointer = interpolated_pointer
Identifies stream config details for dynamic schema extraction and processing.
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class ConfigComponentsResolver(ComponentsResolver):
    """
    Resolves and populates stream templates with components fetched via source config.

    Attributes:
        stream_configs (List[StreamConfig]): Descriptions of where the stream configurations are located in the source config.
        config (Config): Configuration object for the resolver.
        components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
        parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
    """

    stream_configs: List[StreamConfig]
    config: Config
    components_mapping: List[ComponentMappingDefinition]
    parameters: InitVar[Mapping[str, Any]]
    _resolved_components: List[ResolvedComponentMappingDefinition] = field(
        init=False, repr=False, default_factory=list
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Initializes and parses component mappings, converting them to resolved definitions.

        Args:
            parameters (Mapping[str, Any]): Parameters for interpolation.

        Raises:
            ValueError: If a mapping's value is neither a string nor an InterpolatedString.
        """
        for component_mapping in self.components_mapping:
            if not isinstance(component_mapping.value, (str, InterpolatedString)):
                raise ValueError(
                    f"Expected a string or InterpolatedString for value in mapping: {component_mapping}"
                )

            # Plain strings are wrapped; values that are already interpolated are reused as-is.
            interpolated_value = (
                InterpolatedString.create(component_mapping.value, parameters=parameters)
                if isinstance(component_mapping.value, str)
                else component_mapping.value
            )

            field_path = [
                InterpolatedString.create(segment, parameters=parameters)
                for segment in component_mapping.field_path
            ]

            self._resolved_components.append(
                ResolvedComponentMappingDefinition(
                    field_path=field_path,
                    value=interpolated_value,
                    value_type=component_mapping.value_type,
                    create_or_update=component_mapping.create_or_update,
                    parameters=parameters,
                )
            )

    @staticmethod
    def _merge_combination(combo: Iterable[Tuple[int, Any]]) -> Dict[str, Any]:
        """
        Collapse a combination of ``(idx, elem)`` pairs into one config dict.

        Dict elements are merged in order, so later keys override earlier ones. Any other
        element is stored as ``(idx, elem)`` under ``source_config_<position>``, unless that
        key is already present.
        """
        result: Dict[str, Any] = {}
        for config_index, (elem_index, elem) in enumerate(combo):
            if isinstance(elem, dict):
                result.update(elem)
            else:
                # keep non-dict values under an artificial name
                result.setdefault(f"source_config_{config_index}", (elem_index, elem))
        return result

    @property
    def _stream_config(self) -> List[Dict[str, Any]]:
        """
        Build every unique stream-configuration combination defined by
        each ``StreamConfig`` and any ``default_values``.

        The result is the cartesian product of the entries found for each ``StreamConfig``,
        with every combination merged into a single dict.
        """
        all_indexed_streams = []
        for stream_config in self.stream_configs:
            pointer = [
                node.eval(self.config) if not isinstance(node, str) else node
                for node in stream_config.configs_pointer
            ]
            stream_configs_raw = dpath.get(dict(self.config), pointer, default=[])
            # A single non-list entry is treated as a one-element list.
            stream_configs = (
                list(stream_configs_raw)
                if isinstance(stream_configs_raw, list)
                else [stream_configs_raw]
            )

            if stream_config.default_values:
                stream_configs.extend(stream_config.default_values)

            all_indexed_streams.append(list(enumerate(stream_configs)))
        return [
            self._merge_combination(combo)  # type: ignore[arg-type]
            for combo in product(*all_indexed_streams)
        ]

    def resolve_components(
        self, stream_template_config: Dict[str, Any]
    ) -> Iterable[Dict[str, Any]]:
        """
        Resolves components in the stream template configuration by populating values.

        One configuration is yielded per combination returned by ``_stream_config``. Each is
        a deep copy of the template, so the template itself is never mutated.

        Args:
            stream_template_config (Dict[str, Any]): Stream template to populate.

        Yields:
            Dict[str, Any]: Updated configurations with resolved components.
        """
        kwargs: Dict[str, Any] = {"stream_template_config": stream_template_config}

        for components_values in self._stream_config:
            updated_config = deepcopy(stream_template_config)
            kwargs["components_values"] = components_values

            for resolved_component in self._resolved_components:
                valid_types = (
                    (resolved_component.value_type,) if resolved_component.value_type else None
                )
                value = resolved_component.value.eval(
                    self.config, valid_types=valid_types, **kwargs
                )

                path = [
                    segment.eval(self.config, **kwargs)
                    for segment in resolved_component.field_path
                ]
                parsed_value = self._parse_yaml_if_possible(value)
                updated = dpath.set(updated_config, path, parsed_value)

                # Create the path only when nothing was updated and the mapping opted in.
                # Falsy parsed values are never created.
                if parsed_value and not updated and resolved_component.create_or_update:
                    dpath.new(updated_config, path, parsed_value)

            yield updated_config

    @staticmethod
    def _parse_yaml_if_possible(value: Any) -> Any:
        """
        Try to turn value into a Python object by YAML-parsing it.

        * If value is a `str` and can be parsed by `yaml.safe_load`,
          return the parsed result.
        * If parsing fails with any `yaml.YAMLError` (parser or scanner errors alike) –
          or value is not a string at all – return the original value unchanged.
        """
        if not isinstance(value, str):
            return value
        try:
            return yaml.safe_load(value)
        except yaml.YAMLError:
            # e.g. "{{ record[0] in ['cohortActiveUsers'] }}" or "a: b: c" are not valid YAML;
            # an unparsable string is kept verbatim rather than aborting resolution.
            return value
Resolves and populates stream templates with components fetched via source config.
Attributes:
- stream_configs (List[StreamConfig]): Descriptions of where the stream configurations are located in the source config.
- config (Config): Configuration object for the resolver.
- components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
- parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
def resolve_components(
    self, stream_template_config: Dict[str, Any]
) -> Iterable[Dict[str, Any]]:
    """
    Populate the stream template once per combination returned by ``_stream_config``.

    Every yielded configuration is a deep copy of the template with each resolved
    component mapping written into it.

    Args:
        stream_template_config (Dict[str, Any]): Stream template to populate.

    Yields:
        Dict[str, Any]: Updated configurations with resolved components.
    """
    # Interpolation context; "components_values" is overwritten on each iteration.
    kwargs: Dict[str, Any] = {"stream_template_config": stream_template_config}

    for components_values in self._stream_config:
        updated_config = deepcopy(stream_template_config)
        kwargs["components_values"] = components_values

        for resolved_component in self._resolved_components:
            if resolved_component.value_type:
                valid_types = (resolved_component.value_type,)
            else:
                valid_types = None

            value = resolved_component.value.eval(self.config, valid_types=valid_types, **kwargs)
            target_path = [
                segment.eval(self.config, **kwargs) for segment in resolved_component.field_path
            ]
            parsed_value = self._parse_yaml_if_possible(value)
            updated = dpath.set(updated_config, target_path, parsed_value)

            # Create the path only when nothing was updated and the mapping opted in.
            # Falsy parsed values are never created.
            if parsed_value and not updated and resolved_component.create_or_update:
                dpath.new(updated_config, target_path, parsed_value)

        yield updated_config
Resolves components in the stream template configuration by populating values.
Arguments:
- stream_template_config (Dict[str, Any]): Stream template to populate.
Yields:
Dict[str, Any]: Updated configurations with resolved components.