airbyte_cdk.sources.declarative.resolvers

 1#
 2# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 3#
 4
 5from typing import Mapping
 6
 7from pydantic.v1 import BaseModel
 8
 9from airbyte_cdk.sources.declarative.models import (
10    ConfigComponentsResolver as ConfigComponentsResolverModel,
11)
12from airbyte_cdk.sources.declarative.models import (
13    HttpComponentsResolver as HttpComponentsResolverModel,
14)
15from airbyte_cdk.sources.declarative.models import (
16    ParametrizedComponentsResolver as ParametrizedComponentsResolverModel,
17)
18from airbyte_cdk.sources.declarative.resolvers.components_resolver import (
19    ComponentMappingDefinition,
20    ComponentsResolver,
21    ResolvedComponentMappingDefinition,
22)
23from airbyte_cdk.sources.declarative.resolvers.config_components_resolver import (
24    ConfigComponentsResolver,
25    StreamConfig,
26)
27from airbyte_cdk.sources.declarative.resolvers.http_components_resolver import (
28    HttpComponentsResolver,
29)
30from airbyte_cdk.sources.declarative.resolvers.parametrized_components_resolver import (
31    ParametrizedComponentsResolver,
32    StreamParametersDefinition,
33)
34
35COMPONENTS_RESOLVER_TYPE_MAPPING: Mapping[str, type[BaseModel]] = {
36    "HttpComponentsResolver": HttpComponentsResolverModel,
37    "ConfigComponentsResolver": ConfigComponentsResolverModel,
38    "ParametrizedComponentsResolver": ParametrizedComponentsResolverModel,
39}
40
41__all__ = [
42    "ComponentsResolver",
43    "HttpComponentsResolver",
44    "ComponentMappingDefinition",
45    "ResolvedComponentMappingDefinition",
46    "StreamConfig",
47    "ConfigComponentsResolver",
48    "COMPONENTS_RESOLVER_TYPE_MAPPING",
49    "ParametrizedComponentsResolver",
50    "StreamParametersDefinition",
51]
@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
@dataclass
class ComponentsResolver(abc.ABC):
45@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
46@dataclass
47class ComponentsResolver(ABC):
48    """
49    Abstract base class for resolving components in a stream template.
50    """
51
52    @abstractmethod
53    def resolve_components(
54        self, stream_template_config: Dict[str, Any]
55    ) -> Iterable[Dict[str, Any]]:
56        """
57        Maps and populates values into a stream template configuration.
58        :param stream_template_config: The stream template with placeholders for components.
59        :yields: The resolved stream config with populated values.
60        """
61        pass

Abstract base class for resolving components in a stream template.

@abstractmethod
def resolve_components(self, stream_template_config: Dict[str, Any]) -> Iterable[Dict[str, Any]]:
52    @abstractmethod
53    def resolve_components(
54        self, stream_template_config: Dict[str, Any]
55    ) -> Iterable[Dict[str, Any]]:
56        """
57        Maps and populates values into a stream template configuration.
58        :param stream_template_config: The stream template with placeholders for components.
59        :yields: The resolved stream config with populated values.
60        """
61        pass

Maps and populates values into a stream template configuration.

Parameters
  • stream_template_config: The stream template with placeholders for components. :yields: The resolved stream config with populated values.
@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
@dataclass
class HttpComponentsResolver(airbyte_cdk.sources.declarative.resolvers.ComponentsResolver):
 24@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
 25@dataclass
 26class HttpComponentsResolver(ComponentsResolver):
 27    """
 28    Resolves and populates stream templates with components fetched via an HTTP retriever.
 29
 30    Attributes:
 31        retriever (Retriever): The retriever used to fetch data from an API.
 32        config (Config): Configuration object for the resolver.
 33        components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
 34        parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
 35    """
 36
 37    retriever: Retriever
 38    config: Config
 39    components_mapping: List[ComponentMappingDefinition]
 40    parameters: InitVar[Mapping[str, Any]]
 41    _resolved_components: List[ResolvedComponentMappingDefinition] = field(
 42        init=False, repr=False, default_factory=list
 43    )
 44
 45    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 46        """
 47        Initializes and parses component mappings, converting them to resolved definitions.
 48
 49        Args:
 50            parameters (Mapping[str, Any]): Parameters for interpolation.
 51        """
 52        for component_mapping in self.components_mapping:
 53            if isinstance(component_mapping.value, (str, InterpolatedString)):
 54                interpolated_value = (
 55                    InterpolatedString.create(component_mapping.value, parameters=parameters)
 56                    if isinstance(component_mapping.value, str)
 57                    else component_mapping.value
 58                )
 59
 60                field_path = [
 61                    InterpolatedString.create(path, parameters=parameters)
 62                    for path in component_mapping.field_path
 63                ]
 64
 65                self._resolved_components.append(
 66                    ResolvedComponentMappingDefinition(
 67                        field_path=field_path,
 68                        value=interpolated_value,
 69                        value_type=component_mapping.value_type,
 70                        parameters=parameters,
 71                    )
 72                )
 73            else:
 74                raise ValueError(
 75                    f"Expected a string or InterpolatedString for value in mapping: {component_mapping}"
 76                )
 77
 78    def resolve_components(
 79        self, stream_template_config: Dict[str, Any]
 80    ) -> Iterable[Dict[str, Any]]:
 81        """
 82        Resolves components in the stream template configuration by populating values.
 83
 84        Args:
 85            stream_template_config (Dict[str, Any]): Stream template to populate.
 86
 87        Yields:
 88            Dict[str, Any]: Updated configurations with resolved components.
 89        """
 90        kwargs = {"stream_template_config": stream_template_config}
 91
 92        for stream_slice in self.retriever.stream_slices():
 93            for components_values in self.retriever.read_records(
 94                records_schema={}, stream_slice=stream_slice
 95            ):
 96                updated_config = deepcopy(stream_template_config)
 97                kwargs["components_values"] = components_values  # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
 98                kwargs["stream_slice"] = stream_slice  # type: ignore[assignment] # stream_slice will always be of type Mapping[str, Any]
 99
100                for resolved_component in self._resolved_components:
101                    valid_types = (
102                        (resolved_component.value_type,) if resolved_component.value_type else None
103                    )
104                    value = resolved_component.value.eval(
105                        self.config, valid_types=valid_types, **kwargs
106                    )
107
108                    path = [
109                        path.eval(self.config, **kwargs) for path in resolved_component.field_path
110                    ]
111                    dpath.set(updated_config, path, value)
112
113                yield updated_config

Resolves and populates stream templates with components fetched via an HTTP retriever.

Attributes:
  • retriever (Retriever): The retriever used to fetch data from an API.
  • config (Config): Configuration object for the resolver.
  • components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
  • parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
HttpComponentsResolver( retriever: airbyte_cdk.sources.declarative.retrievers.Retriever, config: Mapping[str, Any], components_mapping: List[ComponentMappingDefinition], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
config: Mapping[str, Any]
components_mapping: List[ComponentMappingDefinition]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def resolve_components(self, stream_template_config: Dict[str, Any]) -> Iterable[Dict[str, Any]]:
 78    def resolve_components(
 79        self, stream_template_config: Dict[str, Any]
 80    ) -> Iterable[Dict[str, Any]]:
 81        """
 82        Resolves components in the stream template configuration by populating values.
 83
 84        Args:
 85            stream_template_config (Dict[str, Any]): Stream template to populate.
 86
 87        Yields:
 88            Dict[str, Any]: Updated configurations with resolved components.
 89        """
 90        kwargs = {"stream_template_config": stream_template_config}
 91
 92        for stream_slice in self.retriever.stream_slices():
 93            for components_values in self.retriever.read_records(
 94                records_schema={}, stream_slice=stream_slice
 95            ):
 96                updated_config = deepcopy(stream_template_config)
 97                kwargs["components_values"] = components_values  # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
 98                kwargs["stream_slice"] = stream_slice  # type: ignore[assignment] # stream_slice will always be of type Mapping[str, Any]
 99
100                for resolved_component in self._resolved_components:
101                    valid_types = (
102                        (resolved_component.value_type,) if resolved_component.value_type else None
103                    )
104                    value = resolved_component.value.eval(
105                        self.config, valid_types=valid_types, **kwargs
106                    )
107
108                    path = [
109                        path.eval(self.config, **kwargs) for path in resolved_component.field_path
110                    ]
111                    dpath.set(updated_config, path, value)
112
113                yield updated_config

Resolves components in the stream template configuration by populating values.

Arguments:
  • stream_template_config (Dict[str, Any]): Stream template to populate.
Yields:

Dict[str, Any]: Updated configurations with resolved components.

@dataclass(frozen=True)
class ComponentMappingDefinition:
17@dataclass(frozen=True)
18class ComponentMappingDefinition:
19    """Defines the configuration for mapping a component in a stream. This class specifies
20    what field in the stream template should be updated with value, supporting dynamic interpolation
21    and type enforcement."""
22
23    field_path: List["InterpolatedString"]
24    value: Union["InterpolatedString", str]
25    value_type: Optional[Type[Any]]
26    parameters: InitVar[Mapping[str, Any]]
27    condition: Optional[str] = None
28    create_or_update: Optional[bool] = False

Defines the configuration for mapping a component in a stream. This class specifies what field in the stream template should be updated with value, supporting dynamic interpolation and type enforcement.

ComponentMappingDefinition( field_path: List[airbyte_cdk.InterpolatedString], value: Union[airbyte_cdk.InterpolatedString, str], value_type: Optional[Type[Any]], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], condition: Optional[str] = None, create_or_update: Optional[bool] = False)
field_path: List[airbyte_cdk.InterpolatedString]
value: Union[airbyte_cdk.InterpolatedString, str]
value_type: Optional[Type[Any]]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
condition: Optional[str] = None
create_or_update: Optional[bool] = False
@dataclass(frozen=True)
class ResolvedComponentMappingDefinition:
31@dataclass(frozen=True)
32class ResolvedComponentMappingDefinition:
33    """Defines resolved configuration for mapping a component in a stream. This class specifies
34    what field in the stream template should be updated with value, supporting dynamic interpolation
35    and type enforcement."""
36
37    field_path: List["InterpolatedString"]
38    value: "InterpolatedString"
39    value_type: Optional[Type[Any]]
40    parameters: InitVar[Mapping[str, Any]]
41    condition: Optional[InterpolatedBoolean] = None
42    create_or_update: Optional[bool] = False

Defines resolved configuration for mapping a component in a stream. This class specifies what field in the stream template should be updated with value, supporting dynamic interpolation and type enforcement.

ResolvedComponentMappingDefinition( field_path: List[airbyte_cdk.InterpolatedString], value: airbyte_cdk.InterpolatedString, value_type: Optional[Type[Any]], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], condition: Optional[airbyte_cdk.InterpolatedBoolean] = None, create_or_update: Optional[bool] = False)
field_path: List[airbyte_cdk.InterpolatedString]
value_type: Optional[Type[Any]]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
condition: Optional[airbyte_cdk.InterpolatedBoolean] = None
create_or_update: Optional[bool] = False
@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
@dataclass
class StreamConfig:
28@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
29@dataclass
30class StreamConfig:
31    """
32    Identifies stream config details for dynamic schema extraction and processing.
33    """
34
35    configs_pointer: List[Union[InterpolatedString, str]]
36    parameters: InitVar[Mapping[str, Any]]
37    default_values: Optional[List[Any]] = None
38
39    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
40        self.configs_pointer = [
41            InterpolatedString.create(path, parameters=parameters) for path in self.configs_pointer
42        ]

Identifies stream config details for dynamic schema extraction and processing.

StreamConfig( configs_pointer: List[Union[airbyte_cdk.InterpolatedString, str]], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], default_values: Optional[List[Any]] = None)
configs_pointer: List[Union[airbyte_cdk.InterpolatedString, str]]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
default_values: Optional[List[Any]] = None
@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
@dataclass
class ConfigComponentsResolver(airbyte_cdk.sources.declarative.resolvers.ComponentsResolver):
 45@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
 46@dataclass
 47class ConfigComponentsResolver(ComponentsResolver):
 48    """
 49    Resolves and populates stream templates with components fetched via source config.
 50
 51    Attributes:
 52        stream_config (StreamConfig): The description of stream configuration used to fetch stream config from source config.
 53        config (Config): Configuration object for the resolver.
 54        components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
 55        parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
 56    """
 57
 58    stream_configs: List[StreamConfig]
 59    config: Config
 60    components_mapping: List[ComponentMappingDefinition]
 61    parameters: InitVar[Mapping[str, Any]]
 62    _resolved_components: List[ResolvedComponentMappingDefinition] = field(
 63        init=False, repr=False, default_factory=list
 64    )
 65
 66    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 67        """
 68        Initializes and parses component mappings, converting them to resolved definitions.
 69
 70        Args:
 71            parameters (Mapping[str, Any]): Parameters for interpolation.
 72        """
 73
 74        for component_mapping in self.components_mapping:
 75            interpolated_condition = (
 76                InterpolatedBoolean(condition=component_mapping.condition, parameters=parameters)
 77                if component_mapping.condition
 78                else None
 79            )
 80
 81            if isinstance(component_mapping.value, (str, InterpolatedString)):
 82                interpolated_value = (
 83                    InterpolatedString.create(component_mapping.value, parameters=parameters)
 84                    if isinstance(component_mapping.value, str)
 85                    else component_mapping.value
 86                )
 87
 88                field_path = [
 89                    InterpolatedString.create(path, parameters=parameters)
 90                    for path in component_mapping.field_path
 91                ]
 92
 93                self._resolved_components.append(
 94                    ResolvedComponentMappingDefinition(
 95                        field_path=field_path,
 96                        value=interpolated_value,
 97                        value_type=component_mapping.value_type,
 98                        create_or_update=component_mapping.create_or_update,
 99                        parameters=parameters,
100                        condition=interpolated_condition,
101                    )
102                )
103            else:
104                raise ValueError(
105                    f"Expected a string or InterpolatedString for value in mapping: {component_mapping}"
106                )
107
108    @staticmethod
109    def _merge_combination(combo: Iterable[Tuple[int, Any]]) -> Dict[str, Any]:
110        """Collapse a combination of ``(idx, elem)`` into one config dict."""
111        result: Dict[str, Any] = {}
112        for config_index, (elem_index, elem) in enumerate(combo):
113            if isinstance(elem, dict):
114                result.update(elem)
115            else:
116                # keep non-dict values under an artificial name
117                result.setdefault(f"source_config_{config_index}", (elem_index, elem))
118        return result
119
120    @property
121    def _stream_config(self) -> List[Dict[str, Any]]:
122        """
123        Build every unique stream-configuration combination defined by
124        each ``StreamConfig`` and any ``default_values``.
125        """
126        all_indexed_streams = []
127        for stream_config in self.stream_configs:
128            path = [
129                node.eval(self.config) if not isinstance(node, str) else node
130                for node in stream_config.configs_pointer
131            ]
132            stream_configs_raw = dpath.get(dict(self.config), path, default=[])
133            stream_configs = (
134                list(stream_configs_raw)
135                if isinstance(stream_configs_raw, list)
136                else [stream_configs_raw]
137            )
138
139            if stream_config.default_values:
140                stream_configs.extend(stream_config.default_values)
141
142            all_indexed_streams.append([(i, item) for i, item in enumerate(stream_configs)])
143        return [
144            self._merge_combination(combo)  # type: ignore[arg-type]
145            for combo in product(*all_indexed_streams)
146        ]
147
148    def resolve_components(
149        self, stream_template_config: Dict[str, Any]
150    ) -> Iterable[Dict[str, Any]]:
151        """
152        Resolves components in the stream template configuration by populating values.
153
154        Args:
155            stream_template_config (Dict[str, Any]): Stream template to populate.
156
157        Yields:
158            Dict[str, Any]: Updated configurations with resolved components.
159        """
160        kwargs = {"stream_template_config": stream_template_config}
161
162        for components_values in self._stream_config:
163            updated_config = deepcopy(stream_template_config)
164            kwargs["components_values"] = components_values  # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
165
166            for resolved_component in self._resolved_components:
167                if (
168                    resolved_component.condition is not None
169                    and not resolved_component.condition.eval(self.config, **kwargs)
170                ):
171                    continue
172
173                valid_types = (
174                    (resolved_component.value_type,) if resolved_component.value_type else None
175                )
176                value = resolved_component.value.eval(
177                    self.config, valid_types=valid_types, **kwargs
178                )
179
180                path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path]
181                parsed_value = self._parse_yaml_if_possible(value)
182                updated = dpath.set(updated_config, path, parsed_value)
183
184                if parsed_value and not updated and resolved_component.create_or_update:
185                    dpath.new(updated_config, path, parsed_value)
186
187            yield updated_config
188
189    @staticmethod
190    def _parse_yaml_if_possible(value: Any) -> Any:
191        """
192        Try to turn value into a Python object by YAML-parsing it.
193
194        * If value is a `str` and can be parsed by `yaml.safe_load`,
195          return the parsed result.
196        * If parsing fails (`yaml.parser.ParserError`) – or value is not
197          a string at all – return the original value unchanged.
198        """
199        if isinstance(value, str):
200            try:
201                return yaml.safe_load(value)
202            except ParserError:  # "{{ record[0] in ['cohortActiveUsers'] }}"   # not valid YAML
203                return value
204            except ScannerError as e:  # "%Y-%m-%d'   # not valid yaml
205                if "expected alphabetic or numeric character, but found '%'" in str(e):
206                    return value
207                raise e
208        return value

Resolves and populates stream templates with components fetched via source config.

Attributes:
  • stream_config (StreamConfig): The description of stream configuration used to fetch stream config from source config.
  • config (Config): Configuration object for the resolver.
  • components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
  • parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
ConfigComponentsResolver( stream_configs: List[StreamConfig], config: Mapping[str, Any], components_mapping: List[ComponentMappingDefinition], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
stream_configs: List[StreamConfig]
config: Mapping[str, Any]
components_mapping: List[ComponentMappingDefinition]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def resolve_components(self, stream_template_config: Dict[str, Any]) -> Iterable[Dict[str, Any]]:
148    def resolve_components(
149        self, stream_template_config: Dict[str, Any]
150    ) -> Iterable[Dict[str, Any]]:
151        """
152        Resolves components in the stream template configuration by populating values.
153
154        Args:
155            stream_template_config (Dict[str, Any]): Stream template to populate.
156
157        Yields:
158            Dict[str, Any]: Updated configurations with resolved components.
159        """
160        kwargs = {"stream_template_config": stream_template_config}
161
162        for components_values in self._stream_config:
163            updated_config = deepcopy(stream_template_config)
164            kwargs["components_values"] = components_values  # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
165
166            for resolved_component in self._resolved_components:
167                if (
168                    resolved_component.condition is not None
169                    and not resolved_component.condition.eval(self.config, **kwargs)
170                ):
171                    continue
172
173                valid_types = (
174                    (resolved_component.value_type,) if resolved_component.value_type else None
175                )
176                value = resolved_component.value.eval(
177                    self.config, valid_types=valid_types, **kwargs
178                )
179
180                path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path]
181                parsed_value = self._parse_yaml_if_possible(value)
182                updated = dpath.set(updated_config, path, parsed_value)
183
184                if parsed_value and not updated and resolved_component.create_or_update:
185                    dpath.new(updated_config, path, parsed_value)
186
187            yield updated_config

Resolves components in the stream template configuration by populating values.

Arguments:
  • stream_template_config (Dict[str, Any]): Stream template to populate.
Yields:

Dict[str, Any]: Updated configurations with resolved components.

COMPONENTS_RESOLVER_TYPE_MAPPING: Mapping[str, type[pydantic.v1.main.BaseModel]] = {'HttpComponentsResolver': <class 'airbyte_cdk.sources.declarative.models.declarative_component_schema.HttpComponentsResolver'>, 'ConfigComponentsResolver': <class 'airbyte_cdk.sources.declarative.models.declarative_component_schema.ConfigComponentsResolver'>, 'ParametrizedComponentsResolver': <class 'airbyte_cdk.sources.declarative.models.declarative_component_schema.ParametrizedComponentsResolver'>}
@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
@dataclass
class ParametrizedComponentsResolver(airbyte_cdk.sources.declarative.resolvers.ComponentsResolver):
 35@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
 36@dataclass
 37class ParametrizedComponentsResolver(ComponentsResolver):
 38    """
 39    Resolves and populates dynamic streams from defined parametrized values in manifest.
 40    """
 41
 42    stream_parameters: StreamParametersDefinition
 43    config: Config
 44    components_mapping: List[ComponentMappingDefinition]
 45    parameters: InitVar[Mapping[str, Any]]
 46    _resolved_components: List[ResolvedComponentMappingDefinition] = field(
 47        init=False, repr=False, default_factory=list
 48    )
 49
 50    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 51        """
 52        Initializes and parses component mappings, converting them to resolved definitions.
 53
 54        Args:
 55            parameters (Mapping[str, Any]): Parameters for interpolation.
 56        """
 57
 58        for component_mapping in self.components_mapping:
 59            if isinstance(component_mapping.value, (str, InterpolatedString)):
 60                interpolated_value = (
 61                    InterpolatedString.create(component_mapping.value, parameters=parameters)
 62                    if isinstance(component_mapping.value, str)
 63                    else component_mapping.value
 64                )
 65
 66                field_path = [
 67                    InterpolatedString.create(path, parameters=parameters)
 68                    for path in component_mapping.field_path
 69                ]
 70
 71                self._resolved_components.append(
 72                    ResolvedComponentMappingDefinition(
 73                        field_path=field_path,
 74                        value=interpolated_value,
 75                        value_type=component_mapping.value_type,
 76                        create_or_update=component_mapping.create_or_update,
 77                        parameters=parameters,
 78                    )
 79                )
 80            else:
 81                raise ValueError(
 82                    f"Expected a string or InterpolatedString for value in mapping: {component_mapping}"
 83                )
 84
 85    def resolve_components(
 86        self, stream_template_config: Dict[str, Any]
 87    ) -> Iterable[Dict[str, Any]]:
 88        kwargs = {"stream_template_config": stream_template_config}
 89
 90        for components_values in self.stream_parameters.list_of_parameters_for_stream:
 91            updated_config = deepcopy(stream_template_config)
 92            kwargs["components_values"] = components_values  # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
 93            for resolved_component in self._resolved_components:
 94                valid_types = (
 95                    (resolved_component.value_type,) if resolved_component.value_type else None
 96                )
 97                value = resolved_component.value.eval(
 98                    self.config, valid_types=valid_types, **kwargs
 99                )
100                path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path]
101                parsed_value = self._parse_yaml_if_possible(value)
102                # https://github.com/dpath-maintainers/dpath-python/blob/master/dpath/__init__.py#L136
103                # dpath.set returns the number of changed elements, 0 when no elements changed
104                updated = dpath.set(updated_config, path, parsed_value)
105
106                if parsed_value and not updated and resolved_component.create_or_update:
107                    dpath.new(updated_config, path, parsed_value)
108
109            yield updated_config
110
111    @staticmethod
112    def _parse_yaml_if_possible(value: Any) -> Any:
113        """
114        Try to turn value into a Python object by YAML-parsing it.
115
116        * If value is a `str` and can be parsed by `yaml.safe_load`,
117          return the parsed result.
118        * If parsing fails (`yaml.parser.ParserError`) – or value is not
119          a string at all – return the original value unchanged.
120        """
121        if isinstance(value, str):
122            try:
123                return yaml.safe_load(value)
124            except ParserError:  # "{{ record[0] in ['cohortActiveUsers'] }}"   # not valid YAML
125                return value
126        return value

Resolves and populates dynamic streams from defined parametrized values in manifest.

ParametrizedComponentsResolver( stream_parameters: StreamParametersDefinition, config: Mapping[str, Any], components_mapping: List[ComponentMappingDefinition], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
stream_parameters: StreamParametersDefinition
config: Mapping[str, Any]
components_mapping: List[ComponentMappingDefinition]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def resolve_components(self, stream_template_config: Dict[str, Any]) -> Iterable[Dict[str, Any]]:
 85    def resolve_components(
 86        self, stream_template_config: Dict[str, Any]
 87    ) -> Iterable[Dict[str, Any]]:
 88        kwargs = {"stream_template_config": stream_template_config}
 89
 90        for components_values in self.stream_parameters.list_of_parameters_for_stream:
 91            updated_config = deepcopy(stream_template_config)
 92            kwargs["components_values"] = components_values  # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
 93            for resolved_component in self._resolved_components:
 94                valid_types = (
 95                    (resolved_component.value_type,) if resolved_component.value_type else None
 96                )
 97                value = resolved_component.value.eval(
 98                    self.config, valid_types=valid_types, **kwargs
 99                )
100                path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path]
101                parsed_value = self._parse_yaml_if_possible(value)
102                # https://github.com/dpath-maintainers/dpath-python/blob/master/dpath/__init__.py#L136
103                # dpath.set returns the number of changed elements, 0 when no elements changed
104                updated = dpath.set(updated_config, path, parsed_value)
105
106                if parsed_value and not updated and resolved_component.create_or_update:
107                    dpath.new(updated_config, path, parsed_value)
108
109            yield updated_config

Maps and populates values into a stream template configuration.

Parameters
  • stream_template_config: The stream template with placeholders for components. :yields: The resolved stream config with populated values.
@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
@dataclass
class StreamParametersDefinition:
25@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
26@dataclass
27class StreamParametersDefinition:
28    """
29    Represents a stream parameters definition to set up dynamic streams from defined values in manifest.
30    """
31
32    list_of_parameters_for_stream: List[Dict[str, Any]]

Represents a stream parameters definition to set up dynamic streams from defined values in manifest.

StreamParametersDefinition(list_of_parameters_for_stream: List[Dict[str, Any]])
list_of_parameters_for_stream: List[Dict[str, Any]]