airbyte_cdk.sources.declarative.yaml_declarative_source

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5import pkgutil
 6from typing import Any, List, Mapping, Optional
 7
 8import yaml
 9
10from airbyte_cdk.models import AirbyteStateMessage, ConfiguredAirbyteCatalog
11from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
12    ConcurrentDeclarativeSource,
13)
14from airbyte_cdk.sources.types import ConnectionDefinition
15
16
class YamlDeclarativeSource(ConcurrentDeclarativeSource[List[AirbyteStateMessage]]):
    """Declarative source defined by a yaml file"""

    def __init__(
        self,
        path_to_yaml: str,
        debug: bool = False,
        catalog: Optional[ConfiguredAirbyteCatalog] = None,
        config: Optional[Mapping[str, Any]] = None,
        state: Optional[List[AirbyteStateMessage]] = None,
    ) -> None:
        """
        Read the manifest at ``path_to_yaml`` and initialize the parent source with it.

        :param path_to_yaml: Path to the yaml file describing the source
        :param debug: Not read anywhere in this constructor
        :param catalog: Configured catalog; a catalog with no streams is used when omitted
        :param config: Connector configuration; an empty mapping is used when omitted
        :param state: Incoming state messages; an empty list is used when omitted
        """
        self._path_to_yaml = path_to_yaml
        source_config = self._read_and_parse_yaml_file(path_to_yaml)

        super().__init__(
            catalog=catalog or ConfiguredAirbyteCatalog(streams=[]),
            config=config or {},
            state=state or [],
            source_config=source_config,
        )

    def _read_and_parse_yaml_file(self, path_to_yaml_file: str) -> ConnectionDefinition:
        """
        Load the manifest from the filesystem, falling back to package data.

        :param path_to_yaml_file: Path handed to ``open``; on ``FileNotFoundError`` the same value
            is used as the resource name for ``pkgutil.get_data``
        :return: The parsed yaml document, or an empty mapping when the package lookup yields no data
        """
        try:
            # For testing purposes, we want to allow to just pass a file.
            # The encoding is pinned to UTF-8 so this branch decodes the manifest the same way as the
            # package-data branch below (bytes.decode() defaults to UTF-8) instead of depending on
            # the platform's locale encoding.
            with open(path_to_yaml_file, "r", encoding="utf-8") as f:
                return yaml.safe_load(f)  # type: ignore  # we assume the yaml represents a ConnectionDefinition
        except FileNotFoundError:
            # Running inside the container, the working directory during an operation is not structured the same as the static files
            package = self.__class__.__module__.split(".")[0]

            yaml_config = pkgutil.get_data(package, path_to_yaml_file)
            if yaml_config:
                decoded_yaml = yaml_config.decode()
                return self._parse(decoded_yaml)
            # No data (None or empty bytes) was returned for the resource: keep the best-effort
            # behaviour of handing back an empty definition rather than raising here.
            return {}

    def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
        """
        Record the manifest path in ``extra_args``.

        :param extra_args: Mapping that is mutated in place; its ``path_to_yaml`` key is set to the
            path given at construction time
        """
        extra_args["path_to_yaml"] = self._path_to_yaml

    @staticmethod
    def _parse(connection_definition_str: str) -> ConnectionDefinition:
        """
        Parses a yaml file into a manifest. Component references still exist in the manifest which will be
        resolved during the creating of the DeclarativeSource.
        :param connection_definition_str: yaml string to parse
        :return: The ConnectionDefinition parsed from connection_definition_str
        """
        return yaml.safe_load(connection_definition_str)  # type: ignore # yaml.safe_load doesn't return a type but know it is a Mapping
class YamlDeclarativeSource(ConcurrentDeclarativeSource[List[AirbyteStateMessage]]):
    """Declarative source defined by a yaml file"""

    def __init__(
        self,
        path_to_yaml: str,
        debug: bool = False,
        catalog: Optional[ConfiguredAirbyteCatalog] = None,
        config: Optional[Mapping[str, Any]] = None,
        state: Optional[List[AirbyteStateMessage]] = None,
    ) -> None:
        """
        Read the manifest at ``path_to_yaml`` and initialize the parent source with it.

        :param path_to_yaml: Path to the yaml file describing the source
        :param debug: Not read anywhere in this constructor
        :param catalog: Configured catalog; a catalog with no streams is used when omitted
        :param config: Connector configuration; an empty mapping is used when omitted
        :param state: Incoming state messages; an empty list is used when omitted
        """
        self._path_to_yaml = path_to_yaml
        source_config = self._read_and_parse_yaml_file(path_to_yaml)

        super().__init__(
            catalog=catalog or ConfiguredAirbyteCatalog(streams=[]),
            config=config or {},
            state=state or [],
            source_config=source_config,
        )

    def _read_and_parse_yaml_file(self, path_to_yaml_file: str) -> ConnectionDefinition:
        """
        Load the manifest from the filesystem, falling back to package data.

        :param path_to_yaml_file: Path handed to ``open``; on ``FileNotFoundError`` the same value
            is used as the resource name for ``pkgutil.get_data``
        :return: The parsed yaml document, or an empty mapping when the package lookup yields no data
        """
        try:
            # For testing purposes, we want to allow to just pass a file.
            # The encoding is pinned to UTF-8 so this branch decodes the manifest the same way as the
            # package-data branch below (bytes.decode() defaults to UTF-8) instead of depending on
            # the platform's locale encoding.
            with open(path_to_yaml_file, "r", encoding="utf-8") as f:
                return yaml.safe_load(f)  # type: ignore  # we assume the yaml represents a ConnectionDefinition
        except FileNotFoundError:
            # Running inside the container, the working directory during an operation is not structured the same as the static files
            package = self.__class__.__module__.split(".")[0]

            yaml_config = pkgutil.get_data(package, path_to_yaml_file)
            if yaml_config:
                decoded_yaml = yaml_config.decode()
                return self._parse(decoded_yaml)
            # No data (None or empty bytes) was returned for the resource: keep the best-effort
            # behaviour of handing back an empty definition rather than raising here.
            return {}

    def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
        """
        Record the manifest path in ``extra_args``.

        :param extra_args: Mapping that is mutated in place; its ``path_to_yaml`` key is set to the
            path given at construction time
        """
        extra_args["path_to_yaml"] = self._path_to_yaml

    @staticmethod
    def _parse(connection_definition_str: str) -> ConnectionDefinition:
        """
        Parses a yaml file into a manifest. Component references still exist in the manifest which will be
        resolved during the creating of the DeclarativeSource.
        :param connection_definition_str: yaml string to parse
        :return: The ConnectionDefinition parsed from connection_definition_str
        """
        return yaml.safe_load(connection_definition_str)  # type: ignore # yaml.safe_load doesn't return a type but know it is a Mapping

Declarative source defined by a YAML file.

YamlDeclarativeSource( path_to_yaml: str, debug: bool = False, catalog: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog] = None, config: Optional[Mapping[str, Any]] = None, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None)
21    def __init__(
22        self,
23        path_to_yaml: str,
24        debug: bool = False,
25        catalog: Optional[ConfiguredAirbyteCatalog] = None,
26        config: Optional[Mapping[str, Any]] = None,
27        state: Optional[List[AirbyteStateMessage]] = None,
28    ) -> None:
29        """
30        :param path_to_yaml: Path to the yaml file describing the source
31        """
32        self._path_to_yaml = path_to_yaml
33        source_config = self._read_and_parse_yaml_file(path_to_yaml)
34
35        super().__init__(
36            catalog=catalog or ConfiguredAirbyteCatalog(streams=[]),
37            config=config or {},
38            state=state or [],
39            source_config=source_config,
40        )
Parameters
  • path_to_yaml: Path to the yaml file describing the source