airbyte_cdk.sources.declarative.yaml_declarative_source

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5import pkgutil
 6from typing import Any, List, Mapping, Optional
 7
 8import yaml
 9
10from airbyte_cdk.models import AirbyteStateMessage, ConfiguredAirbyteCatalog
11from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
12    ConcurrentDeclarativeSource,
13)
14from airbyte_cdk.sources.types import ConnectionDefinition
15
16
class YamlDeclarativeSource(ConcurrentDeclarativeSource[List[AirbyteStateMessage]]):
    """
    Declarative source defined by a yaml file.

    The manifest is read from ``path_to_yaml`` when the source is constructed and is handed to
    ``ConcurrentDeclarativeSource`` as ``source_config``.
    """

    def __init__(
        self,
        path_to_yaml: str,
        debug: bool = False,
        catalog: Optional[ConfiguredAirbyteCatalog] = None,
        config: Optional[Mapping[str, Any]] = None,
        state: Optional[List[AirbyteStateMessage]] = None,
        config_path: Optional[str] = None,
    ) -> None:
        """
        Load the manifest at ``path_to_yaml`` and initialise the concurrent declarative source with it.

        :param path_to_yaml: Path to the yaml file describing the source. It is tried first as a
            filesystem path, then as a resource of this class's top-level package.
        :param debug: Accepted for signature compatibility; this constructor does not use it or
            forward it to the parent.
        :param catalog: Configured catalog; a falsy value is replaced by an empty catalog.
        :param config: Connector config; a falsy value is replaced by an empty dict.
        :param state: Incoming state messages; a falsy value is replaced by an empty list.
        :param config_path: Forwarded unchanged to the parent constructor.
        """
        self._path_to_yaml = path_to_yaml
        manifest = self._read_and_parse_yaml_file(path_to_yaml)

        # The truthiness test (not `is None`) is deliberate: provided-but-empty values
        # also fall back to fresh defaults.
        effective_catalog = catalog if catalog else ConfiguredAirbyteCatalog(streams=[])
        effective_config = config if config else {}
        effective_state = state if state else []

        super().__init__(
            catalog=effective_catalog,
            config=effective_config,
            state=effective_state,
            source_config=manifest,
            config_path=config_path,
        )

    def _read_and_parse_yaml_file(self, path_to_yaml_file: str) -> ConnectionDefinition:
        """
        Read and parse the manifest at ``path_to_yaml_file``.

        The path is first opened as a regular file, which lets tests pass a plain file. If no such
        file exists, the path is looked up as a resource of this class's top-level package through
        ``pkgutil.get_data``. Inside the container, the working directory during an operation is not
        laid out like the package's static files.

        :param path_to_yaml_file: Filesystem path, or path relative to the top-level package.
        :return: The parsed manifest. Returns ``{}`` when the package resource is unavailable or
            empty, or when the document parses to nothing (an empty or comment-only YAML file).
        """
        try:
            # Decode explicitly as UTF-8 so both branches read bytes the same way:
            # the package branch below uses bytes.decode(), whose default is UTF-8,
            # while a bare open() would use the platform's locale encoding.
            with open(path_to_yaml_file, "r", encoding="utf-8") as yaml_file:
                raw_yaml = yaml_file.read()
        except FileNotFoundError:
            package = self.__class__.__module__.split(".")[0]
            yaml_bytes = pkgutil.get_data(package, path_to_yaml_file)
            if not yaml_bytes:
                return {}
            raw_yaml = yaml_bytes.decode()

        manifest = self._parse(raw_yaml)
        # yaml.safe_load yields None for an empty / comment-only document; normalise it
        # to the same empty manifest returned above instead of passing None downstream.
        return {} if manifest is None else manifest

    def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
        """
        Add the manifest path to ``extra_args`` under the ``"path_to_yaml"`` key (mutates the dict in place).

        NOTE(review): presumably an override of a ConcurrentDeclarativeSource hook that collects
        extra fields for a manifest debug message; confirm against the parent class.
        """
        extra_args["path_to_yaml"] = self._path_to_yaml

    @staticmethod
    def _parse(connection_definition_str: str) -> ConnectionDefinition:
        """
        Parse a YAML string into a manifest. Component references still exist in the manifest and
        are resolved later, during the creation of the DeclarativeSource.

        :param connection_definition_str: YAML text to parse
        :return: The ConnectionDefinition parsed from ``connection_definition_str``
            (``yaml.safe_load`` returns ``None`` for an empty or comment-only document).
        """
        return yaml.safe_load(connection_definition_str)  # type: ignore # yaml.safe_load doesn't return a type but know it is a Mapping
class YamlDeclarativeSource(ConcurrentDeclarativeSource[List[AirbyteStateMessage]]):
    """
    Declarative source defined by a yaml file.

    The manifest is read from ``path_to_yaml`` when the source is constructed and is handed to
    ``ConcurrentDeclarativeSource`` as ``source_config``.
    """

    def __init__(
        self,
        path_to_yaml: str,
        debug: bool = False,
        catalog: Optional[ConfiguredAirbyteCatalog] = None,
        config: Optional[Mapping[str, Any]] = None,
        state: Optional[List[AirbyteStateMessage]] = None,
        config_path: Optional[str] = None,
    ) -> None:
        """
        Load the manifest at ``path_to_yaml`` and initialise the concurrent declarative source with it.

        :param path_to_yaml: Path to the yaml file describing the source. It is tried first as a
            filesystem path, then as a resource of this class's top-level package.
        :param debug: Accepted for signature compatibility; this constructor does not use it or
            forward it to the parent.
        :param catalog: Configured catalog; a falsy value is replaced by an empty catalog.
        :param config: Connector config; a falsy value is replaced by an empty dict.
        :param state: Incoming state messages; a falsy value is replaced by an empty list.
        :param config_path: Forwarded unchanged to the parent constructor.
        """
        self._path_to_yaml = path_to_yaml
        manifest = self._read_and_parse_yaml_file(path_to_yaml)

        # The truthiness test (not `is None`) is deliberate: provided-but-empty values
        # also fall back to fresh defaults.
        effective_catalog = catalog if catalog else ConfiguredAirbyteCatalog(streams=[])
        effective_config = config if config else {}
        effective_state = state if state else []

        super().__init__(
            catalog=effective_catalog,
            config=effective_config,
            state=effective_state,
            source_config=manifest,
            config_path=config_path,
        )

    def _read_and_parse_yaml_file(self, path_to_yaml_file: str) -> ConnectionDefinition:
        """
        Read and parse the manifest at ``path_to_yaml_file``.

        The path is first opened as a regular file, which lets tests pass a plain file. If no such
        file exists, the path is looked up as a resource of this class's top-level package through
        ``pkgutil.get_data``. Inside the container, the working directory during an operation is not
        laid out like the package's static files.

        :param path_to_yaml_file: Filesystem path, or path relative to the top-level package.
        :return: The parsed manifest. Returns ``{}`` when the package resource is unavailable or
            empty, or when the document parses to nothing (an empty or comment-only YAML file).
        """
        try:
            # Decode explicitly as UTF-8 so both branches read bytes the same way:
            # the package branch below uses bytes.decode(), whose default is UTF-8,
            # while a bare open() would use the platform's locale encoding.
            with open(path_to_yaml_file, "r", encoding="utf-8") as yaml_file:
                raw_yaml = yaml_file.read()
        except FileNotFoundError:
            package = self.__class__.__module__.split(".")[0]
            yaml_bytes = pkgutil.get_data(package, path_to_yaml_file)
            if not yaml_bytes:
                return {}
            raw_yaml = yaml_bytes.decode()

        manifest = self._parse(raw_yaml)
        # yaml.safe_load yields None for an empty / comment-only document; normalise it
        # to the same empty manifest returned above instead of passing None downstream.
        return {} if manifest is None else manifest

    def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
        """
        Add the manifest path to ``extra_args`` under the ``"path_to_yaml"`` key (mutates the dict in place).

        NOTE(review): presumably an override of a ConcurrentDeclarativeSource hook that collects
        extra fields for a manifest debug message; confirm against the parent class.
        """
        extra_args["path_to_yaml"] = self._path_to_yaml

    @staticmethod
    def _parse(connection_definition_str: str) -> ConnectionDefinition:
        """
        Parse a YAML string into a manifest. Component references still exist in the manifest and
        are resolved later, during the creation of the DeclarativeSource.

        :param connection_definition_str: YAML text to parse
        :return: The ConnectionDefinition parsed from ``connection_definition_str``
            (``yaml.safe_load`` returns ``None`` for an empty or comment-only document).
        """
        return yaml.safe_load(connection_definition_str)  # type: ignore # yaml.safe_load doesn't return a type but know it is a Mapping

Declarative source defined by a YAML file.

YamlDeclarativeSource(path_to_yaml: str, debug: bool = False, catalog: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog] = None, config: Optional[Mapping[str, Any]] = None, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None, config_path: Optional[str] = None)
21    def __init__(
22        self,
23        path_to_yaml: str,
24        debug: bool = False,
25        catalog: Optional[ConfiguredAirbyteCatalog] = None,
26        config: Optional[Mapping[str, Any]] = None,
27        state: Optional[List[AirbyteStateMessage]] = None,
28        config_path: Optional[str] = None,
29    ) -> None:
30        """
31        :param path_to_yaml: Path to the yaml file describing the source
32        """
33        self._path_to_yaml = path_to_yaml
34        source_config = self._read_and_parse_yaml_file(path_to_yaml)
35
36        super().__init__(
37            catalog=catalog or ConfiguredAirbyteCatalog(streams=[]),
38            config=config or {},
39            state=state or [],
40            source_config=source_config,
41            config_path=config_path,
42        )
Parameters
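  • path_to_yaml: Path to the YAML file describing the source; it is tried first as a filesystem path, then as a resource of the source's top-level Python package.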