airbyte_cdk.sources.declarative.resolvers
1# 2# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 3# 4 5from typing import Mapping 6 7from pydantic.v1 import BaseModel 8 9from airbyte_cdk.sources.declarative.models import ( 10 ConfigComponentsResolver as ConfigComponentsResolverModel, 11) 12from airbyte_cdk.sources.declarative.models import ( 13 HttpComponentsResolver as HttpComponentsResolverModel, 14) 15from airbyte_cdk.sources.declarative.resolvers.components_resolver import ( 16 ComponentMappingDefinition, 17 ComponentsResolver, 18 ResolvedComponentMappingDefinition, 19) 20from airbyte_cdk.sources.declarative.resolvers.config_components_resolver import ( 21 ConfigComponentsResolver, 22 StreamConfig, 23) 24from airbyte_cdk.sources.declarative.resolvers.http_components_resolver import ( 25 HttpComponentsResolver, 26) 27 28COMPONENTS_RESOLVER_TYPE_MAPPING: Mapping[str, type[BaseModel]] = { 29 "HttpComponentsResolver": HttpComponentsResolverModel, 30 "ConfigComponentsResolver": ConfigComponentsResolverModel, 31} 32 33__all__ = [ 34 "ComponentsResolver", 35 "HttpComponentsResolver", 36 "ComponentMappingDefinition", 37 "ResolvedComponentMappingDefinition", 38 "StreamConfig", 39 "ConfigComponentsResolver", 40 "COMPONENTS_RESOLVER_TYPE_MAPPING", 41]
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class ComponentsResolver(ABC):
    """
    Interface for objects that resolve the components of a stream template.

    Concrete resolvers must implement `resolve_components`.
    """

    @abstractmethod
    def resolve_components(
        self, stream_template_config: Dict[str, Any]
    ) -> Iterable[Dict[str, Any]]:
        """
        Map and populate values into a stream template configuration.

        :param stream_template_config: The stream template with placeholders for components.
        :return: An iterable of resolved stream configs with populated values.
        """
Abstract base class for resolving components in a stream template.
@abstractmethod
def resolve_components(
    self, stream_template_config: Dict[str, Any]
) -> Iterable[Dict[str, Any]]:
    """
    Map and populate values into a stream template configuration.

    :param stream_template_config: The stream template with placeholders for components.
    :return: An iterable of resolved stream configs with populated values.
    """
Maps and populates values into a stream template configuration.
Parameters
- stream_template_config: The stream template with placeholders for components.
Yields:
The resolved stream config with populated values.
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class HttpComponentsResolver(ComponentsResolver):
    """
    Components resolver that produces one stream config per record read from a retriever.

    Attributes:
        retriever (Retriever): Source of stream slices and records; each record read becomes
            the `components_values` of one resolved config.
        config (Config): Configuration passed to every interpolation evaluation.
        components_mapping (List[ComponentMappingDefinition]): Mappings to apply to the template.
        parameters (InitVar[Mapping[str, Any]]): Parameters handed to the interpolated strings
            created during initialization.
    """

    retriever: Retriever
    config: Config
    components_mapping: List[ComponentMappingDefinition]
    parameters: InitVar[Mapping[str, Any]]
    _resolved_components: List[ResolvedComponentMappingDefinition] = field(
        init=False, repr=False, default_factory=list
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Convert every entry of `components_mapping` into a resolved mapping definition.

        Args:
            parameters (Mapping[str, Any]): Parameters for interpolation.

        Raises:
            ValueError: If a mapping value is neither a string nor an InterpolatedString.
        """
        for mapping in self.components_mapping:
            raw_value = mapping.value
            if not isinstance(raw_value, (str, InterpolatedString)):
                raise ValueError(
                    f"Expected a string or InterpolatedString for value in mapping: {mapping}"
                )

            # Plain strings are wrapped; existing InterpolatedString values are reused as-is.
            if isinstance(raw_value, str):
                resolved_value = InterpolatedString.create(raw_value, parameters=parameters)
            else:
                resolved_value = raw_value

            resolved_path = [
                InterpolatedString.create(segment, parameters=parameters)
                for segment in mapping.field_path
            ]

            self._resolved_components.append(
                ResolvedComponentMappingDefinition(
                    field_path=resolved_path,
                    value=resolved_value,
                    value_type=mapping.value_type,
                    parameters=parameters,
                )
            )

    def resolve_components(
        self, stream_template_config: Dict[str, Any]
    ) -> Iterable[Dict[str, Any]]:
        """
        Yield one populated copy of the stream template per record read from the retriever.

        Args:
            stream_template_config (Dict[str, Any]): Stream template to populate.

        Yields:
            Dict[str, Any]: Updated configurations with resolved components.
        """
        # Extra keyword arguments made available to every interpolation evaluation.
        interpolation_context: Dict[str, Any] = {
            "stream_template_config": stream_template_config
        }

        for stream_slice in self.retriever.stream_slices():
            records = self.retriever.read_records(records_schema={}, stream_slice=stream_slice)
            for components_values in records:
                # The template itself is never mutated; each record gets its own deep copy.
                resolved_config = deepcopy(stream_template_config)
                interpolation_context["components_values"] = components_values
                interpolation_context["stream_slice"] = stream_slice

                for component in self._resolved_components:
                    expected_type = component.value_type
                    value = component.value.eval(
                        self.config,
                        valid_types=(expected_type,) if expected_type else None,
                        **interpolation_context,
                    )

                    target_path = [
                        segment.eval(self.config, **interpolation_context)
                        for segment in component.field_path
                    ]
                    dpath.set(resolved_config, target_path, value)

                yield resolved_config
Resolves and populates stream templates with components fetched via an HTTP retriever.
Attributes:
- retriever (Retriever): The retriever used to fetch data from an API.
- config (Config): Configuration object for the resolver.
- components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
- parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
def resolve_components(
    self, stream_template_config: Dict[str, Any]
) -> Iterable[Dict[str, Any]]:
    """
    Yield one populated copy of the stream template per record read from the retriever.

    Args:
        stream_template_config (Dict[str, Any]): Stream template to populate.

    Yields:
        Dict[str, Any]: Updated configurations with resolved components.
    """
    # Extra keyword arguments made available to every interpolation evaluation.
    interpolation_context: Dict[str, Any] = {
        "stream_template_config": stream_template_config
    }

    for stream_slice in self.retriever.stream_slices():
        records = self.retriever.read_records(records_schema={}, stream_slice=stream_slice)
        for components_values in records:
            # The template itself is never mutated; each record gets its own deep copy.
            resolved_config = deepcopy(stream_template_config)
            interpolation_context["components_values"] = components_values
            interpolation_context["stream_slice"] = stream_slice

            for component in self._resolved_components:
                expected_type = component.value_type
                value = component.value.eval(
                    self.config,
                    valid_types=(expected_type,) if expected_type else None,
                    **interpolation_context,
                )

                target_path = [
                    segment.eval(self.config, **interpolation_context)
                    for segment in component.field_path
                ]
                dpath.set(resolved_config, target_path, value)

            yield resolved_config
Resolves components in the stream template configuration by populating values.
Arguments:
- stream_template_config (Dict[str, Any]): Stream template to populate.
Yields:
Dict[str, Any]: Updated configurations with resolved components.
@dataclass(frozen=True)
class ComponentMappingDefinition:
    """Describes one mapping to apply to a component in a stream.

    A mapping names the field of the stream template to update and the value to put
    there, supporting dynamic interpolation and type enforcement.
    """

    # Path of the field in the stream template that should be updated.
    field_path: List["InterpolatedString"]
    # Value to write at `field_path`: an InterpolatedString or a plain string.
    value: Union["InterpolatedString", str]
    # Optional type associated with the value; None means no type is specified.
    value_type: Optional[Type[Any]]
    # Init-only argument: accepted by the constructor but not stored as a field.
    parameters: InitVar[Mapping[str, Any]]
Defines the configuration for mapping a component in a stream. This class specifies which field in the stream template should be updated with the value, and supports dynamic interpolation and type enforcement.
@dataclass(frozen=True)
class ResolvedComponentMappingDefinition:
    """Describes one resolved mapping to apply to a component in a stream.

    Same layout as a component mapping definition, except that `value` is always an
    InterpolatedString. It names the field of the stream template to update and the
    value to put there, supporting dynamic interpolation and type enforcement.
    """

    # Path of the field in the stream template that should be updated.
    field_path: List["InterpolatedString"]
    # Value to write at `field_path`.
    value: "InterpolatedString"
    # Optional type associated with the value; None means no type is specified.
    value_type: Optional[Type[Any]]
    # Init-only argument: accepted by the constructor but not stored as a field.
    parameters: InitVar[Mapping[str, Any]]
Defines the resolved configuration for mapping a component in a stream. This class specifies which field in the stream template should be updated with the value, and supports dynamic interpolation and type enforcement.
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class StreamConfig:
    """
    Holds the pointer that identifies stream config details.

    After initialization, every element of `configs_pointer` has been passed through
    `InterpolatedString.create` with the supplied parameters.
    """

    configs_pointer: List[Union[InterpolatedString, str]]
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Replace each pointer element with its interpolated-string form.

        Args:
            parameters (Mapping[str, Any]): Parameters for interpolation.
        """
        interpolated_pointer: List[Union[InterpolatedString, str]] = []
        for segment in self.configs_pointer:
            interpolated_pointer.append(InterpolatedString.create(segment, parameters=parameters))
        self.configs_pointer = interpolated_pointer
Identifies stream config details for dynamic schema extraction and processing.
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class ConfigComponentsResolver(ComponentsResolver):
    """
    Components resolver that produces one stream config per entry found in the source config.

    Attributes:
        stream_config (StreamConfig): Describes where in the source config the stream
            configuration entries are located.
        config (Config): Source configuration; it is both searched for stream entries and
            passed to every interpolation evaluation.
        components_mapping (List[ComponentMappingDefinition]): Mappings to apply to the template.
        parameters (InitVar[Mapping[str, Any]]): Parameters handed to the interpolated strings
            created during initialization.
    """

    stream_config: StreamConfig
    config: Config
    components_mapping: List[ComponentMappingDefinition]
    parameters: InitVar[Mapping[str, Any]]
    _resolved_components: List[ResolvedComponentMappingDefinition] = field(
        init=False, repr=False, default_factory=list
    )

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """
        Convert every entry of `components_mapping` into a resolved mapping definition.

        Args:
            parameters (Mapping[str, Any]): Parameters for interpolation.

        Raises:
            ValueError: If a mapping value is neither a string nor an InterpolatedString.
        """
        for mapping in self.components_mapping:
            raw_value = mapping.value
            if not isinstance(raw_value, (str, InterpolatedString)):
                raise ValueError(
                    f"Expected a string or InterpolatedString for value in mapping: {mapping}"
                )

            # Plain strings are wrapped; existing InterpolatedString values are reused as-is.
            if isinstance(raw_value, str):
                resolved_value = InterpolatedString.create(raw_value, parameters=parameters)
            else:
                resolved_value = raw_value

            resolved_path = [
                InterpolatedString.create(segment, parameters=parameters)
                for segment in mapping.field_path
            ]

            self._resolved_components.append(
                ResolvedComponentMappingDefinition(
                    field_path=resolved_path,
                    value=resolved_value,
                    value_type=mapping.value_type,
                    parameters=parameters,
                )
            )

    @property
    def _stream_config(self) -> Iterable[Mapping[str, Any]]:
        """Return the stream config entries located at `stream_config.configs_pointer`.

        A missing path yields an empty list; a single non-list value is wrapped in a list.
        """
        pointer = []
        for node in self.stream_config.configs_pointer:
            # Plain strings are used verbatim; anything else is evaluated against the config.
            pointer.append(node if isinstance(node, str) else node.eval(self.config))

        found = dpath.get(dict(self.config), pointer, default=[])
        return found if isinstance(found, list) else [found]

    def resolve_components(
        self, stream_template_config: Dict[str, Any]
    ) -> Iterable[Dict[str, Any]]:
        """
        Yield one populated copy of the stream template per stream config entry.

        Args:
            stream_template_config (Dict[str, Any]): Stream template to populate.

        Yields:
            Dict[str, Any]: Updated configurations with resolved components.
        """
        # Extra keyword arguments made available to every interpolation evaluation.
        interpolation_context: Dict[str, Any] = {
            "stream_template_config": stream_template_config
        }

        for components_values in self._stream_config:
            # The template itself is never mutated; each entry gets its own deep copy.
            resolved_config = deepcopy(stream_template_config)
            interpolation_context["components_values"] = components_values

            for component in self._resolved_components:
                expected_type = component.value_type
                value = component.value.eval(
                    self.config,
                    valid_types=(expected_type,) if expected_type else None,
                    **interpolation_context,
                )

                target_path = [
                    segment.eval(self.config, **interpolation_context)
                    for segment in component.field_path
                ]
                dpath.set(resolved_config, target_path, value)

            yield resolved_config
Resolves and populates stream templates with components fetched via source config.
Attributes:
- stream_config (StreamConfig): The description of stream configuration used to fetch stream config from source config.
- config (Config): Configuration object for the resolver.
- components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
- parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
def resolve_components(
    self, stream_template_config: Dict[str, Any]
) -> Iterable[Dict[str, Any]]:
    """
    Yield one populated copy of the stream template per stream config entry.

    Args:
        stream_template_config (Dict[str, Any]): Stream template to populate.

    Yields:
        Dict[str, Any]: Updated configurations with resolved components.
    """
    # Extra keyword arguments made available to every interpolation evaluation.
    interpolation_context: Dict[str, Any] = {
        "stream_template_config": stream_template_config
    }

    for components_values in self._stream_config:
        # The template itself is never mutated; each entry gets its own deep copy.
        resolved_config = deepcopy(stream_template_config)
        interpolation_context["components_values"] = components_values

        for component in self._resolved_components:
            expected_type = component.value_type
            value = component.value.eval(
                self.config,
                valid_types=(expected_type,) if expected_type else None,
                **interpolation_context,
            )

            target_path = [
                segment.eval(self.config, **interpolation_context)
                for segment in component.field_path
            ]
            dpath.set(resolved_config, target_path, value)

        yield resolved_config
Resolves components in the stream template configuration by populating values.
Arguments:
- stream_template_config (Dict[str, Any]): Stream template to populate.
Yields:
Dict[str, Any]: Updated configurations with resolved components.