airbyte_cdk.sources.declarative.yaml_declarative_source
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import pkgutil
from typing import Any, List, Mapping, Optional

import yaml

from airbyte_cdk.models import AirbyteStateMessage, ConfiguredAirbyteCatalog
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
    ConcurrentDeclarativeSource,
)
from airbyte_cdk.sources.types import ConnectionDefinition


class YamlDeclarativeSource(ConcurrentDeclarativeSource[List[AirbyteStateMessage]]):
    """
    Declarative source defined by a yaml file.

    The manifest is read from the filesystem first; if no file exists at the given path,
    it is looked up as package data of the top-level package that defines the concrete class.
    """

    def __init__(
        self,
        path_to_yaml: str,
        debug: bool = False,
        catalog: Optional[ConfiguredAirbyteCatalog] = None,
        config: Optional[Mapping[str, Any]] = None,
        state: Optional[List[AirbyteStateMessage]] = None,
    ) -> None:
        """
        :param path_to_yaml: Path to the yaml file describing the source
        :param debug: Accepted by the signature but not read by this constructor
        :param catalog: Configured catalog; an empty catalog is used when omitted or falsy
        :param config: Connector configuration; an empty dict is used when omitted or falsy
        :param state: Incoming state messages; an empty list is used when omitted or falsy
        """
        self._path_to_yaml = path_to_yaml
        source_config = self._read_and_parse_yaml_file(path_to_yaml)

        super().__init__(
            catalog=catalog or ConfiguredAirbyteCatalog(streams=[]),
            config=config or {},
            state=state or [],
            source_config=source_config,
        )

    def _read_and_parse_yaml_file(self, path_to_yaml_file: str) -> ConnectionDefinition:
        """
        Load and parse the manifest located at ``path_to_yaml_file``.

        :param path_to_yaml_file: Filesystem path, or package-relative resource path, of the manifest
        :return: The parsed manifest, or an empty dict when the packaged resource yields no data
        """
        try:
            # For testing purposes, we want to allow to just pass a file.
            # Decode explicitly as UTF-8 so the result does not depend on the platform's locale
            # encoding and agrees with the UTF-8 decoding used by the packaged-resource fallback.
            with open(path_to_yaml_file, "r", encoding="utf-8") as f:
                return yaml.safe_load(f)  # type: ignore  # we assume the yaml represents a ConnectionDefinition
        except FileNotFoundError:
            # Running inside the container, the working directory during an operation is not structured the same as the static files
            package = self.__class__.__module__.split(".")[0]

            yaml_config = pkgutil.get_data(package, path_to_yaml_file)
            if yaml_config:
                decoded_yaml = yaml_config.decode("utf-8")
                return self._parse(decoded_yaml)
            # No data (None or empty bytes) was returned for the packaged resource.
            return {}

    def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
        """
        Record the manifest path under the ``path_to_yaml`` key of ``extra_args`` (mutated in place).

        NOTE(review): presumably a hook invoked by the parent class - confirm against the base implementation.
        """
        extra_args["path_to_yaml"] = self._path_to_yaml

    @staticmethod
    def _parse(connection_definition_str: str) -> ConnectionDefinition:
        """
        Parses a yaml string into a manifest. Component references still exist in the manifest and will be
        resolved during the creation of the DeclarativeSource.
        :param connection_definition_str: yaml string to parse
        :return: The ConnectionDefinition parsed from connection_definition_str
        """
        return yaml.safe_load(connection_definition_str)  # type: ignore  # yaml.safe_load doesn't return a type but we know it is a Mapping
class
YamlDeclarativeSource(airbyte_cdk.sources.declarative.concurrent_declarative_source.ConcurrentDeclarativeSource[typing.List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]]):
class YamlDeclarativeSource(ConcurrentDeclarativeSource[List[AirbyteStateMessage]]):
    """
    Declarative source defined by a yaml file.

    The manifest is read from the filesystem first; if no file exists at the given path,
    it is looked up as package data of the top-level package that defines the concrete class.
    """

    def __init__(
        self,
        path_to_yaml: str,
        debug: bool = False,
        catalog: Optional[ConfiguredAirbyteCatalog] = None,
        config: Optional[Mapping[str, Any]] = None,
        state: Optional[List[AirbyteStateMessage]] = None,
    ) -> None:
        """
        :param path_to_yaml: Path to the yaml file describing the source
        :param debug: Accepted by the signature but not read by this constructor
        :param catalog: Configured catalog; an empty catalog is used when omitted or falsy
        :param config: Connector configuration; an empty dict is used when omitted or falsy
        :param state: Incoming state messages; an empty list is used when omitted or falsy
        """
        self._path_to_yaml = path_to_yaml
        source_config = self._read_and_parse_yaml_file(path_to_yaml)

        super().__init__(
            catalog=catalog or ConfiguredAirbyteCatalog(streams=[]),
            config=config or {},
            state=state or [],
            source_config=source_config,
        )

    def _read_and_parse_yaml_file(self, path_to_yaml_file: str) -> ConnectionDefinition:
        """
        Load and parse the manifest located at ``path_to_yaml_file``.

        :param path_to_yaml_file: Filesystem path, or package-relative resource path, of the manifest
        :return: The parsed manifest, or an empty dict when the packaged resource yields no data
        """
        try:
            # For testing purposes, we want to allow to just pass a file.
            # Decode explicitly as UTF-8 so the result does not depend on the platform's locale
            # encoding and agrees with the UTF-8 decoding used by the packaged-resource fallback.
            with open(path_to_yaml_file, "r", encoding="utf-8") as f:
                return yaml.safe_load(f)  # type: ignore  # we assume the yaml represents a ConnectionDefinition
        except FileNotFoundError:
            # Running inside the container, the working directory during an operation is not structured the same as the static files
            package = self.__class__.__module__.split(".")[0]

            yaml_config = pkgutil.get_data(package, path_to_yaml_file)
            if yaml_config:
                decoded_yaml = yaml_config.decode("utf-8")
                return self._parse(decoded_yaml)
            # No data (None or empty bytes) was returned for the packaged resource.
            return {}

    def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
        """
        Record the manifest path under the ``path_to_yaml`` key of ``extra_args`` (mutated in place).

        NOTE(review): presumably a hook invoked by the parent class - confirm against the base implementation.
        """
        extra_args["path_to_yaml"] = self._path_to_yaml

    @staticmethod
    def _parse(connection_definition_str: str) -> ConnectionDefinition:
        """
        Parses a yaml string into a manifest. Component references still exist in the manifest and will be
        resolved during the creation of the DeclarativeSource.
        :param connection_definition_str: yaml string to parse
        :return: The ConnectionDefinition parsed from connection_definition_str
        """
        return yaml.safe_load(connection_definition_str)  # type: ignore  # yaml.safe_load doesn't return a type but we know it is a Mapping
Declarative source defined by a yaml file
YamlDeclarativeSource( path_to_yaml: str, debug: bool = False, catalog: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog] = None, config: Optional[Mapping[str, Any]] = None, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None)
def __init__(
    self,
    path_to_yaml: str,
    debug: bool = False,
    catalog: Optional[ConfiguredAirbyteCatalog] = None,
    config: Optional[Mapping[str, Any]] = None,
    state: Optional[List[AirbyteStateMessage]] = None,
) -> None:
    """
    Build the source from the manifest stored at ``path_to_yaml``.

    :param path_to_yaml: Path to the yaml file describing the source
    :param debug: Accepted by the signature but not read by this constructor
    :param catalog: Configured catalog; an empty catalog is used when omitted or falsy
    :param config: Connector configuration; an empty dict is used when omitted or falsy
    :param state: Incoming state messages; an empty list is used when omitted or falsy
    """
    # Keep the path on the instance before parsing the manifest.
    self._path_to_yaml = path_to_yaml
    manifest = self._read_and_parse_yaml_file(path_to_yaml)

    # Substitute empty defaults for any falsy argument before delegating to the parent.
    effective_catalog = catalog if catalog else ConfiguredAirbyteCatalog(streams=[])
    effective_config = config if config else {}
    effective_state = state if state else []

    super().__init__(
        catalog=effective_catalog,
        config=effective_config,
        state=effective_state,
        source_config=manifest,
    )
Parameters
- path_to_yaml: Path to the yaml file describing the source
Inherited Members
- airbyte_cdk.sources.declarative.concurrent_declarative_source.ConcurrentDeclarativeSource
- is_partially_declarative
- read
- discover
- streams
- airbyte_cdk.sources.declarative.manifest_declarative_source.ManifestDeclarativeSource
- logger
- components_module
- resolved_manifest
- message_repository
- connection_checker
- spec
- check