airbyte_cdk.sources.declarative.resolvers
1# 2# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 3# 4 5from typing import Mapping 6 7from pydantic.v1 import BaseModel 8 9from airbyte_cdk.sources.declarative.models import ( 10 ConfigComponentsResolver as ConfigComponentsResolverModel, 11) 12from airbyte_cdk.sources.declarative.models import ( 13 HttpComponentsResolver as HttpComponentsResolverModel, 14) 15from airbyte_cdk.sources.declarative.models import ( 16 ParametrizedComponentsResolver as ParametrizedComponentsResolverModel, 17) 18from airbyte_cdk.sources.declarative.resolvers.components_resolver import ( 19 ComponentMappingDefinition, 20 ComponentsResolver, 21 ResolvedComponentMappingDefinition, 22) 23from airbyte_cdk.sources.declarative.resolvers.config_components_resolver import ( 24 ConfigComponentsResolver, 25 StreamConfig, 26) 27from airbyte_cdk.sources.declarative.resolvers.http_components_resolver import ( 28 HttpComponentsResolver, 29) 30from airbyte_cdk.sources.declarative.resolvers.parametrized_components_resolver import ( 31 ParametrizedComponentsResolver, 32 StreamParametersDefinition, 33) 34 35COMPONENTS_RESOLVER_TYPE_MAPPING: Mapping[str, type[BaseModel]] = { 36 "HttpComponentsResolver": HttpComponentsResolverModel, 37 "ConfigComponentsResolver": ConfigComponentsResolverModel, 38 "ParametrizedComponentsResolver": ParametrizedComponentsResolverModel, 39} 40 41__all__ = [ 42 "ComponentsResolver", 43 "HttpComponentsResolver", 44 "ComponentMappingDefinition", 45 "ResolvedComponentMappingDefinition", 46 "StreamConfig", 47 "ConfigComponentsResolver", 48 "COMPONENTS_RESOLVER_TYPE_MAPPING", 49 "ParametrizedComponentsResolver", 50 "StreamParametersDefinition", 51]
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class ComponentsResolver(ABC):
    """
    Abstract base class for resolving components in a stream template.

    Concrete resolvers implement ``resolve_components`` to turn one stream
    template into zero or more populated stream configurations.
    """

    @abstractmethod
    def resolve_components(
        self, stream_template_config: Dict[str, Any]
    ) -> Iterable[Dict[str, Any]]:
        """
        Maps and populates values into a stream template configuration.

        Args:
            stream_template_config (Dict[str, Any]): The stream template with placeholders for components.

        Yields:
            Dict[str, Any]: The resolved stream config with populated values.
        """
        pass
Abstract base class for resolving components in a stream template.
@abstractmethod
def resolve_components(
    self, stream_template_config: Dict[str, Any]
) -> Iterable[Dict[str, Any]]:
    """
    Maps and populates values into a stream template configuration.

    Args:
        stream_template_config (Dict[str, Any]): The stream template with placeholders for components.

    Yields:
        Dict[str, Any]: The resolved stream config with populated values.
    """
    pass
Maps and populates values into a stream template configuration.
Parameters
- stream_template_config: The stream template with placeholders for components.
Yields:
The resolved stream config with populated values.
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class HttpComponentsResolver(ComponentsResolver):
    """
    Produces one populated stream configuration per record read from a retriever.

    Each record is exposed to the component mappings as ``components_values`` and
    the slice it was read from as ``stream_slice``; the original template is
    available as ``stream_template_config``.

    Attributes:
        retriever (Retriever): Source of the stream slices and records driving the resolution.
        config (Config): Configuration object passed to every interpolation.
        components_mapping (List[ComponentMappingDefinition]): Mappings describing which fields to set.
        parameters (InitVar[Mapping[str, Any]]): Init-only parameters used when creating interpolated strings.
    """

    retriever: Retriever
    config: Config
    components_mapping: List[ComponentMappingDefinition]
    parameters: InitVar[Mapping[str, Any]]
    _resolved_components: List[ResolvedComponentMappingDefinition] = field(
        init=False, repr=False, default_factory=list
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Converts every component mapping into its resolved (fully interpolated) form.

        Args:
            parameters (Mapping[str, Any]): Parameters for interpolation.

        Raises:
            ValueError: If a mapping's value is neither a string nor an InterpolatedString.
        """
        for mapping in self.components_mapping:
            raw_value = mapping.value
            if not isinstance(raw_value, (str, InterpolatedString)):
                raise ValueError(
                    f"Expected a string or InterpolatedString for value in mapping: {mapping}"
                )

            # Plain strings are wrapped; already-interpolated values are reused as is.
            if isinstance(raw_value, str):
                value_template = InterpolatedString.create(raw_value, parameters=parameters)
            else:
                value_template = raw_value

            path_templates = [
                InterpolatedString.create(segment, parameters=parameters)
                for segment in mapping.field_path
            ]

            self._resolved_components.append(
                ResolvedComponentMappingDefinition(
                    field_path=path_templates,
                    value=value_template,
                    value_type=mapping.value_type,
                    parameters=parameters,
                )
            )

    def resolve_components(
        self, stream_template_config: Dict[str, Any]
    ) -> Iterable[Dict[str, Any]]:
        """
        Populates a copy of the stream template for every record the retriever returns.

        Args:
            stream_template_config (Dict[str, Any]): Stream template to populate.

        Yields:
            Dict[str, Any]: A deep copy of the template with each mapped field set.
        """
        for stream_slice in self.retriever.stream_slices():
            records = self.retriever.read_records(records_schema={}, stream_slice=stream_slice)
            for components_values in records:
                resolved_config = deepcopy(stream_template_config)
                # Names made available to the interpolated value and path templates.
                interpolation_context: Dict[str, Any] = {
                    "stream_template_config": stream_template_config,
                    "components_values": components_values,
                    "stream_slice": stream_slice,
                }

                for component in self._resolved_components:
                    valid_types = (component.value_type,) if component.value_type else None
                    value = component.value.eval(
                        self.config, valid_types=valid_types, **interpolation_context
                    )
                    target_path = [
                        segment.eval(self.config, **interpolation_context)
                        for segment in component.field_path
                    ]
                    dpath.set(resolved_config, target_path, value)

                yield resolved_config
Resolves and populates stream templates with components fetched via an HTTP retriever.
Attributes:
- retriever (Retriever): The retriever used to fetch data from an API.
- config (Config): Configuration object for the resolver.
- components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
- parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
def resolve_components(
    self, stream_template_config: Dict[str, Any]
) -> Iterable[Dict[str, Any]]:
    """
    Populates a copy of the stream template for every record the retriever returns.

    Args:
        stream_template_config (Dict[str, Any]): Stream template to populate.

    Yields:
        Dict[str, Any]: A deep copy of the template with each mapped field set.
    """
    for stream_slice in self.retriever.stream_slices():
        records = self.retriever.read_records(records_schema={}, stream_slice=stream_slice)
        for components_values in records:
            resolved_config = deepcopy(stream_template_config)
            # Names made available to the interpolated value and path templates.
            interpolation_context: Dict[str, Any] = {
                "stream_template_config": stream_template_config,
                "components_values": components_values,
                "stream_slice": stream_slice,
            }

            for component in self._resolved_components:
                valid_types = (component.value_type,) if component.value_type else None
                value = component.value.eval(
                    self.config, valid_types=valid_types, **interpolation_context
                )
                target_path = [
                    segment.eval(self.config, **interpolation_context)
                    for segment in component.field_path
                ]
                dpath.set(resolved_config, target_path, value)

            yield resolved_config
Resolves components in the stream template configuration by populating values.
Arguments:
- stream_template_config (Dict[str, Any]): Stream template to populate.
Yields:
Dict[str, Any]: Updated configurations with resolved components.
@dataclass(frozen=True)
class ComponentMappingDefinition:
    """Defines the configuration for mapping a component in a stream. This class specifies
    what field in the stream template should be updated with value, supporting dynamic interpolation
    and type enforcement."""

    # Path (one entry per segment) of the field to update in the stream template.
    field_path: List["InterpolatedString"]
    # Value to write at `field_path`: either a plain string or an InterpolatedString.
    value: Union["InterpolatedString", str]
    # Optional type used for type enforcement of the evaluated value; None means no restriction.
    value_type: Optional[Type[Any]]
    # Init-only argument: accepted by the constructor but not stored as a field.
    parameters: InitVar[Mapping[str, Any]]
    # Whether the field may be created when it does not already exist in the template.
    # NOTE(review): how this flag is honoured depends on the resolver consuming it.
    create_or_update: Optional[bool] = False
Defines the configuration for mapping a component in a stream. This class specifies what field in the stream template should be updated with value, supporting dynamic interpolation and type enforcement.
@dataclass(frozen=True)
class ResolvedComponentMappingDefinition:
    """Defines resolved configuration for mapping a component in a stream. This class specifies
    what field in the stream template should be updated with value, supporting dynamic interpolation
    and type enforcement."""

    # Path (one entry per segment) of the field to update in the stream template.
    field_path: List["InterpolatedString"]
    # Value to write at `field_path`; unlike ComponentMappingDefinition this is
    # always an InterpolatedString, never a plain string.
    value: "InterpolatedString"
    # Optional type used for type enforcement of the evaluated value; None means no restriction.
    value_type: Optional[Type[Any]]
    # Init-only argument: accepted by the constructor but not stored as a field.
    parameters: InitVar[Mapping[str, Any]]
    # Whether the field may be created when it does not already exist in the template.
    # NOTE(review): how this flag is honoured depends on the resolver consuming it.
    create_or_update: Optional[bool] = False
Defines resolved configuration for mapping a component in a stream. This class specifies what field in the stream template should be updated with value, supporting dynamic interpolation and type enforcement.
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class StreamConfig:
    """
    Identifies stream config details for dynamic schema extraction and processing.

    Attributes:
        configs_pointer (List[Union[InterpolatedString, str]]): Path segments pointing at the stream
            configurations; every segment is converted with ``InterpolatedString.create`` on init.
        parameters (InitVar[Mapping[str, Any]]): Init-only parameters used for that conversion.
        default_values (Optional[List[Any]]): Optional extra values supplied alongside the pointer.
    """

    configs_pointer: List[Union[InterpolatedString, str]]
    parameters: InitVar[Mapping[str, Any]]
    default_values: Optional[List[Any]] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Replaces every pointer segment with its interpolated-string counterpart."""
        interpolated_pointer: List[Union[InterpolatedString, str]] = [
            InterpolatedString.create(segment, parameters=parameters)
            for segment in self.configs_pointer
        ]
        self.configs_pointer = interpolated_pointer
Identifies stream config details for dynamic schema extraction and processing.
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class ConfigComponentsResolver(ComponentsResolver):
    """
    Resolves and populates stream templates with components fetched via source config.

    Attributes:
        stream_configs (List[StreamConfig]): Descriptions of where stream configurations live in the
            source config; the resolver yields one stream per combination of their entries.
        config (Config): Configuration object for the resolver.
        components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
        parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
    """

    stream_configs: List[StreamConfig]
    config: Config
    components_mapping: List[ComponentMappingDefinition]
    parameters: InitVar[Mapping[str, Any]]
    _resolved_components: List[ResolvedComponentMappingDefinition] = field(
        init=False, repr=False, default_factory=list
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Initializes and parses component mappings, converting them to resolved definitions.

        Args:
            parameters (Mapping[str, Any]): Parameters for interpolation.

        Raises:
            ValueError: If a mapping's value is neither a string nor an InterpolatedString.
        """
        for component_mapping in self.components_mapping:
            if not isinstance(component_mapping.value, (str, InterpolatedString)):
                raise ValueError(
                    f"Expected a string or InterpolatedString for value in mapping: {component_mapping}"
                )

            # Plain strings are wrapped; already-interpolated values are reused as is.
            interpolated_value = (
                InterpolatedString.create(component_mapping.value, parameters=parameters)
                if isinstance(component_mapping.value, str)
                else component_mapping.value
            )

            field_path = [
                InterpolatedString.create(segment, parameters=parameters)
                for segment in component_mapping.field_path
            ]

            self._resolved_components.append(
                ResolvedComponentMappingDefinition(
                    field_path=field_path,
                    value=interpolated_value,
                    value_type=component_mapping.value_type,
                    create_or_update=component_mapping.create_or_update,
                    parameters=parameters,
                )
            )

    @staticmethod
    def _merge_combination(combo: Iterable[Tuple[int, Any]]) -> Dict[str, Any]:
        """
        Collapse a combination of ``(idx, elem)`` pairs into one config dict.

        Dict elements are merged with ``dict.update`` (later elements override
        earlier keys). Any other element is stored as ``(idx, elem)`` under the
        key ``source_config_<position>``, where ``<position>`` is its position
        within the combination.
        """
        result: Dict[str, Any] = {}
        for config_index, (elem_index, elem) in enumerate(combo):
            if isinstance(elem, dict):
                result.update(elem)
            else:
                # keep non-dict values under an artificial name
                result.setdefault(f"source_config_{config_index}", (elem_index, elem))
        return result

    @property
    def _stream_config(self) -> List[Dict[str, Any]]:
        """
        Build every unique stream-configuration combination defined by
        each ``StreamConfig`` and any ``default_values``.

        For each ``StreamConfig`` the pointer is evaluated against the config and
        looked up with ``dpath.get`` (missing paths yield an empty list, a non-list
        result is treated as a single entry), ``default_values`` are appended, and
        the cartesian product across all ``StreamConfig`` entries is merged into
        one dict per combination.
        """
        all_indexed_streams = []
        for stream_config in self.stream_configs:
            pointer = [
                node.eval(self.config) if not isinstance(node, str) else node
                for node in stream_config.configs_pointer
            ]
            stream_configs_raw = dpath.get(dict(self.config), pointer, default=[])
            stream_configs = (
                list(stream_configs_raw)
                if isinstance(stream_configs_raw, list)
                else [stream_configs_raw]
            )

            if stream_config.default_values:
                stream_configs.extend(stream_config.default_values)

            all_indexed_streams.append(list(enumerate(stream_configs)))
        return [
            self._merge_combination(combo)  # type: ignore[arg-type]
            for combo in product(*all_indexed_streams)
        ]

    def resolve_components(
        self, stream_template_config: Dict[str, Any]
    ) -> Iterable[Dict[str, Any]]:
        """
        Resolves components in the stream template configuration by populating values.

        Args:
            stream_template_config (Dict[str, Any]): Stream template to populate.

        Yields:
            Dict[str, Any]: Updated configurations with resolved components.
        """
        kwargs: Dict[str, Any] = {"stream_template_config": stream_template_config}

        for components_values in self._stream_config:
            updated_config = deepcopy(stream_template_config)
            kwargs["components_values"] = components_values

            for resolved_component in self._resolved_components:
                valid_types = (
                    (resolved_component.value_type,) if resolved_component.value_type else None
                )
                value = resolved_component.value.eval(
                    self.config, valid_types=valid_types, **kwargs
                )

                path = [
                    segment.eval(self.config, **kwargs)
                    for segment in resolved_component.field_path
                ]
                parsed_value = self._parse_yaml_if_possible(value)
                # dpath.set returns the number of changed elements, 0 when no elements changed
                updated = dpath.set(updated_config, path, parsed_value)

                # NOTE(review): a falsy parsed value (None, 0, False, empty string or
                # collection) is never created here; presumably intentional, confirm.
                if parsed_value and not updated and resolved_component.create_or_update:
                    dpath.new(updated_config, path, parsed_value)

            yield updated_config

    @staticmethod
    def _parse_yaml_if_possible(value: Any) -> Any:
        """
        Try to turn value into a Python object by YAML-parsing it.

        * If value is a `str` and can be parsed by `yaml.safe_load`,
          return the parsed result.
        * If parsing fails with `yaml.parser.ParserError`, or with a
          `yaml.scanner.ScannerError` caused by a leading ``%`` (e.g. a
          date format such as ``%Y-%m-%d``), or value is not a string
          at all, return the original value unchanged.

        Raises:
            ScannerError: For any other scanner failure.
        """
        if isinstance(value, str):
            try:
                return yaml.safe_load(value)
            except ParserError:  # "{{ record[0] in ['cohortActiveUsers'] }}"   # not valid YAML
                return value
            except ScannerError as e:  # "%Y-%m-%d'   # not valid yaml
                if "expected alphabetic or numeric character, but found '%'" in str(e):
                    return value
                raise
        return value
Resolves and populates stream templates with components fetched via source config.
Attributes:
- stream_configs (List[StreamConfig]): The descriptions of the stream configurations used to fetch stream configs from the source config.
- config (Config): Configuration object for the resolver.
- components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
- parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
def resolve_components(
    self, stream_template_config: Dict[str, Any]
) -> Iterable[Dict[str, Any]]:
    """
    Populates a copy of the stream template for every stream configuration combination.

    Args:
        stream_template_config (Dict[str, Any]): Stream template to populate.

    Yields:
        Dict[str, Any]: A deep copy of the template with each mapped field set.
    """
    for components_values in self._stream_config:
        resolved_config = deepcopy(stream_template_config)
        # Names made available to the interpolated value and path templates.
        interpolation_context: Dict[str, Any] = {
            "stream_template_config": stream_template_config,
            "components_values": components_values,
        }

        for component in self._resolved_components:
            valid_types = (component.value_type,) if component.value_type else None
            raw_value = component.value.eval(
                self.config, valid_types=valid_types, **interpolation_context
            )
            target_path = [
                segment.eval(self.config, **interpolation_context)
                for segment in component.field_path
            ]
            parsed_value = self._parse_yaml_if_possible(raw_value)
            changed = dpath.set(resolved_config, target_path, parsed_value)

            # Only create the field when there is a truthy value, nothing was
            # changed by the set above, and the mapping opted in.
            if not parsed_value or changed or not component.create_or_update:
                continue
            dpath.new(resolved_config, target_path, parsed_value)

        yield resolved_config
Resolves components in the stream template configuration by populating values.
Arguments:
- stream_template_config (Dict[str, Any]): Stream template to populate.
Yields:
Dict[str, Any]: Updated configurations with resolved components.
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class ParametrizedComponentsResolver(ComponentsResolver):
    """
    Resolves and populates dynamic streams from defined parametrized values in manifest.

    Attributes:
        stream_parameters (StreamParametersDefinition): Holds ``list_of_parameters_for_stream``;
            every entry yields one stream and is exposed to the mappings as ``components_values``.
        config (Config): Configuration object for the resolver.
        components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
        parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
    """

    stream_parameters: StreamParametersDefinition
    config: Config
    components_mapping: List[ComponentMappingDefinition]
    parameters: InitVar[Mapping[str, Any]]
    _resolved_components: List[ResolvedComponentMappingDefinition] = field(
        init=False, repr=False, default_factory=list
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Initializes and parses component mappings, converting them to resolved definitions.

        Args:
            parameters (Mapping[str, Any]): Parameters for interpolation.

        Raises:
            ValueError: If a mapping's value is neither a string nor an InterpolatedString.
        """
        for component_mapping in self.components_mapping:
            if not isinstance(component_mapping.value, (str, InterpolatedString)):
                raise ValueError(
                    f"Expected a string or InterpolatedString for value in mapping: {component_mapping}"
                )

            # Plain strings are wrapped; already-interpolated values are reused as is.
            interpolated_value = (
                InterpolatedString.create(component_mapping.value, parameters=parameters)
                if isinstance(component_mapping.value, str)
                else component_mapping.value
            )

            field_path = [
                InterpolatedString.create(segment, parameters=parameters)
                for segment in component_mapping.field_path
            ]

            self._resolved_components.append(
                ResolvedComponentMappingDefinition(
                    field_path=field_path,
                    value=interpolated_value,
                    value_type=component_mapping.value_type,
                    create_or_update=component_mapping.create_or_update,
                    parameters=parameters,
                )
            )

    def resolve_components(
        self, stream_template_config: Dict[str, Any]
    ) -> Iterable[Dict[str, Any]]:
        """
        Resolves components in the stream template configuration by populating values.

        Args:
            stream_template_config (Dict[str, Any]): Stream template to populate.

        Yields:
            Dict[str, Any]: Updated configurations with resolved components, one per
            entry of ``stream_parameters.list_of_parameters_for_stream``.
        """
        kwargs: Dict[str, Any] = {"stream_template_config": stream_template_config}

        for components_values in self.stream_parameters.list_of_parameters_for_stream:
            updated_config = deepcopy(stream_template_config)
            kwargs["components_values"] = components_values
            for resolved_component in self._resolved_components:
                valid_types = (
                    (resolved_component.value_type,) if resolved_component.value_type else None
                )
                value = resolved_component.value.eval(
                    self.config, valid_types=valid_types, **kwargs
                )
                path = [
                    segment.eval(self.config, **kwargs)
                    for segment in resolved_component.field_path
                ]
                parsed_value = self._parse_yaml_if_possible(value)
                # https://github.com/dpath-maintainers/dpath-python/blob/master/dpath/__init__.py#L136
                # dpath.set returns the number of changed elements, 0 when no elements changed
                updated = dpath.set(updated_config, path, parsed_value)

                # NOTE(review): a falsy parsed value (None, 0, False, empty string or
                # collection) is never created here; presumably intentional, confirm.
                if parsed_value and not updated and resolved_component.create_or_update:
                    dpath.new(updated_config, path, parsed_value)

            yield updated_config

    @staticmethod
    def _parse_yaml_if_possible(value: Any) -> Any:
        """
        Try to turn value into a Python object by YAML-parsing it.

        * If value is a `str` and can be parsed by `yaml.safe_load`,
          return the parsed result.
        * If parsing fails with `yaml.parser.ParserError`, or with a
          `yaml.scanner.ScannerError` caused by a leading ``%`` (e.g. a
          date format such as ``%Y-%m-%d``), or value is not a string
          at all, return the original value unchanged.

        Raises:
            ScannerError: For any other scanner failure.
        """
        # Imported locally so this method does not depend on a new module-level import.
        from yaml.scanner import ScannerError

        if isinstance(value, str):
            try:
                return yaml.safe_load(value)
            except ParserError:  # "{{ record[0] in ['cohortActiveUsers'] }}"   # not valid YAML
                return value
            except ScannerError as e:  # "%Y-%m-%d"   # not valid YAML
                # Same handling as ConfigComponentsResolver._parse_yaml_if_possible.
                if "expected alphabetic or numeric character, but found '%'" in str(e):
                    return value
                raise
        return value
Resolves and populates dynamic streams from defined parametrized values in manifest.
def resolve_components(
    self, stream_template_config: Dict[str, Any]
) -> Iterable[Dict[str, Any]]:
    """
    Populates a copy of the stream template for every parameter set in the manifest.

    Args:
        stream_template_config (Dict[str, Any]): Stream template to populate.

    Yields:
        Dict[str, Any]: A deep copy of the template with each mapped field set, one per
        entry of ``stream_parameters.list_of_parameters_for_stream``.
    """
    for components_values in self.stream_parameters.list_of_parameters_for_stream:
        resolved_config = deepcopy(stream_template_config)
        # Names made available to the interpolated value and path templates.
        interpolation_context: Dict[str, Any] = {
            "stream_template_config": stream_template_config,
            "components_values": components_values,
        }

        for component in self._resolved_components:
            valid_types = (component.value_type,) if component.value_type else None
            raw_value = component.value.eval(
                self.config, valid_types=valid_types, **interpolation_context
            )
            target_path = [
                segment.eval(self.config, **interpolation_context)
                for segment in component.field_path
            ]
            parsed_value = self._parse_yaml_if_possible(raw_value)
            # https://github.com/dpath-maintainers/dpath-python/blob/master/dpath/__init__.py#L136
            # dpath.set returns the number of changed elements, 0 when no elements changed
            changed = dpath.set(resolved_config, target_path, parsed_value)

            # Only create the field when there is a truthy value, nothing was
            # changed by the set above, and the mapping opted in.
            if not parsed_value or changed or not component.create_or_update:
                continue
            dpath.new(resolved_config, target_path, parsed_value)

        yield resolved_config
Maps and populates values into a stream template configuration.
Parameters
- stream_template_config: The stream template with placeholders for components.
Yields:
The resolved stream config with populated values.
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class StreamParametersDefinition:
    """
    Represents a stream parameters definition to set up dynamic streams from defined values in manifest.
    """

    # One dict of parameter values per entry.
    # NOTE(review): presumably one dynamic stream is generated per entry; confirm against the resolver.
    list_of_parameters_for_stream: List[Dict[str, Any]]
Represents a stream parameters definition to set up dynamic streams from defined values in manifest.