airbyte_cdk.sources.declarative.yaml_declarative_source
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import pkgutil
from typing import Any, List, Mapping, Optional

import yaml

from airbyte_cdk.models import AirbyteStateMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
    ConcurrentDeclarativeSource,
)
from airbyte_cdk.sources.types import ConnectionDefinition


class YamlDeclarativeSource(ConcurrentDeclarativeSource[List[AirbyteStateMessage]]):
    """Declarative source defined by a yaml file"""

    def __init__(
        self,
        path_to_yaml: str,
        debug: bool = False,
        catalog: Optional[ConfiguredAirbyteCatalog] = None,
        config: Optional[Mapping[str, Any]] = None,
        state: Optional[List[AirbyteStateMessage]] = None,
        config_path: Optional[str] = None,
    ) -> None:
        """
        Load the manifest at ``path_to_yaml`` and initialise the parent source with it.

        :param path_to_yaml: Path to the yaml file describing the source
        :param debug: Accepted for interface compatibility; not referenced in this constructor
        :param catalog: Configured catalog; an empty catalog is used when omitted or falsy
        :param config: Connector configuration; an empty mapping is used when omitted or falsy
        :param state: Incoming state messages; an empty list is used when omitted or falsy
        :param config_path: Forwarded unchanged to the parent constructor
        """
        # Stored before parsing so the path is available on the instance
        # (it is read back by _emit_manifest_debug_message).
        self._path_to_yaml = path_to_yaml
        source_config = self._read_and_parse_yaml_file(path_to_yaml)

        super().__init__(
            catalog=catalog or ConfiguredAirbyteCatalog(streams=[]),
            config=config or {},
            state=state or [],
            source_config=source_config,
            config_path=config_path,
        )

    def _read_and_parse_yaml_file(self, path_to_yaml_file: str) -> ConnectionDefinition:
        """
        Read the manifest from disk, falling back to package data when the file is not found.

        :param path_to_yaml_file: Filesystem path, or package-relative resource name, of the yaml manifest
        :return: The parsed manifest, or an empty mapping when the package lookup yields no data
        """
        try:
            # For testing purposes, we want to allow to just pass a file.
            # The encoding is pinned to UTF-8 so this path decodes the manifest exactly like the
            # package-data fallback below (bytes.decode() defaults to UTF-8) instead of depending
            # on the platform's locale encoding.
            with open(path_to_yaml_file, "r", encoding="utf-8") as f:
                return yaml.safe_load(f)  # type: ignore  # we assume the yaml represents a ConnectionDefinition
        except FileNotFoundError:
            # Running inside the container, the working directory during an operation is not structured the same as the static files
            package = self.__class__.__module__.split(".")[0]

            yaml_config = pkgutil.get_data(package, path_to_yaml_file)
            if yaml_config:
                decoded_yaml = yaml_config.decode()
                return self._parse(decoded_yaml)
            return {}

    def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
        """
        Record the manifest path in ``extra_args`` under the ``path_to_yaml`` key.

        :param extra_args: Mapping that is mutated in place
        """
        extra_args["path_to_yaml"] = self._path_to_yaml

    @staticmethod
    def _parse(connection_definition_str: str) -> ConnectionDefinition:
        """
        Parses a yaml file into a manifest. Component references still exist in the manifest which will be
        resolved during the creation of the DeclarativeSource.
        :param connection_definition_str: yaml string to parse
        :return: The ConnectionDefinition parsed from connection_definition_str
        """
        return yaml.safe_load(connection_definition_str)  # type: ignore  # yaml.safe_load doesn't return a type but we know it is a Mapping
class
YamlDeclarativeSource(airbyte_cdk.sources.declarative.concurrent_declarative_source.ConcurrentDeclarativeSource[typing.List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]]):
class YamlDeclarativeSource(ConcurrentDeclarativeSource[List[AirbyteStateMessage]]):
    """Declarative source defined by a yaml file"""

    def __init__(
        self,
        path_to_yaml: str,
        debug: bool = False,
        catalog: Optional[ConfiguredAirbyteCatalog] = None,
        config: Optional[Mapping[str, Any]] = None,
        state: Optional[List[AirbyteStateMessage]] = None,
        config_path: Optional[str] = None,
    ) -> None:
        """
        Load the manifest at ``path_to_yaml`` and initialise the parent source with it.

        :param path_to_yaml: Path to the yaml file describing the source
        :param debug: Accepted for interface compatibility; not referenced in this constructor
        :param catalog: Configured catalog; an empty catalog is used when omitted or falsy
        :param config: Connector configuration; an empty mapping is used when omitted or falsy
        :param state: Incoming state messages; an empty list is used when omitted or falsy
        :param config_path: Forwarded unchanged to the parent constructor
        """
        # Stored before parsing so the path is available on the instance
        # (it is read back by _emit_manifest_debug_message).
        self._path_to_yaml = path_to_yaml
        source_config = self._read_and_parse_yaml_file(path_to_yaml)

        super().__init__(
            catalog=catalog or ConfiguredAirbyteCatalog(streams=[]),
            config=config or {},
            state=state or [],
            source_config=source_config,
            config_path=config_path,
        )

    def _read_and_parse_yaml_file(self, path_to_yaml_file: str) -> ConnectionDefinition:
        """
        Read the manifest from disk, falling back to package data when the file is not found.

        :param path_to_yaml_file: Filesystem path, or package-relative resource name, of the yaml manifest
        :return: The parsed manifest, or an empty mapping when the package lookup yields no data
        """
        try:
            # For testing purposes, we want to allow to just pass a file.
            # The encoding is pinned to UTF-8 so this path decodes the manifest exactly like the
            # package-data fallback below (bytes.decode() defaults to UTF-8) instead of depending
            # on the platform's locale encoding.
            with open(path_to_yaml_file, "r", encoding="utf-8") as f:
                return yaml.safe_load(f)  # type: ignore  # we assume the yaml represents a ConnectionDefinition
        except FileNotFoundError:
            # Running inside the container, the working directory during an operation is not structured the same as the static files
            package = self.__class__.__module__.split(".")[0]

            yaml_config = pkgutil.get_data(package, path_to_yaml_file)
            if yaml_config:
                decoded_yaml = yaml_config.decode()
                return self._parse(decoded_yaml)
            return {}

    def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
        """
        Record the manifest path in ``extra_args`` under the ``path_to_yaml`` key.

        :param extra_args: Mapping that is mutated in place
        """
        extra_args["path_to_yaml"] = self._path_to_yaml

    @staticmethod
    def _parse(connection_definition_str: str) -> ConnectionDefinition:
        """
        Parses a yaml file into a manifest. Component references still exist in the manifest which will be
        resolved during the creation of the DeclarativeSource.
        :param connection_definition_str: yaml string to parse
        :return: The ConnectionDefinition parsed from connection_definition_str
        """
        return yaml.safe_load(connection_definition_str)  # type: ignore  # yaml.safe_load doesn't return a type but we know it is a Mapping
Declarative source defined by a yaml file
YamlDeclarativeSource( path_to_yaml: str, debug: bool = False, catalog: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog] = None, config: Optional[Mapping[str, Any]] = None, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None, config_path: Optional[str] = None)
def __init__(
    self,
    path_to_yaml: str,
    debug: bool = False,
    catalog: Optional[ConfiguredAirbyteCatalog] = None,
    config: Optional[Mapping[str, Any]] = None,
    state: Optional[List[AirbyteStateMessage]] = None,
    config_path: Optional[str] = None,
) -> None:
    """
    Build the source from the yaml manifest located at ``path_to_yaml``.

    :param path_to_yaml: Path to the yaml file describing the source
    :param debug: Not referenced in this constructor
    :param catalog: Configured catalog; replaced by an empty catalog when falsy
    :param config: Connector configuration; replaced by an empty mapping when falsy
    :param state: State messages; replaced by an empty list when falsy
    :param config_path: Passed through to the parent constructor as-is
    """
    # Keep the path on the instance before the manifest is read.
    self._path_to_yaml = path_to_yaml
    manifest = self._read_and_parse_yaml_file(path_to_yaml)

    # Resolve the optional arguments to their defaults up front, in the same
    # order the original evaluated them.
    effective_catalog = catalog if catalog else ConfiguredAirbyteCatalog(streams=[])
    effective_config = config if config else {}
    effective_state = state if state else []

    super().__init__(
        catalog=effective_catalog,
        config=effective_config,
        state=effective_state,
        source_config=manifest,
        config_path=config_path,
    )
Parameters
- path_to_yaml: Path to the yaml file describing the source
Inherited Members
- airbyte_cdk.sources.declarative.concurrent_declarative_source.ConcurrentDeclarativeSource
- is_partially_declarative
- read
- discover
- streams
- airbyte_cdk.sources.declarative.manifest_declarative_source.ManifestDeclarativeSource
- logger
- components_module
- resolved_manifest
- message_repository
- dynamic_streams
- deprecation_warnings
- connection_checker
- spec
- check