airbyte_cdk.sources.declarative.resolvers

 1#
 2# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 3#
 4
 5from typing import Mapping
 6
 7from pydantic.v1 import BaseModel
 8
 9from airbyte_cdk.sources.declarative.models import (
10    ConfigComponentsResolver as ConfigComponentsResolverModel,
11)
12from airbyte_cdk.sources.declarative.models import (
13    HttpComponentsResolver as HttpComponentsResolverModel,
14)
15from airbyte_cdk.sources.declarative.models import (
16    ParametrizedComponentsResolver as ParametrizedComponentsResolverModel,
17)
18from airbyte_cdk.sources.declarative.resolvers.components_resolver import (
19    ComponentMappingDefinition,
20    ComponentsResolver,
21    ResolvedComponentMappingDefinition,
22)
23from airbyte_cdk.sources.declarative.resolvers.config_components_resolver import (
24    ConfigComponentsResolver,
25    StreamConfig,
26)
27from airbyte_cdk.sources.declarative.resolvers.http_components_resolver import (
28    HttpComponentsResolver,
29)
30from airbyte_cdk.sources.declarative.resolvers.parametrized_components_resolver import (
31    ParametrizedComponentsResolver,
32    StreamParametersDefinition,
33)
34
35COMPONENTS_RESOLVER_TYPE_MAPPING: Mapping[str, type[BaseModel]] = {
36    "HttpComponentsResolver": HttpComponentsResolverModel,
37    "ConfigComponentsResolver": ConfigComponentsResolverModel,
38    "ParametrizedComponentsResolver": ParametrizedComponentsResolverModel,
39}
40
41__all__ = [
42    "ComponentsResolver",
43    "HttpComponentsResolver",
44    "ComponentMappingDefinition",
45    "ResolvedComponentMappingDefinition",
46    "StreamConfig",
47    "ConfigComponentsResolver",
48    "COMPONENTS_RESOLVER_TYPE_MAPPING",
49    "ParametrizedComponentsResolver",
50    "StreamParametersDefinition",
51]
@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
@dataclass
class ComponentsResolver(abc.ABC):
42@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
43@dataclass
44class ComponentsResolver(ABC):
45    """
46    Abstract base class for resolving components in a stream template.
47    """
48
49    @abstractmethod
50    def resolve_components(
51        self, stream_template_config: Dict[str, Any]
52    ) -> Iterable[Dict[str, Any]]:
53        """
54        Maps and populates values into a stream template configuration.
55        :param stream_template_config: The stream template with placeholders for components.
56        :yields: The resolved stream config with populated values.
57        """
58        pass

Abstract base class for resolving components in a stream template.

@abstractmethod
def resolve_components(self, stream_template_config: Dict[str, Any]) -> Iterable[Dict[str, Any]]:
49    @abstractmethod
50    def resolve_components(
51        self, stream_template_config: Dict[str, Any]
52    ) -> Iterable[Dict[str, Any]]:
53        """
54        Maps and populates values into a stream template configuration.
55        :param stream_template_config: The stream template with placeholders for components.
56        :yields: The resolved stream config with populated values.
57        """
58        pass

Maps and populates values into a stream template configuration.

Parameters
  • stream_template_config: The stream template with placeholders for components. :yields: The resolved stream config with populated values.
@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
@dataclass
class HttpComponentsResolver(airbyte_cdk.sources.declarative.resolvers.ComponentsResolver):
 24@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
 25@dataclass
 26class HttpComponentsResolver(ComponentsResolver):
 27    """
 28    Resolves and populates stream templates with components fetched via an HTTP retriever.
 29
 30    Attributes:
 31        retriever (Retriever): The retriever used to fetch data from an API.
 32        config (Config): Configuration object for the resolver.
 33        components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
 34        parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
 35    """
 36
 37    retriever: Retriever
 38    config: Config
 39    components_mapping: List[ComponentMappingDefinition]
 40    parameters: InitVar[Mapping[str, Any]]
 41    _resolved_components: List[ResolvedComponentMappingDefinition] = field(
 42        init=False, repr=False, default_factory=list
 43    )
 44
 45    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 46        """
 47        Initializes and parses component mappings, converting them to resolved definitions.
 48
 49        Args:
 50            parameters (Mapping[str, Any]): Parameters for interpolation.
 51        """
 52        for component_mapping in self.components_mapping:
 53            if isinstance(component_mapping.value, (str, InterpolatedString)):
 54                interpolated_value = (
 55                    InterpolatedString.create(component_mapping.value, parameters=parameters)
 56                    if isinstance(component_mapping.value, str)
 57                    else component_mapping.value
 58                )
 59
 60                field_path = [
 61                    InterpolatedString.create(path, parameters=parameters)
 62                    for path in component_mapping.field_path
 63                ]
 64
 65                self._resolved_components.append(
 66                    ResolvedComponentMappingDefinition(
 67                        field_path=field_path,
 68                        value=interpolated_value,
 69                        value_type=component_mapping.value_type,
 70                        parameters=parameters,
 71                    )
 72                )
 73            else:
 74                raise ValueError(
 75                    f"Expected a string or InterpolatedString for value in mapping: {component_mapping}"
 76                )
 77
 78    def resolve_components(
 79        self, stream_template_config: Dict[str, Any]
 80    ) -> Iterable[Dict[str, Any]]:
 81        """
 82        Resolves components in the stream template configuration by populating values.
 83
 84        Args:
 85            stream_template_config (Dict[str, Any]): Stream template to populate.
 86
 87        Yields:
 88            Dict[str, Any]: Updated configurations with resolved components.
 89        """
 90        kwargs = {"stream_template_config": stream_template_config}
 91
 92        for stream_slice in self.retriever.stream_slices():
 93            for components_values in self.retriever.read_records(
 94                records_schema={}, stream_slice=stream_slice
 95            ):
 96                updated_config = deepcopy(stream_template_config)
 97                kwargs["components_values"] = components_values  # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
 98                kwargs["stream_slice"] = stream_slice  # type: ignore[assignment] # stream_slice will always be of type Mapping[str, Any]
 99
100                for resolved_component in self._resolved_components:
101                    valid_types = (
102                        (resolved_component.value_type,) if resolved_component.value_type else None
103                    )
104                    value = resolved_component.value.eval(
105                        self.config, valid_types=valid_types, **kwargs
106                    )
107
108                    path = [
109                        path.eval(self.config, **kwargs) for path in resolved_component.field_path
110                    ]
111                    dpath.set(updated_config, path, value)
112
113                yield updated_config

Resolves and populates stream templates with components fetched via an HTTP retriever.

Attributes:
  • retriever (Retriever): The retriever used to fetch data from an API.
  • config (Config): Configuration object for the resolver.
  • components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
  • parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
HttpComponentsResolver( retriever: airbyte_cdk.sources.declarative.retrievers.Retriever, config: Mapping[str, Any], components_mapping: List[ComponentMappingDefinition], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
config: Mapping[str, Any]
components_mapping: List[ComponentMappingDefinition]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def resolve_components(self, stream_template_config: Dict[str, Any]) -> Iterable[Dict[str, Any]]:
 78    def resolve_components(
 79        self, stream_template_config: Dict[str, Any]
 80    ) -> Iterable[Dict[str, Any]]:
 81        """
 82        Resolves components in the stream template configuration by populating values.
 83
 84        Args:
 85            stream_template_config (Dict[str, Any]): Stream template to populate.
 86
 87        Yields:
 88            Dict[str, Any]: Updated configurations with resolved components.
 89        """
 90        kwargs = {"stream_template_config": stream_template_config}
 91
 92        for stream_slice in self.retriever.stream_slices():
 93            for components_values in self.retriever.read_records(
 94                records_schema={}, stream_slice=stream_slice
 95            ):
 96                updated_config = deepcopy(stream_template_config)
 97                kwargs["components_values"] = components_values  # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
 98                kwargs["stream_slice"] = stream_slice  # type: ignore[assignment] # stream_slice will always be of type Mapping[str, Any]
 99
100                for resolved_component in self._resolved_components:
101                    valid_types = (
102                        (resolved_component.value_type,) if resolved_component.value_type else None
103                    )
104                    value = resolved_component.value.eval(
105                        self.config, valid_types=valid_types, **kwargs
106                    )
107
108                    path = [
109                        path.eval(self.config, **kwargs) for path in resolved_component.field_path
110                    ]
111                    dpath.set(updated_config, path, value)
112
113                yield updated_config

Resolves components in the stream template configuration by populating values.

Arguments:
  • stream_template_config (Dict[str, Any]): Stream template to populate.
Yields:

Dict[str, Any]: Updated configurations with resolved components.

@dataclass(frozen=True)
class ComponentMappingDefinition:
16@dataclass(frozen=True)
17class ComponentMappingDefinition:
18    """Defines the configuration for mapping a component in a stream. This class specifies
19    what field in the stream template should be updated with value, supporting dynamic interpolation
20    and type enforcement."""
21
22    field_path: List["InterpolatedString"]
23    value: Union["InterpolatedString", str]
24    value_type: Optional[Type[Any]]
25    parameters: InitVar[Mapping[str, Any]]
26    create_or_update: Optional[bool] = False

Defines the configuration for mapping a component in a stream. This class specifies what field in the stream template should be updated with value, supporting dynamic interpolation and type enforcement.

ComponentMappingDefinition( field_path: List[airbyte_cdk.InterpolatedString], value: Union[airbyte_cdk.InterpolatedString, str], value_type: Optional[Type[Any]], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], create_or_update: Optional[bool] = False)
field_path: List[airbyte_cdk.InterpolatedString]
value: Union[airbyte_cdk.InterpolatedString, str]
value_type: Optional[Type[Any]]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
create_or_update: Optional[bool] = False
@dataclass(frozen=True)
class ResolvedComponentMappingDefinition:
29@dataclass(frozen=True)
30class ResolvedComponentMappingDefinition:
31    """Defines resolved configuration for mapping a component in a stream. This class specifies
32    what field in the stream template should be updated with value, supporting dynamic interpolation
33    and type enforcement."""
34
35    field_path: List["InterpolatedString"]
36    value: "InterpolatedString"
37    value_type: Optional[Type[Any]]
38    parameters: InitVar[Mapping[str, Any]]
39    create_or_update: Optional[bool] = False

Defines resolved configuration for mapping a component in a stream. This class specifies what field in the stream template should be updated with value, supporting dynamic interpolation and type enforcement.

ResolvedComponentMappingDefinition( field_path: List[airbyte_cdk.InterpolatedString], value: airbyte_cdk.InterpolatedString, value_type: Optional[Type[Any]], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], create_or_update: Optional[bool] = False)
field_path: List[airbyte_cdk.InterpolatedString]
value_type: Optional[Type[Any]]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
create_or_update: Optional[bool] = False
@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
@dataclass
class StreamConfig:
27@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
28@dataclass
29class StreamConfig:
30    """
31    Identifies stream config details for dynamic schema extraction and processing.
32    """
33
34    configs_pointer: List[Union[InterpolatedString, str]]
35    parameters: InitVar[Mapping[str, Any]]
36    default_values: Optional[List[Any]] = None
37
38    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
39        self.configs_pointer = [
40            InterpolatedString.create(path, parameters=parameters) for path in self.configs_pointer
41        ]

Identifies stream config details for dynamic schema extraction and processing.

StreamConfig( configs_pointer: List[Union[airbyte_cdk.InterpolatedString, str]], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], default_values: Optional[List[Any]] = None)
configs_pointer: List[Union[airbyte_cdk.InterpolatedString, str]]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
default_values: Optional[List[Any]] = None
@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
@dataclass
class ConfigComponentsResolver(airbyte_cdk.sources.declarative.resolvers.ComponentsResolver):
 44@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
 45@dataclass
 46class ConfigComponentsResolver(ComponentsResolver):
 47    """
 48    Resolves and populates stream templates with components fetched via source config.
 49
 50    Attributes:
 51        stream_config (StreamConfig): The description of stream configuration used to fetch stream config from source config.
 52        config (Config): Configuration object for the resolver.
 53        components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
 54        parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
 55    """
 56
 57    stream_configs: List[StreamConfig]
 58    config: Config
 59    components_mapping: List[ComponentMappingDefinition]
 60    parameters: InitVar[Mapping[str, Any]]
 61    _resolved_components: List[ResolvedComponentMappingDefinition] = field(
 62        init=False, repr=False, default_factory=list
 63    )
 64
 65    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 66        """
 67        Initializes and parses component mappings, converting them to resolved definitions.
 68
 69        Args:
 70            parameters (Mapping[str, Any]): Parameters for interpolation.
 71        """
 72
 73        for component_mapping in self.components_mapping:
 74            if isinstance(component_mapping.value, (str, InterpolatedString)):
 75                interpolated_value = (
 76                    InterpolatedString.create(component_mapping.value, parameters=parameters)
 77                    if isinstance(component_mapping.value, str)
 78                    else component_mapping.value
 79                )
 80
 81                field_path = [
 82                    InterpolatedString.create(path, parameters=parameters)
 83                    for path in component_mapping.field_path
 84                ]
 85
 86                self._resolved_components.append(
 87                    ResolvedComponentMappingDefinition(
 88                        field_path=field_path,
 89                        value=interpolated_value,
 90                        value_type=component_mapping.value_type,
 91                        create_or_update=component_mapping.create_or_update,
 92                        parameters=parameters,
 93                    )
 94                )
 95            else:
 96                raise ValueError(
 97                    f"Expected a string or InterpolatedString for value in mapping: {component_mapping}"
 98                )
 99
100    @staticmethod
101    def _merge_combination(combo: Iterable[Tuple[int, Any]]) -> Dict[str, Any]:
102        """Collapse a combination of ``(idx, elem)`` into one config dict."""
103        result: Dict[str, Any] = {}
104        for config_index, (elem_index, elem) in enumerate(combo):
105            if isinstance(elem, dict):
106                result.update(elem)
107            else:
108                # keep non-dict values under an artificial name
109                result.setdefault(f"source_config_{config_index}", (elem_index, elem))
110        return result
111
112    @property
113    def _stream_config(self) -> List[Dict[str, Any]]:
114        """
115        Build every unique stream-configuration combination defined by
116        each ``StreamConfig`` and any ``default_values``.
117        """
118        all_indexed_streams = []
119        for stream_config in self.stream_configs:
120            path = [
121                node.eval(self.config) if not isinstance(node, str) else node
122                for node in stream_config.configs_pointer
123            ]
124            stream_configs_raw = dpath.get(dict(self.config), path, default=[])
125            stream_configs = (
126                list(stream_configs_raw)
127                if isinstance(stream_configs_raw, list)
128                else [stream_configs_raw]
129            )
130
131            if stream_config.default_values:
132                stream_configs.extend(stream_config.default_values)
133
134            all_indexed_streams.append([(i, item) for i, item in enumerate(stream_configs)])
135        return [
136            self._merge_combination(combo)  # type: ignore[arg-type]
137            for combo in product(*all_indexed_streams)
138        ]
139
140    def resolve_components(
141        self, stream_template_config: Dict[str, Any]
142    ) -> Iterable[Dict[str, Any]]:
143        """
144        Resolves components in the stream template configuration by populating values.
145
146        Args:
147            stream_template_config (Dict[str, Any]): Stream template to populate.
148
149        Yields:
150            Dict[str, Any]: Updated configurations with resolved components.
151        """
152        kwargs = {"stream_template_config": stream_template_config}
153
154        for components_values in self._stream_config:
155            updated_config = deepcopy(stream_template_config)
156            kwargs["components_values"] = components_values  # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
157
158            for resolved_component in self._resolved_components:
159                valid_types = (
160                    (resolved_component.value_type,) if resolved_component.value_type else None
161                )
162                value = resolved_component.value.eval(
163                    self.config, valid_types=valid_types, **kwargs
164                )
165
166                path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path]
167                parsed_value = self._parse_yaml_if_possible(value)
168                updated = dpath.set(updated_config, path, parsed_value)
169
170                if parsed_value and not updated and resolved_component.create_or_update:
171                    dpath.new(updated_config, path, parsed_value)
172
173            yield updated_config
174
175    @staticmethod
176    def _parse_yaml_if_possible(value: Any) -> Any:
177        """
178        Try to turn value into a Python object by YAML-parsing it.
179
180        * If value is a `str` and can be parsed by `yaml.safe_load`,
181          return the parsed result.
182        * If parsing fails (`yaml.parser.ParserError`) – or value is not
183          a string at all – return the original value unchanged.
184        """
185        if isinstance(value, str):
186            try:
187                return yaml.safe_load(value)
188            except ParserError:  # "{{ record[0] in ['cohortActiveUsers'] }}"   # not valid YAML
189                return value
190            except ScannerError as e:  # "%Y-%m-%d'   # not valid yaml
191                if "expected alphabetic or numeric character, but found '%'" in str(e):
192                    return value
193                raise e
194        return value

Resolves and populates stream templates with components fetched via source config.

Attributes:
  • stream_config (StreamConfig): The description of stream configuration used to fetch stream config from source config.
  • config (Config): Configuration object for the resolver.
  • components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
  • parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
ConfigComponentsResolver( stream_configs: List[StreamConfig], config: Mapping[str, Any], components_mapping: List[ComponentMappingDefinition], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
stream_configs: List[StreamConfig]
config: Mapping[str, Any]
components_mapping: List[ComponentMappingDefinition]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def resolve_components(self, stream_template_config: Dict[str, Any]) -> Iterable[Dict[str, Any]]:
140    def resolve_components(
141        self, stream_template_config: Dict[str, Any]
142    ) -> Iterable[Dict[str, Any]]:
143        """
144        Resolves components in the stream template configuration by populating values.
145
146        Args:
147            stream_template_config (Dict[str, Any]): Stream template to populate.
148
149        Yields:
150            Dict[str, Any]: Updated configurations with resolved components.
151        """
152        kwargs = {"stream_template_config": stream_template_config}
153
154        for components_values in self._stream_config:
155            updated_config = deepcopy(stream_template_config)
156            kwargs["components_values"] = components_values  # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
157
158            for resolved_component in self._resolved_components:
159                valid_types = (
160                    (resolved_component.value_type,) if resolved_component.value_type else None
161                )
162                value = resolved_component.value.eval(
163                    self.config, valid_types=valid_types, **kwargs
164                )
165
166                path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path]
167                parsed_value = self._parse_yaml_if_possible(value)
168                updated = dpath.set(updated_config, path, parsed_value)
169
170                if parsed_value and not updated and resolved_component.create_or_update:
171                    dpath.new(updated_config, path, parsed_value)
172
173            yield updated_config

Resolves components in the stream template configuration by populating values.

Arguments:
  • stream_template_config (Dict[str, Any]): Stream template to populate.
Yields:

Dict[str, Any]: Updated configurations with resolved components.

COMPONENTS_RESOLVER_TYPE_MAPPING: Mapping[str, type[pydantic.v1.main.BaseModel]] = {'HttpComponentsResolver': <class 'airbyte_cdk.sources.declarative.models.declarative_component_schema.HttpComponentsResolver'>, 'ConfigComponentsResolver': <class 'airbyte_cdk.sources.declarative.models.declarative_component_schema.ConfigComponentsResolver'>, 'ParametrizedComponentsResolver': <class 'airbyte_cdk.sources.declarative.models.declarative_component_schema.ParametrizedComponentsResolver'>}
@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
@dataclass
class ParametrizedComponentsResolver(airbyte_cdk.sources.declarative.resolvers.ComponentsResolver):
 35@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
 36@dataclass
 37class ParametrizedComponentsResolver(ComponentsResolver):
 38    """
 39    Resolves and populates dynamic streams from defined parametrized values in manifest.
 40    """
 41
 42    stream_parameters: StreamParametersDefinition
 43    config: Config
 44    components_mapping: List[ComponentMappingDefinition]
 45    parameters: InitVar[Mapping[str, Any]]
 46    _resolved_components: List[ResolvedComponentMappingDefinition] = field(
 47        init=False, repr=False, default_factory=list
 48    )
 49
 50    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 51        """
 52        Initializes and parses component mappings, converting them to resolved definitions.
 53
 54        Args:
 55            parameters (Mapping[str, Any]): Parameters for interpolation.
 56        """
 57
 58        for component_mapping in self.components_mapping:
 59            if isinstance(component_mapping.value, (str, InterpolatedString)):
 60                interpolated_value = (
 61                    InterpolatedString.create(component_mapping.value, parameters=parameters)
 62                    if isinstance(component_mapping.value, str)
 63                    else component_mapping.value
 64                )
 65
 66                field_path = [
 67                    InterpolatedString.create(path, parameters=parameters)
 68                    for path in component_mapping.field_path
 69                ]
 70
 71                self._resolved_components.append(
 72                    ResolvedComponentMappingDefinition(
 73                        field_path=field_path,
 74                        value=interpolated_value,
 75                        value_type=component_mapping.value_type,
 76                        create_or_update=component_mapping.create_or_update,
 77                        parameters=parameters,
 78                    )
 79                )
 80            else:
 81                raise ValueError(
 82                    f"Expected a string or InterpolatedString for value in mapping: {component_mapping}"
 83                )
 84
 85    def resolve_components(
 86        self, stream_template_config: Dict[str, Any]
 87    ) -> Iterable[Dict[str, Any]]:
 88        kwargs = {"stream_template_config": stream_template_config}
 89
 90        for components_values in self.stream_parameters.list_of_parameters_for_stream:
 91            updated_config = deepcopy(stream_template_config)
 92            kwargs["components_values"] = components_values  # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
 93            for resolved_component in self._resolved_components:
 94                valid_types = (
 95                    (resolved_component.value_type,) if resolved_component.value_type else None
 96                )
 97                value = resolved_component.value.eval(
 98                    self.config, valid_types=valid_types, **kwargs
 99                )
100                path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path]
101                parsed_value = self._parse_yaml_if_possible(value)
102                # https://github.com/dpath-maintainers/dpath-python/blob/master/dpath/__init__.py#L136
103                # dpath.set returns the number of changed elements, 0 when no elements changed
104                updated = dpath.set(updated_config, path, parsed_value)
105
106                if parsed_value and not updated and resolved_component.create_or_update:
107                    dpath.new(updated_config, path, parsed_value)
108
109            yield updated_config
110
111    @staticmethod
112    def _parse_yaml_if_possible(value: Any) -> Any:
113        """
114        Try to turn value into a Python object by YAML-parsing it.
115
116        * If value is a `str` and can be parsed by `yaml.safe_load`,
117          return the parsed result.
118        * If parsing fails (`yaml.parser.ParserError`) – or value is not
119          a string at all – return the original value unchanged.
120        """
121        if isinstance(value, str):
122            try:
123                return yaml.safe_load(value)
124            except ParserError:  # "{{ record[0] in ['cohortActiveUsers'] }}"   # not valid YAML
125                return value
126        return value

Resolves and populates dynamic streams from defined parametrized values in manifest.

ParametrizedComponentsResolver( stream_parameters: StreamParametersDefinition, config: Mapping[str, Any], components_mapping: List[ComponentMappingDefinition], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
stream_parameters: StreamParametersDefinition
config: Mapping[str, Any]
components_mapping: List[ComponentMappingDefinition]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def resolve_components(self, stream_template_config: Dict[str, Any]) -> Iterable[Dict[str, Any]]:
 85    def resolve_components(
 86        self, stream_template_config: Dict[str, Any]
 87    ) -> Iterable[Dict[str, Any]]:
 88        kwargs = {"stream_template_config": stream_template_config}
 89
 90        for components_values in self.stream_parameters.list_of_parameters_for_stream:
 91            updated_config = deepcopy(stream_template_config)
 92            kwargs["components_values"] = components_values  # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
 93            for resolved_component in self._resolved_components:
 94                valid_types = (
 95                    (resolved_component.value_type,) if resolved_component.value_type else None
 96                )
 97                value = resolved_component.value.eval(
 98                    self.config, valid_types=valid_types, **kwargs
 99                )
100                path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path]
101                parsed_value = self._parse_yaml_if_possible(value)
102                # https://github.com/dpath-maintainers/dpath-python/blob/master/dpath/__init__.py#L136
103                # dpath.set returns the number of changed elements, 0 when no elements changed
104                updated = dpath.set(updated_config, path, parsed_value)
105
106                if parsed_value and not updated and resolved_component.create_or_update:
107                    dpath.new(updated_config, path, parsed_value)
108
109            yield updated_config

Maps and populates values into a stream template configuration.

Parameters
  • stream_template_config: The stream template with placeholders for components. :yields: The resolved stream config with populated values.
@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
@dataclass
class StreamParametersDefinition:
25@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
26@dataclass
27class StreamParametersDefinition:
28    """
29    Represents a stream parameters definition to set up dynamic streams from defined values in manifest.
30    """
31
32    list_of_parameters_for_stream: List[Dict[str, Any]]

Represents a stream parameters definition to set up dynamic streams from defined values in manifest.

StreamParametersDefinition(list_of_parameters_for_stream: List[Dict[str, Any]])
list_of_parameters_for_stream: List[Dict[str, Any]]