airbyte_cdk.sources.declarative.resolvers

 1#
 2# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 3#
 4
 5from typing import Mapping
 6
 7from pydantic.v1 import BaseModel
 8
 9from airbyte_cdk.sources.declarative.models import (
10    ConfigComponentsResolver as ConfigComponentsResolverModel,
11)
12from airbyte_cdk.sources.declarative.models import (
13    HttpComponentsResolver as HttpComponentsResolverModel,
14)
15from airbyte_cdk.sources.declarative.resolvers.components_resolver import (
16    ComponentMappingDefinition,
17    ComponentsResolver,
18    ResolvedComponentMappingDefinition,
19)
20from airbyte_cdk.sources.declarative.resolvers.config_components_resolver import (
21    ConfigComponentsResolver,
22    StreamConfig,
23)
24from airbyte_cdk.sources.declarative.resolvers.http_components_resolver import (
25    HttpComponentsResolver,
26)
27
28COMPONENTS_RESOLVER_TYPE_MAPPING: Mapping[str, type[BaseModel]] = {
29    "HttpComponentsResolver": HttpComponentsResolverModel,
30    "ConfigComponentsResolver": ConfigComponentsResolverModel,
31}
32
33__all__ = [
34    "ComponentsResolver",
35    "HttpComponentsResolver",
36    "ComponentMappingDefinition",
37    "ResolvedComponentMappingDefinition",
38    "StreamConfig",
39    "ConfigComponentsResolver",
40    "COMPONENTS_RESOLVER_TYPE_MAPPING",
41]
@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
@dataclass
class ComponentsResolver(abc.ABC):
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class ComponentsResolver(ABC):
    """
    Abstract interface for objects that resolve the components of a stream template.

    Concrete resolvers implement ``resolve_components``.
    """

    @abstractmethod
    def resolve_components(
        self, stream_template_config: Dict[str, Any]
    ) -> Iterable[Dict[str, Any]]:
        """
        Map and populate values into a stream template configuration.

        :param stream_template_config: The stream template with placeholders for components.
        :return: An iterable of resolved stream configs with populated values.
        """
        ...

Abstract base class for resolving components in a stream template.

@abstractmethod
def resolve_components(self, stream_template_config: Dict[str, Any]) -> Iterable[Dict[str, Any]]:
49    @abstractmethod
50    def resolve_components(
51        self, stream_template_config: Dict[str, Any]
52    ) -> Iterable[Dict[str, Any]]:
53        """
54        Maps and populates values into a stream template configuration.
55        :param stream_template_config: The stream template with placeholders for components.
56        :yields: The resolved stream config with populated values.
57        """
58        pass

Maps and populates values into a stream template configuration.

Parameters
  • stream_template_config: The stream template with placeholders for components.
Yields:

The resolved stream config with populated values.
@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
@dataclass
class HttpComponentsResolver(airbyte_cdk.sources.declarative.resolvers.ComponentsResolver):
 24@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
 25@dataclass
 26class HttpComponentsResolver(ComponentsResolver):
 27    """
 28    Resolves and populates stream templates with components fetched via an HTTP retriever.
 29
 30    Attributes:
 31        retriever (Retriever): The retriever used to fetch data from an API.
 32        config (Config): Configuration object for the resolver.
 33        components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
 34        parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
 35    """
 36
 37    retriever: Retriever
 38    config: Config
 39    components_mapping: List[ComponentMappingDefinition]
 40    parameters: InitVar[Mapping[str, Any]]
 41    _resolved_components: List[ResolvedComponentMappingDefinition] = field(
 42        init=False, repr=False, default_factory=list
 43    )
 44
 45    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 46        """
 47        Initializes and parses component mappings, converting them to resolved definitions.
 48
 49        Args:
 50            parameters (Mapping[str, Any]): Parameters for interpolation.
 51        """
 52        for component_mapping in self.components_mapping:
 53            if isinstance(component_mapping.value, (str, InterpolatedString)):
 54                interpolated_value = (
 55                    InterpolatedString.create(component_mapping.value, parameters=parameters)
 56                    if isinstance(component_mapping.value, str)
 57                    else component_mapping.value
 58                )
 59
 60                field_path = [
 61                    InterpolatedString.create(path, parameters=parameters)
 62                    for path in component_mapping.field_path
 63                ]
 64
 65                self._resolved_components.append(
 66                    ResolvedComponentMappingDefinition(
 67                        field_path=field_path,
 68                        value=interpolated_value,
 69                        value_type=component_mapping.value_type,
 70                        parameters=parameters,
 71                    )
 72                )
 73            else:
 74                raise ValueError(
 75                    f"Expected a string or InterpolatedString for value in mapping: {component_mapping}"
 76                )
 77
 78    def resolve_components(
 79        self, stream_template_config: Dict[str, Any]
 80    ) -> Iterable[Dict[str, Any]]:
 81        """
 82        Resolves components in the stream template configuration by populating values.
 83
 84        Args:
 85            stream_template_config (Dict[str, Any]): Stream template to populate.
 86
 87        Yields:
 88            Dict[str, Any]: Updated configurations with resolved components.
 89        """
 90        kwargs = {"stream_template_config": stream_template_config}
 91
 92        for stream_slice in self.retriever.stream_slices():
 93            for components_values in self.retriever.read_records(
 94                records_schema={}, stream_slice=stream_slice
 95            ):
 96                updated_config = deepcopy(stream_template_config)
 97                kwargs["components_values"] = components_values  # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
 98                kwargs["stream_slice"] = stream_slice  # type: ignore[assignment] # stream_slice will always be of type Mapping[str, Any]
 99
100                for resolved_component in self._resolved_components:
101                    valid_types = (
102                        (resolved_component.value_type,) if resolved_component.value_type else None
103                    )
104                    value = resolved_component.value.eval(
105                        self.config, valid_types=valid_types, **kwargs
106                    )
107
108                    path = [
109                        path.eval(self.config, **kwargs) for path in resolved_component.field_path
110                    ]
111                    dpath.set(updated_config, path, value)
112
113                yield updated_config

Resolves and populates stream templates with components fetched via an HTTP retriever.

Attributes:
  • retriever (Retriever): The retriever used to fetch data from an API.
  • config (Config): Configuration object for the resolver.
  • components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
  • parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
HttpComponentsResolver( retriever: airbyte_cdk.sources.declarative.retrievers.Retriever, config: Mapping[str, Any], components_mapping: List[ComponentMappingDefinition], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
config: Mapping[str, Any]
components_mapping: List[ComponentMappingDefinition]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def resolve_components(self, stream_template_config: Dict[str, Any]) -> Iterable[Dict[str, Any]]:
 78    def resolve_components(
 79        self, stream_template_config: Dict[str, Any]
 80    ) -> Iterable[Dict[str, Any]]:
 81        """
 82        Resolves components in the stream template configuration by populating values.
 83
 84        Args:
 85            stream_template_config (Dict[str, Any]): Stream template to populate.
 86
 87        Yields:
 88            Dict[str, Any]: Updated configurations with resolved components.
 89        """
 90        kwargs = {"stream_template_config": stream_template_config}
 91
 92        for stream_slice in self.retriever.stream_slices():
 93            for components_values in self.retriever.read_records(
 94                records_schema={}, stream_slice=stream_slice
 95            ):
 96                updated_config = deepcopy(stream_template_config)
 97                kwargs["components_values"] = components_values  # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
 98                kwargs["stream_slice"] = stream_slice  # type: ignore[assignment] # stream_slice will always be of type Mapping[str, Any]
 99
100                for resolved_component in self._resolved_components:
101                    valid_types = (
102                        (resolved_component.value_type,) if resolved_component.value_type else None
103                    )
104                    value = resolved_component.value.eval(
105                        self.config, valid_types=valid_types, **kwargs
106                    )
107
108                    path = [
109                        path.eval(self.config, **kwargs) for path in resolved_component.field_path
110                    ]
111                    dpath.set(updated_config, path, value)
112
113                yield updated_config

Resolves components in the stream template configuration by populating values.

Arguments:
  • stream_template_config (Dict[str, Any]): Stream template to populate.
Yields:

Dict[str, Any]: Updated configurations with resolved components.

@dataclass(frozen=True)
class ComponentMappingDefinition:
@dataclass(frozen=True)
class ComponentMappingDefinition:
    """
    Configuration for mapping one component into a stream template.

    Specifies which field of the stream template should be updated with a value,
    supporting dynamic interpolation and type enforcement.
    """

    # Path of the template field to update, one entry per path segment.
    field_path: List["InterpolatedString"]
    # Value to write; may still be a plain string at this stage.
    value: Union["InterpolatedString", str]
    # Optional type the evaluated value is restricted to.
    value_type: Optional[Type[Any]]
    # Init-only interpolation parameters (not stored as a field).
    parameters: InitVar[Mapping[str, Any]]
    create_or_update: Optional[bool] = False

Defines the configuration for mapping a component in a stream. This class specifies what field in the stream template should be updated with value, supporting dynamic interpolation and type enforcement.

ComponentMappingDefinition( field_path: List[airbyte_cdk.InterpolatedString], value: Union[airbyte_cdk.InterpolatedString, str], value_type: Optional[Type[Any]], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], create_or_update: Optional[bool] = False)
field_path: List[airbyte_cdk.InterpolatedString]
value: Union[airbyte_cdk.InterpolatedString, str]
value_type: Optional[Type[Any]]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
create_or_update: Optional[bool] = False
@dataclass(frozen=True)
class ResolvedComponentMappingDefinition:
@dataclass(frozen=True)
class ResolvedComponentMappingDefinition:
    """
    Resolved configuration for mapping one component into a stream template.

    Specifies which field of the stream template should be updated with a value,
    supporting dynamic interpolation and type enforcement. Unlike
    ``ComponentMappingDefinition``, ``value`` is always an ``InterpolatedString``.
    """

    # Path of the template field to update, one entry per path segment.
    field_path: List["InterpolatedString"]
    # Value to write, already wrapped for interpolation.
    value: "InterpolatedString"
    # Optional type the evaluated value is restricted to.
    value_type: Optional[Type[Any]]
    # Init-only interpolation parameters (not stored as a field).
    parameters: InitVar[Mapping[str, Any]]
    create_or_update: Optional[bool] = False

Defines resolved configuration for mapping a component in a stream. This class specifies what field in the stream template should be updated with value, supporting dynamic interpolation and type enforcement.

ResolvedComponentMappingDefinition( field_path: List[airbyte_cdk.InterpolatedString], value: airbyte_cdk.InterpolatedString, value_type: Optional[Type[Any]], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], create_or_update: Optional[bool] = False)
field_path: List[airbyte_cdk.InterpolatedString]
value_type: Optional[Type[Any]]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
create_or_update: Optional[bool] = False
@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
@dataclass
class StreamConfig:
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class StreamConfig:
    """
    Identifies stream config details for dynamic schema extraction and processing.
    """

    # Path segments pointing at the stream configuration(s); interpolated in __post_init__.
    configs_pointer: List[Union[InterpolatedString, str]]
    # Init-only interpolation parameters (not stored as a field).
    parameters: InitVar[Mapping[str, Any]]
    default_values: Optional[List[Any]] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Replace every pointer segment with the result of ``InterpolatedString.create``."""
        interpolated_pointer = []
        for segment in self.configs_pointer:
            interpolated_pointer.append(InterpolatedString.create(segment, parameters=parameters))
        self.configs_pointer = interpolated_pointer

Identifies stream config details for dynamic schema extraction and processing.

StreamConfig( configs_pointer: List[Union[airbyte_cdk.InterpolatedString, str]], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], default_values: Optional[List[Any]] = None)
configs_pointer: List[Union[airbyte_cdk.InterpolatedString, str]]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
default_values: Optional[List[Any]] = None
@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
@dataclass
class ConfigComponentsResolver(airbyte_cdk.sources.declarative.resolvers.ComponentsResolver):
 43@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
 44@dataclass
 45class ConfigComponentsResolver(ComponentsResolver):
 46    """
 47    Resolves and populates stream templates with components fetched via source config.
 48
 49    Attributes:
 50        stream_config (StreamConfig): The description of stream configuration used to fetch stream config from source config.
 51        config (Config): Configuration object for the resolver.
 52        components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
 53        parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
 54    """
 55
 56    stream_configs: List[StreamConfig]
 57    config: Config
 58    components_mapping: List[ComponentMappingDefinition]
 59    parameters: InitVar[Mapping[str, Any]]
 60    _resolved_components: List[ResolvedComponentMappingDefinition] = field(
 61        init=False, repr=False, default_factory=list
 62    )
 63
 64    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 65        """
 66        Initializes and parses component mappings, converting them to resolved definitions.
 67
 68        Args:
 69            parameters (Mapping[str, Any]): Parameters for interpolation.
 70        """
 71
 72        for component_mapping in self.components_mapping:
 73            if isinstance(component_mapping.value, (str, InterpolatedString)):
 74                interpolated_value = (
 75                    InterpolatedString.create(component_mapping.value, parameters=parameters)
 76                    if isinstance(component_mapping.value, str)
 77                    else component_mapping.value
 78                )
 79
 80                field_path = [
 81                    InterpolatedString.create(path, parameters=parameters)
 82                    for path in component_mapping.field_path
 83                ]
 84
 85                self._resolved_components.append(
 86                    ResolvedComponentMappingDefinition(
 87                        field_path=field_path,
 88                        value=interpolated_value,
 89                        value_type=component_mapping.value_type,
 90                        create_or_update=component_mapping.create_or_update,
 91                        parameters=parameters,
 92                    )
 93                )
 94            else:
 95                raise ValueError(
 96                    f"Expected a string or InterpolatedString for value in mapping: {component_mapping}"
 97                )
 98
 99    @staticmethod
100    def _merge_combination(combo: Iterable[Tuple[int, Any]]) -> Dict[str, Any]:
101        """Collapse a combination of ``(idx, elem)`` into one config dict."""
102        result: Dict[str, Any] = {}
103        for config_index, (elem_index, elem) in enumerate(combo):
104            if isinstance(elem, dict):
105                result.update(elem)
106            else:
107                # keep non-dict values under an artificial name
108                result.setdefault(f"source_config_{config_index}", (elem_index, elem))
109        return result
110
111    @property
112    def _stream_config(self) -> List[Dict[str, Any]]:
113        """
114        Build every unique stream-configuration combination defined by
115        each ``StreamConfig`` and any ``default_values``.
116        """
117        all_indexed_streams = []
118        for stream_config in self.stream_configs:
119            path = [
120                node.eval(self.config) if not isinstance(node, str) else node
121                for node in stream_config.configs_pointer
122            ]
123            stream_configs_raw = dpath.get(dict(self.config), path, default=[])
124            stream_configs = (
125                list(stream_configs_raw)
126                if isinstance(stream_configs_raw, list)
127                else [stream_configs_raw]
128            )
129
130            if stream_config.default_values:
131                stream_configs.extend(stream_config.default_values)
132
133            all_indexed_streams.append([(i, item) for i, item in enumerate(stream_configs)])
134        return [
135            self._merge_combination(combo)  # type: ignore[arg-type]
136            for combo in product(*all_indexed_streams)
137        ]
138
139    def resolve_components(
140        self, stream_template_config: Dict[str, Any]
141    ) -> Iterable[Dict[str, Any]]:
142        """
143        Resolves components in the stream template configuration by populating values.
144
145        Args:
146            stream_template_config (Dict[str, Any]): Stream template to populate.
147
148        Yields:
149            Dict[str, Any]: Updated configurations with resolved components.
150        """
151        kwargs = {"stream_template_config": stream_template_config}
152
153        for components_values in self._stream_config:
154            updated_config = deepcopy(stream_template_config)
155            kwargs["components_values"] = components_values  # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
156
157            for resolved_component in self._resolved_components:
158                valid_types = (
159                    (resolved_component.value_type,) if resolved_component.value_type else None
160                )
161                value = resolved_component.value.eval(
162                    self.config, valid_types=valid_types, **kwargs
163                )
164
165                path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path]
166                parsed_value = self._parse_yaml_if_possible(value)
167                updated = dpath.set(updated_config, path, parsed_value)
168
169                if parsed_value and not updated and resolved_component.create_or_update:
170                    dpath.new(updated_config, path, parsed_value)
171
172            yield updated_config
173
174    @staticmethod
175    def _parse_yaml_if_possible(value: Any) -> Any:
176        """
177        Try to turn value into a Python object by YAML-parsing it.
178
179        * If value is a `str` and can be parsed by `yaml.safe_load`,
180          return the parsed result.
181        * If parsing fails (`yaml.parser.ParserError`) – or value is not
182          a string at all – return the original value unchanged.
183        """
184        if isinstance(value, str):
185            try:
186                return yaml.safe_load(value)
187            except ParserError:  # "{{ record[0] in ['cohortActiveUsers'] }}"   # not valid YAML
188                return value
189        return value

Resolves and populates stream templates with components fetched via source config.

Attributes:
  • stream_configs (List[StreamConfig]): The descriptions of stream configurations used to fetch stream configs from source config.
  • config (Config): Configuration object for the resolver.
  • components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
  • parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
ConfigComponentsResolver( stream_configs: List[StreamConfig], config: Mapping[str, Any], components_mapping: List[ComponentMappingDefinition], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
stream_configs: List[StreamConfig]
config: Mapping[str, Any]
components_mapping: List[ComponentMappingDefinition]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def resolve_components(self, stream_template_config: Dict[str, Any]) -> Iterable[Dict[str, Any]]:
139    def resolve_components(
140        self, stream_template_config: Dict[str, Any]
141    ) -> Iterable[Dict[str, Any]]:
142        """
143        Resolves components in the stream template configuration by populating values.
144
145        Args:
146            stream_template_config (Dict[str, Any]): Stream template to populate.
147
148        Yields:
149            Dict[str, Any]: Updated configurations with resolved components.
150        """
151        kwargs = {"stream_template_config": stream_template_config}
152
153        for components_values in self._stream_config:
154            updated_config = deepcopy(stream_template_config)
155            kwargs["components_values"] = components_values  # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
156
157            for resolved_component in self._resolved_components:
158                valid_types = (
159                    (resolved_component.value_type,) if resolved_component.value_type else None
160                )
161                value = resolved_component.value.eval(
162                    self.config, valid_types=valid_types, **kwargs
163                )
164
165                path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path]
166                parsed_value = self._parse_yaml_if_possible(value)
167                updated = dpath.set(updated_config, path, parsed_value)
168
169                if parsed_value and not updated and resolved_component.create_or_update:
170                    dpath.new(updated_config, path, parsed_value)
171
172            yield updated_config

Resolves components in the stream template configuration by populating values.

Arguments:
  • stream_template_config (Dict[str, Any]): Stream template to populate.
Yields:

Dict[str, Any]: Updated configurations with resolved components.

COMPONENTS_RESOLVER_TYPE_MAPPING: Mapping[str, type[pydantic.v1.main.BaseModel]] = {'HttpComponentsResolver': <class 'airbyte_cdk.sources.declarative.models.declarative_component_schema.HttpComponentsResolver'>, 'ConfigComponentsResolver': <class 'airbyte_cdk.sources.declarative.models.declarative_component_schema.ConfigComponentsResolver'>}