airbyte_cdk.sources.declarative.decoders.decoder_parser
1# 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 3# 4 5 6import logging 7from abc import ABC, abstractmethod 8from dataclasses import dataclass 9from io import BufferedIOBase 10from typing import Any, Dict, Generator, List, MutableMapping, Optional, Set, Tuple 11 12logger = logging.getLogger("airbyte") 13 14 15PARSER_OUTPUT_TYPE = Generator[MutableMapping[str, Any], None, None] 16 17 18@dataclass 19class Parser(ABC): 20 @abstractmethod 21 def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE: 22 """ 23 Parse data and yield dictionaries. 24 """ 25 pass 26 27 28# reusable parser types 29PARSERS_TYPE = List[Tuple[Set[str], Set[str], Parser]] 30PARSERS_BY_HEADER_TYPE = Optional[Dict[str, Dict[str, Parser]]]
logger =
<Logger airbyte (INFO)>
PARSER_OUTPUT_TYPE =
typing.Generator[typing.MutableMapping[str, typing.Any], NoneType, NoneType]
@dataclass
class
Parser
PARSERS_TYPE =
typing.List[typing.Tuple[typing.Set[str], typing.Set[str], Parser]]
PARSERS_BY_HEADER_TYPE =
typing.Optional[typing.Dict[str, typing.Dict[str, Parser]]]