airbyte_cdk.sources.declarative.resolvers

 1#
 2# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 3#
 4
 5from typing import Mapping
 6
 7from pydantic.v1 import BaseModel
 8
 9from airbyte_cdk.sources.declarative.models import (
10    ConfigComponentsResolver as ConfigComponentsResolverModel,
11)
12from airbyte_cdk.sources.declarative.models import (
13    HttpComponentsResolver as HttpComponentsResolverModel,
14)
15from airbyte_cdk.sources.declarative.resolvers.components_resolver import (
16    ComponentMappingDefinition,
17    ComponentsResolver,
18    ResolvedComponentMappingDefinition,
19)
20from airbyte_cdk.sources.declarative.resolvers.config_components_resolver import (
21    ConfigComponentsResolver,
22    StreamConfig,
23)
24from airbyte_cdk.sources.declarative.resolvers.http_components_resolver import (
25    HttpComponentsResolver,
26)
27
# Maps a components-resolver name to the declarative (pydantic v1) model class
# imported above under the matching ``...Model`` alias.
# NOTE(review): presumably used to pick the model for a resolver from its type
# string when parsing a manifest -- confirm against callers.
COMPONENTS_RESOLVER_TYPE_MAPPING: Mapping[str, type[BaseModel]] = {
    "HttpComponentsResolver": HttpComponentsResolverModel,
    "ConfigComponentsResolver": ConfigComponentsResolverModel,
}

# Names exported by this package (what ``from ... import *`` exposes).
__all__ = [
    "ComponentsResolver",
    "HttpComponentsResolver",
    "ComponentMappingDefinition",
    "ResolvedComponentMappingDefinition",
    "StreamConfig",
    "ConfigComponentsResolver",
    "COMPONENTS_RESOLVER_TYPE_MAPPING",
]
@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
@dataclass
class ComponentsResolver(abc.ABC):
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class ComponentsResolver(ABC):
    """
    Abstract base class for resolving components in a stream template.

    Concrete resolvers implement ``resolve_components`` to turn one stream
    template into resolved stream configurations.
    """

    @abstractmethod
    def resolve_components(
        self, stream_template_config: Dict[str, Any]
    ) -> Iterable[Dict[str, Any]]:
        """
        Maps and populates values into a stream template configuration.

        Args:
            stream_template_config (Dict[str, Any]): The stream template with placeholders for components.

        Yields:
            Dict[str, Any]: The resolved stream config with populated values.
        """

Abstract base class for resolving components in a stream template.

@abstractmethod
def resolve_components(self, stream_template_config: Dict[str, Any]) -> Iterable[Dict[str, Any]]:
47    @abstractmethod
48    def resolve_components(
49        self, stream_template_config: Dict[str, Any]
50    ) -> Iterable[Dict[str, Any]]:
51        """
52        Maps and populates values into a stream template configuration.
53        :param stream_template_config: The stream template with placeholders for components.
54        :yields: The resolved stream config with populated values.
55        """
56        pass

Maps and populates values into a stream template configuration.

Parameters
  • stream_template_config: The stream template with placeholders for components.

Yields
  • The resolved stream config with populated values.
@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
@dataclass
class HttpComponentsResolver(airbyte_cdk.sources.declarative.resolvers.ComponentsResolver):
 24@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
 25@dataclass
 26class HttpComponentsResolver(ComponentsResolver):
 27    """
 28    Resolves and populates stream templates with components fetched via an HTTP retriever.
 29
 30    Attributes:
 31        retriever (Retriever): The retriever used to fetch data from an API.
 32        config (Config): Configuration object for the resolver.
 33        components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
 34        parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
 35    """
 36
 37    retriever: Retriever
 38    config: Config
 39    components_mapping: List[ComponentMappingDefinition]
 40    parameters: InitVar[Mapping[str, Any]]
 41    _resolved_components: List[ResolvedComponentMappingDefinition] = field(
 42        init=False, repr=False, default_factory=list
 43    )
 44
 45    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 46        """
 47        Initializes and parses component mappings, converting them to resolved definitions.
 48
 49        Args:
 50            parameters (Mapping[str, Any]): Parameters for interpolation.
 51        """
 52        for component_mapping in self.components_mapping:
 53            if isinstance(component_mapping.value, (str, InterpolatedString)):
 54                interpolated_value = (
 55                    InterpolatedString.create(component_mapping.value, parameters=parameters)
 56                    if isinstance(component_mapping.value, str)
 57                    else component_mapping.value
 58                )
 59
 60                field_path = [
 61                    InterpolatedString.create(path, parameters=parameters)
 62                    for path in component_mapping.field_path
 63                ]
 64
 65                self._resolved_components.append(
 66                    ResolvedComponentMappingDefinition(
 67                        field_path=field_path,
 68                        value=interpolated_value,
 69                        value_type=component_mapping.value_type,
 70                        parameters=parameters,
 71                    )
 72                )
 73            else:
 74                raise ValueError(
 75                    f"Expected a string or InterpolatedString for value in mapping: {component_mapping}"
 76                )
 77
 78    def resolve_components(
 79        self, stream_template_config: Dict[str, Any]
 80    ) -> Iterable[Dict[str, Any]]:
 81        """
 82        Resolves components in the stream template configuration by populating values.
 83
 84        Args:
 85            stream_template_config (Dict[str, Any]): Stream template to populate.
 86
 87        Yields:
 88            Dict[str, Any]: Updated configurations with resolved components.
 89        """
 90        kwargs = {"stream_template_config": stream_template_config}
 91
 92        for stream_slice in self.retriever.stream_slices():
 93            for components_values in self.retriever.read_records(
 94                records_schema={}, stream_slice=stream_slice
 95            ):
 96                updated_config = deepcopy(stream_template_config)
 97                kwargs["components_values"] = components_values  # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
 98                kwargs["stream_slice"] = stream_slice  # type: ignore[assignment] # stream_slice will always be of type Mapping[str, Any]
 99
100                for resolved_component in self._resolved_components:
101                    valid_types = (
102                        (resolved_component.value_type,) if resolved_component.value_type else None
103                    )
104                    value = resolved_component.value.eval(
105                        self.config, valid_types=valid_types, **kwargs
106                    )
107
108                    path = [
109                        path.eval(self.config, **kwargs) for path in resolved_component.field_path
110                    ]
111                    dpath.set(updated_config, path, value)
112
113                yield updated_config

Resolves and populates stream templates with components fetched via an HTTP retriever.

Attributes:
  • retriever (Retriever): The retriever used to fetch data from an API.
  • config (Config): Configuration object for the resolver.
  • components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
  • parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
HttpComponentsResolver( retriever: airbyte_cdk.sources.declarative.retrievers.Retriever, config: Mapping[str, Any], components_mapping: List[ComponentMappingDefinition], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
config: Mapping[str, Any]
components_mapping: List[ComponentMappingDefinition]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def resolve_components(self, stream_template_config: Dict[str, Any]) -> Iterable[Dict[str, Any]]:
 78    def resolve_components(
 79        self, stream_template_config: Dict[str, Any]
 80    ) -> Iterable[Dict[str, Any]]:
 81        """
 82        Resolves components in the stream template configuration by populating values.
 83
 84        Args:
 85            stream_template_config (Dict[str, Any]): Stream template to populate.
 86
 87        Yields:
 88            Dict[str, Any]: Updated configurations with resolved components.
 89        """
 90        kwargs = {"stream_template_config": stream_template_config}
 91
 92        for stream_slice in self.retriever.stream_slices():
 93            for components_values in self.retriever.read_records(
 94                records_schema={}, stream_slice=stream_slice
 95            ):
 96                updated_config = deepcopy(stream_template_config)
 97                kwargs["components_values"] = components_values  # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
 98                kwargs["stream_slice"] = stream_slice  # type: ignore[assignment] # stream_slice will always be of type Mapping[str, Any]
 99
100                for resolved_component in self._resolved_components:
101                    valid_types = (
102                        (resolved_component.value_type,) if resolved_component.value_type else None
103                    )
104                    value = resolved_component.value.eval(
105                        self.config, valid_types=valid_types, **kwargs
106                    )
107
108                    path = [
109                        path.eval(self.config, **kwargs) for path in resolved_component.field_path
110                    ]
111                    dpath.set(updated_config, path, value)
112
113                yield updated_config

Resolves components in the stream template configuration by populating values.

Arguments:
  • stream_template_config (Dict[str, Any]): Stream template to populate.
Yields:

Dict[str, Any]: Updated configurations with resolved components.

@dataclass(frozen=True)
class ComponentMappingDefinition:
@dataclass(frozen=True)
class ComponentMappingDefinition:
    """
    Immutable description of one component mapping for a stream template.

    It names the field of the stream template to update and the value to put
    there; the value may still be a plain, not yet interpolated string.
    """

    # Path segments locating the field to update inside the stream template.
    field_path: List["InterpolatedString"]
    # Value to write: either a raw string or an already built InterpolatedString.
    value: Union["InterpolatedString", str]
    # Optional type associated with the value; None when no type is specified.
    value_type: Optional[Type[Any]]
    # Init-only parameters for interpolation; not stored as a field.
    parameters: InitVar[Mapping[str, Any]]

Defines the configuration for mapping a component in a stream. This class specifies what field in the stream template should be updated with value, supporting dynamic interpolation and type enforcement.

ComponentMappingDefinition( field_path: List[airbyte_cdk.InterpolatedString], value: Union[airbyte_cdk.InterpolatedString, str], value_type: Optional[Type[Any]], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
field_path: List[airbyte_cdk.InterpolatedString]
value: Union[airbyte_cdk.InterpolatedString, str]
value_type: Optional[Type[Any]]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
@dataclass(frozen=True)
class ResolvedComponentMappingDefinition:
@dataclass(frozen=True)
class ResolvedComponentMappingDefinition:
    """
    Immutable, resolved form of a component mapping for a stream template.

    Unlike the unresolved definition, the value here is always an
    InterpolatedString rather than possibly a plain string.
    """

    # Path segments locating the field to update inside the stream template.
    field_path: List["InterpolatedString"]
    # Interpolated value to write at that path.
    value: "InterpolatedString"
    # Optional type associated with the value; None when no type is specified.
    value_type: Optional[Type[Any]]
    # Init-only parameters for interpolation; not stored as a field.
    parameters: InitVar[Mapping[str, Any]]

Defines resolved configuration for mapping a component in a stream. This class specifies what field in the stream template should be updated with value, supporting dynamic interpolation and type enforcement.

ResolvedComponentMappingDefinition( field_path: List[airbyte_cdk.InterpolatedString], value: airbyte_cdk.InterpolatedString, value_type: Optional[Type[Any]], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
field_path: List[airbyte_cdk.InterpolatedString]
value_type: Optional[Type[Any]]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
@dataclass
class StreamConfig:
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class StreamConfig:
    """
    Identifies stream config details for dynamic schema extraction and processing.

    Attributes:
        configs_pointer (List[Union[InterpolatedString, str]]): Path segments pointing at the stream
            configuration; every segment is passed through ``InterpolatedString.create`` at init time.
        parameters (InitVar[Mapping[str, Any]]): Parameters for interpolation.
    """

    configs_pointer: List[Union[InterpolatedString, str]]
    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        """Replaces ``configs_pointer`` with its interpolated-string form."""
        interpolated_pointer = [
            InterpolatedString.create(segment, parameters=parameters)
            for segment in self.configs_pointer
        ]
        self.configs_pointer = interpolated_pointer

Identifies stream config details for dynamic schema extraction and processing.

StreamConfig( configs_pointer: List[Union[airbyte_cdk.InterpolatedString, str]], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
configs_pointer: List[Union[airbyte_cdk.InterpolatedString, str]]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
@deprecated('This class is experimental. Use at your own risk.', category=ExperimentalClassWarning)
@dataclass
class ConfigComponentsResolver(airbyte_cdk.sources.declarative.resolvers.ComponentsResolver):
 39@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
 40@dataclass
 41class ConfigComponentsResolver(ComponentsResolver):
 42    """
 43    Resolves and populates stream templates with components fetched via source config.
 44
 45    Attributes:
 46        stream_config (StreamConfig): The description of stream configuration used to fetch stream config from source config.
 47        config (Config): Configuration object for the resolver.
 48        components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
 49        parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
 50    """
 51
 52    stream_config: StreamConfig
 53    config: Config
 54    components_mapping: List[ComponentMappingDefinition]
 55    parameters: InitVar[Mapping[str, Any]]
 56    _resolved_components: List[ResolvedComponentMappingDefinition] = field(
 57        init=False, repr=False, default_factory=list
 58    )
 59
 60    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
 61        """
 62        Initializes and parses component mappings, converting them to resolved definitions.
 63
 64        Args:
 65            parameters (Mapping[str, Any]): Parameters for interpolation.
 66        """
 67
 68        for component_mapping in self.components_mapping:
 69            if isinstance(component_mapping.value, (str, InterpolatedString)):
 70                interpolated_value = (
 71                    InterpolatedString.create(component_mapping.value, parameters=parameters)
 72                    if isinstance(component_mapping.value, str)
 73                    else component_mapping.value
 74                )
 75
 76                field_path = [
 77                    InterpolatedString.create(path, parameters=parameters)
 78                    for path in component_mapping.field_path
 79                ]
 80
 81                self._resolved_components.append(
 82                    ResolvedComponentMappingDefinition(
 83                        field_path=field_path,
 84                        value=interpolated_value,
 85                        value_type=component_mapping.value_type,
 86                        parameters=parameters,
 87                    )
 88                )
 89            else:
 90                raise ValueError(
 91                    f"Expected a string or InterpolatedString for value in mapping: {component_mapping}"
 92                )
 93
 94    @property
 95    def _stream_config(self) -> Iterable[Mapping[str, Any]]:
 96        path = [
 97            node.eval(self.config) if not isinstance(node, str) else node
 98            for node in self.stream_config.configs_pointer
 99        ]
100        stream_config = dpath.get(dict(self.config), path, default=[])
101
102        if not isinstance(stream_config, list):
103            stream_config = [stream_config]
104
105        return stream_config
106
107    def resolve_components(
108        self, stream_template_config: Dict[str, Any]
109    ) -> Iterable[Dict[str, Any]]:
110        """
111        Resolves components in the stream template configuration by populating values.
112
113        Args:
114            stream_template_config (Dict[str, Any]): Stream template to populate.
115
116        Yields:
117            Dict[str, Any]: Updated configurations with resolved components.
118        """
119        kwargs = {"stream_template_config": stream_template_config}
120
121        for components_values in self._stream_config:
122            updated_config = deepcopy(stream_template_config)
123            kwargs["components_values"] = components_values  # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
124
125            for resolved_component in self._resolved_components:
126                valid_types = (
127                    (resolved_component.value_type,) if resolved_component.value_type else None
128                )
129                value = resolved_component.value.eval(
130                    self.config, valid_types=valid_types, **kwargs
131                )
132
133                path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path]
134
135                dpath.set(updated_config, path, value)
136
137            yield updated_config

Resolves and populates stream templates with components fetched via source config.

Attributes:
  • stream_config (StreamConfig): The description of stream configuration used to fetch stream config from source config.
  • config (Config): Configuration object for the resolver.
  • components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
  • parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
ConfigComponentsResolver( stream_config: StreamConfig, config: Mapping[str, Any], components_mapping: List[ComponentMappingDefinition], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
stream_config: StreamConfig
config: Mapping[str, Any]
components_mapping: List[ComponentMappingDefinition]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def resolve_components(self, stream_template_config: Dict[str, Any]) -> Iterable[Dict[str, Any]]:
107    def resolve_components(
108        self, stream_template_config: Dict[str, Any]
109    ) -> Iterable[Dict[str, Any]]:
110        """
111        Resolves components in the stream template configuration by populating values.
112
113        Args:
114            stream_template_config (Dict[str, Any]): Stream template to populate.
115
116        Yields:
117            Dict[str, Any]: Updated configurations with resolved components.
118        """
119        kwargs = {"stream_template_config": stream_template_config}
120
121        for components_values in self._stream_config:
122            updated_config = deepcopy(stream_template_config)
123            kwargs["components_values"] = components_values  # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
124
125            for resolved_component in self._resolved_components:
126                valid_types = (
127                    (resolved_component.value_type,) if resolved_component.value_type else None
128                )
129                value = resolved_component.value.eval(
130                    self.config, valid_types=valid_types, **kwargs
131                )
132
133                path = [path.eval(self.config, **kwargs) for path in resolved_component.field_path]
134
135                dpath.set(updated_config, path, value)
136
137            yield updated_config

Resolves components in the stream template configuration by populating values.

Arguments:
  • stream_template_config (Dict[str, Any]): Stream template to populate.
Yields:

Dict[str, Any]: Updated configurations with resolved components.

COMPONENTS_RESOLVER_TYPE_MAPPING: Mapping[str, type[pydantic.v1.main.BaseModel]] = {'HttpComponentsResolver': <class 'airbyte_cdk.sources.declarative.models.declarative_component_schema.HttpComponentsResolver'>, 'ConfigComponentsResolver': <class 'airbyte_cdk.sources.declarative.models.declarative_component_schema.ConfigComponentsResolver'>}