airbyte_cdk.sources.declarative.decoders.decoder_parser

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5
 6import logging
 7from abc import ABC, abstractmethod
 8from dataclasses import dataclass
 9from io import BufferedIOBase
10from typing import Any, Dict, Generator, List, MutableMapping, Optional, Set, Tuple
11
# Module-level logger obtained under the fixed name "airbyte".
logger = logging.getLogger("airbyte")


# Return type used by Parser.parse: a generator that yields mutable,
# string-keyed mappings, accepts no sent values and returns nothing.
PARSER_OUTPUT_TYPE = Generator[MutableMapping[str, Any], None, None]
16
17
@dataclass
class Parser(ABC):
    """
    Abstract base for parsers.

    Subclasses implement ``parse`` to turn a buffered binary stream into
    dictionaries. The class declares no dataclass fields of its own.
    """

    @abstractmethod
    def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
        """
        Parse the given stream and yield each item as a dictionary.

        :param data: buffered binary stream to parse
        :return: generator of mutable, string-keyed mappings
        """
26
27
# reusable parser types

# A list of (set of str, set of str, Parser) triples.
# NOTE(review): what the two string sets hold is not visible in this module —
# presumably the keys/values used to select the parser; confirm with the consumer.
PARSERS_TYPE = List[Tuple[Set[str], Set[str], Parser]]
# An optional two-level lookup: str -> str -> Parser (None is allowed).
# NOTE(review): the name suggests outer keys are header names and inner keys
# are header values — confirm against the code that reads this mapping.
PARSERS_BY_HEADER_TYPE = Optional[Dict[str, Dict[str, Parser]]]
logger = <Logger airbyte (INFO)>
PARSER_OUTPUT_TYPE = typing.Generator[typing.MutableMapping[str, typing.Any], NoneType, NoneType]
@dataclass
class Parser(abc.ABC):
@dataclass
class Parser(ABC):
    """
    Abstract base for parsers.

    Subclasses implement ``parse`` to turn a buffered binary stream into
    dictionaries. The class declares no dataclass fields of its own.
    """

    @abstractmethod
    def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
        """
        Parse the given stream and yield each item as a dictionary.

        :param data: buffered binary stream to parse
        :return: generator of mutable, string-keyed mappings
        """
@abstractmethod
def parse( self, data: io.BufferedIOBase) -> Generator[MutableMapping[str, Any], NoneType, NoneType]:
21    @abstractmethod
22    def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
23        """
24        Parse data and yield dictionaries.
25        """
26        pass

Parse data and yield dictionaries.

PARSERS_TYPE = typing.List[typing.Tuple[typing.Set[str], typing.Set[str], Parser]]
PARSERS_BY_HEADER_TYPE = typing.Optional[typing.Dict[str, typing.Dict[str, Parser]]]