airbyte_cdk.sources.declarative.resolvers
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

"""
Declarative components resolvers.

Re-exports the resolver classes together with their mapping/config helper types, and
defines ``COMPONENTS_RESOLVER_TYPE_MAPPING``, which maps each resolver name (string key)
to the pydantic model class describing it.
"""

from typing import Mapping

from pydantic.v1 import BaseModel

from airbyte_cdk.sources.declarative.models import (
    ConfigComponentsResolver as ConfigComponentsResolverModel,
    HttpComponentsResolver as HttpComponentsResolverModel,
    ParametrizedComponentsResolver as ParametrizedComponentsResolverModel,
)
from airbyte_cdk.sources.declarative.resolvers.components_resolver import (
    ComponentMappingDefinition,
    ComponentsResolver,
    ResolvedComponentMappingDefinition,
)
from airbyte_cdk.sources.declarative.resolvers.config_components_resolver import (
    ConfigComponentsResolver,
    StreamConfig,
)
from airbyte_cdk.sources.declarative.resolvers.http_components_resolver import (
    HttpComponentsResolver,
)
from airbyte_cdk.sources.declarative.resolvers.parametrized_components_resolver import (
    ParametrizedComponentsResolver,
    StreamParametersDefinition,
)

# Resolver name -> pydantic model class for that resolver (insertion order preserved).
COMPONENTS_RESOLVER_TYPE_MAPPING: Mapping[str, type[BaseModel]] = dict(
    HttpComponentsResolver=HttpComponentsResolverModel,
    ConfigComponentsResolver=ConfigComponentsResolverModel,
    ParametrizedComponentsResolver=ParametrizedComponentsResolverModel,
)

__all__ = [
    "ComponentsResolver",
    "HttpComponentsResolver",
    "ComponentMappingDefinition",
    "ResolvedComponentMappingDefinition",
    "StreamConfig",
    "ConfigComponentsResolver",
    "COMPONENTS_RESOLVER_TYPE_MAPPING",
    "ParametrizedComponentsResolver",
    "StreamParametersDefinition",
]
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class ComponentsResolver(ABC):
    """
    Interface for components that expand a stream template into resolved stream configs.

    Concrete resolvers must implement `resolve_components`.
    """

    @abstractmethod
    def resolve_components(
        self, stream_template_config: Dict[str, Any]
    ) -> Iterable[Dict[str, Any]]:
        """
        Maps and populates values into a stream template configuration.

        Args:
            stream_template_config (Dict[str, Any]): The stream template with placeholders
                for components.

        Yields:
            Dict[str, Any]: The resolved stream config with populated values.
        """
        ...
Abstract base class for resolving components in a stream template.
@abstractmethod
def resolve_components(
    self, stream_template_config: Dict[str, Any]
) -> Iterable[Dict[str, Any]]:
    """
    Maps and populates values into a stream template configuration.

    Args:
        stream_template_config (Dict[str, Any]): The stream template with placeholders
            for components.

    Yields:
        Dict[str, Any]: The resolved stream config with populated values.
    """
    ...
Maps and populates values into a stream template configuration.
Parameters
- stream_template_config: The stream template with placeholders for components.
Yields: The resolved stream config with populated values.
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class HttpComponentsResolver(ComponentsResolver):
    """
    Expands a stream template using component values read through an HTTP retriever.

    Attributes:
        retriever (Retriever): Supplies the stream slices and records used as component values.
        config (Config): Configuration object passed to every interpolation.
        components_mapping (List[ComponentMappingDefinition]): Mappings describing which
            template fields to populate and with what.
        parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
    """

    retriever: Retriever
    config: Config
    components_mapping: List[ComponentMappingDefinition]
    parameters: InitVar[Mapping[str, Any]]
    _resolved_components: List[ResolvedComponentMappingDefinition] = field(
        init=False, repr=False, default_factory=list
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Turn every raw mapping into a ResolvedComponentMappingDefinition.

        Args:
            parameters (Mapping[str, Any]): Parameters for interpolation.

        Raises:
            ValueError: If a mapping's value is neither a str nor an InterpolatedString.
        """
        for component_mapping in self.components_mapping:
            raw_value = component_mapping.value
            if not isinstance(raw_value, (str, InterpolatedString)):
                raise ValueError(
                    f"Expected a string or InterpolatedString for value in mapping: {component_mapping}"
                )

            # Plain strings are compiled; already-interpolated values are reused as-is.
            if isinstance(raw_value, str):
                resolved_value = InterpolatedString.create(raw_value, parameters=parameters)
            else:
                resolved_value = raw_value

            path_segments = [
                InterpolatedString.create(segment, parameters=parameters)
                for segment in component_mapping.field_path
            ]

            self._resolved_components.append(
                ResolvedComponentMappingDefinition(
                    field_path=path_segments,
                    value=resolved_value,
                    value_type=component_mapping.value_type,
                    parameters=parameters,
                )
            )

    def resolve_components(
        self, stream_template_config: Dict[str, Any]
    ) -> Iterable[Dict[str, Any]]:
        """
        Yield one populated copy of the template per record read from the retriever.

        During interpolation the current record is available as ``components_values``, its
        slice as ``stream_slice``, and the untouched template as ``stream_template_config``.

        Args:
            stream_template_config (Dict[str, Any]): Stream template to populate.

        Yields:
            Dict[str, Any]: Updated configurations with resolved components.
        """
        kwargs = {"stream_template_config": stream_template_config}

        for stream_slice in self.retriever.stream_slices():
            records = self.retriever.read_records(records_schema={}, stream_slice=stream_slice)
            for components_values in records:
                kwargs["components_values"] = components_values  # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
                kwargs["stream_slice"] = stream_slice  # type: ignore[assignment] # stream_slice will always be of type Mapping[str, Any]
                populated = deepcopy(stream_template_config)

                for component in self._resolved_components:
                    valid_types = (component.value_type,) if component.value_type else None
                    component_value = component.value.eval(
                        self.config, valid_types=valid_types, **kwargs
                    )
                    target_path = [
                        segment.eval(self.config, **kwargs) for segment in component.field_path
                    ]
                    dpath.set(populated, target_path, component_value)

                yield populated
Resolves and populates stream templates with components fetched via an HTTP retriever.
Attributes:
- retriever (Retriever): The retriever used to fetch data from an API.
- config (Config): Configuration object for the resolver.
- components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
- parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
def resolve_components(
    self, stream_template_config: Dict[str, Any]
) -> Iterable[Dict[str, Any]]:
    """
    Yield one populated copy of the template per record read from the retriever.

    During interpolation the current record is available as ``components_values``, its
    slice as ``stream_slice``, and the untouched template as ``stream_template_config``.

    Args:
        stream_template_config (Dict[str, Any]): Stream template to populate.

    Yields:
        Dict[str, Any]: Updated configurations with resolved components.
    """
    kwargs = {"stream_template_config": stream_template_config}

    for stream_slice in self.retriever.stream_slices():
        records = self.retriever.read_records(records_schema={}, stream_slice=stream_slice)
        for components_values in records:
            kwargs["components_values"] = components_values  # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
            kwargs["stream_slice"] = stream_slice  # type: ignore[assignment] # stream_slice will always be of type Mapping[str, Any]
            populated = deepcopy(stream_template_config)

            for component in self._resolved_components:
                valid_types = (component.value_type,) if component.value_type else None
                component_value = component.value.eval(
                    self.config, valid_types=valid_types, **kwargs
                )
                target_path = [
                    segment.eval(self.config, **kwargs) for segment in component.field_path
                ]
                dpath.set(populated, target_path, component_value)

            yield populated
Resolves components in the stream template configuration by populating values.
Arguments:
- stream_template_config (Dict[str, Any]): Stream template to populate.
Yields:
Dict[str, Any]: Updated configurations with resolved components.
@dataclass(frozen=True)
class ComponentMappingDefinition:
    """Defines the configuration for mapping a component in a stream. This class specifies
    what field in the stream template should be updated with value, supporting dynamic interpolation
    and type enforcement.

    Attributes:
        field_path: Path segments of the template field to update; resolvers interpolate
            each segment before writing.
        value: The value to write, either an InterpolatedString or a raw string that a
            resolver turns into one.
        value_type: Optional expected type of the evaluated value; resolvers pass it to
            evaluation as ``valid_types``.
        parameters: Init-only (``InitVar``) parameters; not stored as a field.
        condition: Optional condition expression, kept here as an uninterpolated string.
        create_or_update: Whether a resolver may create the target path when it does not
            already exist (only honoured by resolvers that read this flag).
    """

    field_path: List["InterpolatedString"]
    value: Union["InterpolatedString", str]
    value_type: Optional[Type[Any]]
    parameters: InitVar[Mapping[str, Any]]
    condition: Optional[str] = None
    create_or_update: Optional[bool] = False
Defines the configuration for mapping a component in a stream. This class specifies which field in the stream template should be updated with which value, supporting dynamic interpolation and type enforcement.
@dataclass(frozen=True)
class ResolvedComponentMappingDefinition:
    """Defines resolved configuration for mapping a component in a stream. This class specifies
    what field in the stream template should be updated with value, supporting dynamic interpolation
    and type enforcement.

    Attributes:
        field_path: Interpolated path segments; each is evaluated to locate the template field.
        value: Interpolated value evaluated to produce what is written at ``field_path``.
        value_type: Optional expected type of the evaluated value, passed as ``valid_types``.
        parameters: Init-only (``InitVar``) parameters; not stored as a field.
        condition: Optional interpolated boolean; a resolver that checks it skips this mapping
            when it evaluates to false.
        create_or_update: Whether a resolver may create the target path when it does not
            already exist (only honoured by resolvers that read this flag).
    """

    field_path: List["InterpolatedString"]
    value: "InterpolatedString"
    value_type: Optional[Type[Any]]
    parameters: InitVar[Mapping[str, Any]]
    condition: Optional[InterpolatedBoolean] = None
    create_or_update: Optional[bool] = False
Defines the resolved configuration for mapping a component in a stream. This class specifies which field in the stream template should be updated with which value, supporting dynamic interpolation and type enforcement.
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class StreamConfig:
    """
    Describes where stream configurations are located in the source config.

    Attributes:
        configs_pointer: Path segments locating the stream configs; after initialization each
            segment has been passed through ``InterpolatedString.create``.
        parameters: Init-only parameters used when interpolating the path segments.
        default_values: Optional extra entries to use alongside the configs found at the pointer.
    """

    configs_pointer: List[Union[InterpolatedString, str]]
    parameters: InitVar[Mapping[str, Any]]
    default_values: Optional[List[Any]] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        # Normalise every pointer segment through InterpolatedString.create.
        interpolated_pointer: List[Union[InterpolatedString, str]] = []
        for segment in self.configs_pointer:
            interpolated_pointer.append(InterpolatedString.create(segment, parameters=parameters))
        self.configs_pointer = interpolated_pointer
Identifies stream config details for dynamic schema extraction and processing.
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class ConfigComponentsResolver(ComponentsResolver):
    """
    Resolves and populates stream templates with components fetched via source config.

    Attributes:
        stream_configs (List[StreamConfig]): Descriptions of where stream configurations live
            in the source config; one resolved stream is produced per combination (Cartesian
            product) of their entries.
        config (Config): Configuration object for the resolver.
        components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
        parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
    """

    stream_configs: List[StreamConfig]
    config: Config
    components_mapping: List[ComponentMappingDefinition]
    parameters: InitVar[Mapping[str, Any]]
    _resolved_components: List[ResolvedComponentMappingDefinition] = field(
        init=False, repr=False, default_factory=list
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Initializes and parses component mappings, converting them to resolved definitions.

        Args:
            parameters (Mapping[str, Any]): Parameters for interpolation.

        Raises:
            ValueError: If a mapping's value is neither a str nor an InterpolatedString.
        """

        for component_mapping in self.components_mapping:
            # The optional condition is compiled once here and evaluated per stream later.
            interpolated_condition = (
                InterpolatedBoolean(condition=component_mapping.condition, parameters=parameters)
                if component_mapping.condition
                else None
            )

            if isinstance(component_mapping.value, (str, InterpolatedString)):
                interpolated_value = (
                    InterpolatedString.create(component_mapping.value, parameters=parameters)
                    if isinstance(component_mapping.value, str)
                    else component_mapping.value
                )

                field_path = [
                    InterpolatedString.create(path, parameters=parameters)
                    for path in component_mapping.field_path
                ]

                self._resolved_components.append(
                    ResolvedComponentMappingDefinition(
                        field_path=field_path,
                        value=interpolated_value,
                        value_type=component_mapping.value_type,
                        create_or_update=component_mapping.create_or_update,
                        parameters=parameters,
                        condition=interpolated_condition,
                    )
                )
            else:
                raise ValueError(
                    f"Expected a string or InterpolatedString for value in mapping: {component_mapping}"
                )

    @staticmethod
    def _merge_combination(combo: Iterable[Tuple[int, Any]]) -> Dict[str, Any]:
        """
        Collapse a combination of ``(idx, elem)`` pairs into one config dict.

        Dict elements are merged in order, so later keys overwrite earlier ones. A non-dict
        element at position ``i`` of the combination is stored as ``(idx, elem)`` under the
        key ``source_config_{i}``.
        """
        result: Dict[str, Any] = {}
        for config_index, (elem_index, elem) in enumerate(combo):
            if isinstance(elem, dict):
                result.update(elem)
            else:
                # keep non-dict values under an artificial name
                result.setdefault(f"source_config_{config_index}", (elem_index, elem))
        return result

    @property
    def _stream_config(self) -> List[Dict[str, Any]]:
        """
        Build every stream-configuration combination defined by the ``StreamConfig`` entries.

        For each ``StreamConfig`` the pointer is evaluated and the value at that path in the
        config is read (``[]`` when missing; a non-list value is wrapped in a list). Any
        ``default_values`` are then appended. One merged dict is returned per element of the
        Cartesian product of these lists. Duplicate combinations are not removed. If any of
        the lists is empty, no combinations are produced.
        """
        all_indexed_streams = []
        for stream_config in self.stream_configs:
            path = [
                node.eval(self.config) if not isinstance(node, str) else node
                for node in stream_config.configs_pointer
            ]
            stream_configs_raw = dpath.get(dict(self.config), path, default=[])
            stream_configs = (
                list(stream_configs_raw)
                if isinstance(stream_configs_raw, list)
                else [stream_configs_raw]
            )

            if stream_config.default_values:
                stream_configs.extend(stream_config.default_values)

            # Pair each entry with its position so non-dict entries keep their index.
            all_indexed_streams.append([(i, item) for i, item in enumerate(stream_configs)])
        return [
            self._merge_combination(combo)  # type: ignore[arg-type]
            for combo in product(*all_indexed_streams)
        ]

    def resolve_components(
        self, stream_template_config: Dict[str, Any]
    ) -> Iterable[Dict[str, Any]]:
        """
        Resolves components in the stream template configuration by populating values.

        Args:
            stream_template_config (Dict[str, Any]): Stream template to populate.

        Yields:
            Dict[str, Any]: Updated configurations with resolved components, one per
                combination returned by `_stream_config`.
        """
        kwargs = {"stream_template_config": stream_template_config}

        for components_values in self._stream_config:
            # Each yielded config starts from a fresh copy of the untouched template.
            updated_config = deepcopy(stream_template_config)
            kwargs["components_values"] = components_values  # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]

            for resolved_component in self._resolved_components:
                # Skip mappings whose condition evaluates to false for these values.
                if (
                    resolved_component.condition is not None
                    and not resolved_component.condition.eval(self.config, **kwargs)
                ):
                    continue

                valid_types = (
                    (resolved_component.value_type,) if resolved_component.value_type else None
                )
                value = resolved_component.value.eval(
                    self.config, valid_types=valid_types, **kwargs
                )

                path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path]
                parsed_value = self._parse_yaml_if_possible(value)
                # dpath.set returns the number of changed elements, 0 when no elements changed.
                updated = dpath.set(updated_config, path, parsed_value)

                # Create the path only when nothing was updated and the mapping allows creation.
                # NOTE(review): falsy parsed values (0, False, "", {}, []) are never created here;
                # confirm this is intended.
                if parsed_value and not updated and resolved_component.create_or_update:
                    dpath.new(updated_config, path, parsed_value)

            yield updated_config

    @staticmethod
    def _parse_yaml_if_possible(value: Any) -> Any:
        """
        Try to turn value into a Python object by YAML-parsing it.

        * If value is a `str` and can be parsed by `yaml.safe_load`,
          return the parsed result.
        * If parsing fails with `yaml.parser.ParserError`, or with a
          `yaml.scanner.ScannerError` whose message reports an unexpected `%`
          (e.g. a date format such as "%Y-%m-%d"), return the original value.
        * Values that are not strings are returned unchanged.

        Raises:
            ScannerError: For any other YAML scanning failure.
        """
        if isinstance(value, str):
            try:
                return yaml.safe_load(value)
            except ParserError:  # "{{ record[0] in ['cohortActiveUsers'] }}" # not valid YAML
                return value
            except ScannerError as e:  # "%Y-%m-%d' # not valid yaml
                if "expected alphabetic or numeric character, but found '%'" in str(e):
                    return value
                raise
        return value
Resolves and populates stream templates with components fetched via source config.
Attributes:
- stream_configs (List[StreamConfig]): Descriptions of the stream configurations to fetch from the source config.
- config (Config): Configuration object for the resolver.
- components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
- parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
def resolve_components(
    self, stream_template_config: Dict[str, Any]
) -> Iterable[Dict[str, Any]]:
    """
    Resolves components in the stream template configuration by populating values.

    Args:
        stream_template_config (Dict[str, Any]): Stream template to populate.

    Yields:
        Dict[str, Any]: Updated configurations with resolved components, one per
            combination returned by `_stream_config`.
    """
    kwargs = {"stream_template_config": stream_template_config}

    for components_values in self._stream_config:
        # Each yielded config starts from a fresh copy of the untouched template.
        updated_config = deepcopy(stream_template_config)
        kwargs["components_values"] = components_values  # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]

        for resolved_component in self._resolved_components:
            # Skip mappings whose condition evaluates to false for these values.
            if (
                resolved_component.condition is not None
                and not resolved_component.condition.eval(self.config, **kwargs)
            ):
                continue

            valid_types = (
                (resolved_component.value_type,) if resolved_component.value_type else None
            )
            value = resolved_component.value.eval(
                self.config, valid_types=valid_types, **kwargs
            )

            path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path]
            parsed_value = self._parse_yaml_if_possible(value)
            # dpath.set returns the number of changed elements, 0 when no elements changed.
            updated = dpath.set(updated_config, path, parsed_value)

            # Create the path only when nothing was updated and the mapping allows creation.
            # NOTE(review): falsy parsed values (0, False, "", {}, []) are never created here;
            # confirm this is intended.
            if parsed_value and not updated and resolved_component.create_or_update:
                dpath.new(updated_config, path, parsed_value)

        yield updated_config
Resolves components in the stream template configuration by populating values.
Arguments:
- stream_template_config (Dict[str, Any]): Stream template to populate.
Yields:
Dict[str, Any]: Updated configurations with resolved components.
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class ParametrizedComponentsResolver(ComponentsResolver):
    """
    Resolves and populates dynamic streams from parametrized values defined in the manifest.

    Attributes:
        stream_parameters (StreamParametersDefinition): Supplies one parameter mapping per
            stream to generate.
        config (Config): Configuration object passed to every interpolation.
        components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
        parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
    """

    stream_parameters: StreamParametersDefinition
    config: Config
    components_mapping: List[ComponentMappingDefinition]
    parameters: InitVar[Mapping[str, Any]]
    _resolved_components: List[ResolvedComponentMappingDefinition] = field(
        init=False, repr=False, default_factory=list
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Initializes and parses component mappings, converting them to resolved definitions.

        Args:
            parameters (Mapping[str, Any]): Parameters for interpolation.

        Raises:
            ValueError: If a mapping's value is neither a str nor an InterpolatedString.
        """
        # NOTE(review): unlike ConfigComponentsResolver, a mapping's `condition` is not
        # forwarded here, so it is ignored by this resolver; confirm this is intended.
        for component_mapping in self.components_mapping:
            if isinstance(component_mapping.value, (str, InterpolatedString)):
                interpolated_value = (
                    InterpolatedString.create(component_mapping.value, parameters=parameters)
                    if isinstance(component_mapping.value, str)
                    else component_mapping.value
                )

                field_path = [
                    InterpolatedString.create(path, parameters=parameters)
                    for path in component_mapping.field_path
                ]

                self._resolved_components.append(
                    ResolvedComponentMappingDefinition(
                        field_path=field_path,
                        value=interpolated_value,
                        value_type=component_mapping.value_type,
                        create_or_update=component_mapping.create_or_update,
                        parameters=parameters,
                    )
                )
            else:
                raise ValueError(
                    f"Expected a string or InterpolatedString for value in mapping: {component_mapping}"
                )

    def resolve_components(
        self, stream_template_config: Dict[str, Any]
    ) -> Iterable[Dict[str, Any]]:
        """
        Populates the stream template once per entry in `stream_parameters`.

        Args:
            stream_template_config (Dict[str, Any]): Stream template to populate.

        Yields:
            Dict[str, Any]: A deep copy of the template with every mapping applied, one per
                entry of `stream_parameters.list_of_parameters_for_stream`.
        """
        kwargs = {"stream_template_config": stream_template_config}

        for components_values in self.stream_parameters.list_of_parameters_for_stream:
            updated_config = deepcopy(stream_template_config)
            kwargs["components_values"] = components_values  # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
            for resolved_component in self._resolved_components:
                valid_types = (
                    (resolved_component.value_type,) if resolved_component.value_type else None
                )
                value = resolved_component.value.eval(
                    self.config, valid_types=valid_types, **kwargs
                )
                path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path]
                parsed_value = self._parse_yaml_if_possible(value)
                # https://github.com/dpath-maintainers/dpath-python/blob/master/dpath/__init__.py#L136
                # dpath.set returns the number of changed elements, 0 when no elements changed
                updated = dpath.set(updated_config, path, parsed_value)

                # Create the path only when nothing was updated and the mapping allows creation.
                if parsed_value and not updated and resolved_component.create_or_update:
                    dpath.new(updated_config, path, parsed_value)

            yield updated_config

    @staticmethod
    def _parse_yaml_if_possible(value: Any) -> Any:
        """
        Try to turn value into a Python object by YAML-parsing it.

        * If value is a `str` and can be parsed by `yaml.safe_load`,
          return the parsed result.
        * If parsing fails with `yaml.parser.ParserError`, or with a
          `yaml.scanner.ScannerError` whose message reports an unexpected `%`
          (e.g. a date format such as "%Y-%m-%d"), return the original value.
        * Values that are not strings are returned unchanged.

        Raises:
            ScannerError: For any other YAML scanning failure.
        """
        # Local import keeps this fix self-contained; yaml is already a dependency here.
        from yaml.scanner import ScannerError

        if isinstance(value, str):
            try:
                return yaml.safe_load(value)
            except ParserError:  # "{{ record[0] in ['cohortActiveUsers'] }}" # not valid YAML
                return value
            except ScannerError as e:  # "%Y-%m-%d" # not valid YAML
                # Same tolerance as ConfigComponentsResolver: format strings stay as-is.
                if "expected alphabetic or numeric character, but found '%'" in str(e):
                    return value
                raise
        return value
Resolves and populates dynamic streams from parametrized values defined in the manifest.
def resolve_components(
    self, stream_template_config: Dict[str, Any]
) -> Iterable[Dict[str, Any]]:
    """
    Populates the stream template once per entry in `stream_parameters`.

    Args:
        stream_template_config (Dict[str, Any]): Stream template to populate.

    Yields:
        Dict[str, Any]: A deep copy of the template with every mapping applied, one per
            entry of `stream_parameters.list_of_parameters_for_stream`.
    """
    kwargs = {"stream_template_config": stream_template_config}

    for components_values in self.stream_parameters.list_of_parameters_for_stream:
        updated_config = deepcopy(stream_template_config)
        kwargs["components_values"] = components_values  # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
        for resolved_component in self._resolved_components:
            valid_types = (
                (resolved_component.value_type,) if resolved_component.value_type else None
            )
            value = resolved_component.value.eval(
                self.config, valid_types=valid_types, **kwargs
            )
            path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path]
            parsed_value = self._parse_yaml_if_possible(value)
            # https://github.com/dpath-maintainers/dpath-python/blob/master/dpath/__init__.py#L136
            # dpath.set returns the number of changed elements, 0 when no elements changed
            updated = dpath.set(updated_config, path, parsed_value)

            # Create the path only when nothing was updated and the mapping allows creation.
            if parsed_value and not updated and resolved_component.create_or_update:
                dpath.new(updated_config, path, parsed_value)

        yield updated_config
Maps and populates values into a stream template configuration.
Parameters
- stream_template_config: The stream template with placeholders for components.
Yields: The resolved stream config with populated values.
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class StreamParametersDefinition:
    """
    Represents a stream parameters definition to set up dynamic streams from defined values in manifest.

    Attributes:
        list_of_parameters_for_stream: One parameter mapping per dynamic stream; each mapping
            is exposed to interpolation as ``components_values`` by the parametrized resolver.
    """

    list_of_parameters_for_stream: List[Dict[str, Any]]
Represents a stream parameters definition used to set up dynamic streams from values defined in the manifest.