airbyte_cdk.sources.declarative.decoders
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import (
    CompositeRawDecoder,
    GzipParser,
    JsonParser,
    Parser,
)
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import (
    IterableDecoder,
    JsonDecoder,
)
from airbyte_cdk.sources.declarative.decoders.noop_decoder import NoopDecoder
from airbyte_cdk.sources.declarative.decoders.pagination_decoder_decorator import (
    PaginationDecoderDecorator,
)
from airbyte_cdk.sources.declarative.decoders.xml_decoder import XmlDecoder
from airbyte_cdk.sources.declarative.decoders.zipfile_decoder import ZipfileDecoder

# Public API of the package. Every name imported above is listed here so that
# the imports are explicit re-exports: `GzipParser` and `Parser` were imported
# but previously absent from this list.
__all__ = [
    "Decoder",
    "CompositeRawDecoder",
    "GzipParser",
    "JsonDecoder",
    "JsonParser",
    "IterableDecoder",
    "NoopDecoder",
    "PaginationDecoderDecorator",
    "Parser",
    "XmlDecoder",
    "ZipfileDecoder",
]
@dataclass
class Decoder:
    """
    Strategy interface for transforming a requests.Response into a Mapping[str, Any].
    """

    @abstractmethod
    def is_stream_response(self) -> bool:
        """
        Return True when the http requester should use its stream=True option for this decoder.
        """

    @abstractmethod
    def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE:
        """
        Decode a requests.Response into a Mapping[str, Any] or an array.

        :param response: the response to decode
        :return: Generator of Mapping describing the response
        """
Decoder strategy to transform a requests.Response into a Mapping[str, Any]
@abstractmethod
def is_stream_response(self) -> bool:
    """
    Return True when the http requester should use its stream=True option for this decoder.
    """
Set to True if you'd like to use stream=True option in http requester
@abstractmethod
def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE:
    """
    Decode a requests.Response into a Mapping[str, Any] or an array.

    :param response: the response to decode
    :return: Generator of Mapping describing the response
    """
Decodes a requests.Response into a Mapping[str, Any] or an array
Parameters
- response: the response to decode
Returns
Generator of Mapping describing the response
class CompositeRawDecoder(Decoder):
    """
    Decoder strategy that transforms a requests.Response into a PARSER_OUTPUT_TYPE
    by passing response.raw (or the buffered body) to parser(s).

    Note: response.raw is not decoded/decompressed by default. Parsers should be instantiated recursively.

    Example:
        composite_raw_decoder = CompositeRawDecoder(
            parser=GzipParser(
                inner_parser=JsonLineParser(encoding="iso-8859-1")
            )
        )
    """

    def __init__(
        self,
        parser: Parser,
        stream_response: bool = True,
        parsers_by_header: PARSERS_BY_HEADER_TYPE = None,
    ) -> None:
        """
        Args:
            parser (Parser): Default parser, used when no header-specific parser matches.
            stream_response (bool): Whether the response is consumed as a stream (see `decode`).
            parsers_by_header (PARSERS_BY_HEADER_TYPE): Optional mapping of
                header name -> {header value -> parser}; a falsy value is replaced by an empty dict.
        """
        # since we moved from using `dataclass` to `__init__` method,
        # we need to keep using the `parser` to be able to resolve the dependencies
        # between the parsers correctly.
        self.parser = parser

        self._parsers_by_header = parsers_by_header if parsers_by_header else {}
        self._stream_response = stream_response

    @classmethod
    def by_headers(
        cls,
        parsers: PARSERS_TYPE,
        stream_response: bool,
        fallback_parser: Parser,
    ) -> "CompositeRawDecoder":
        """
        Create a CompositeRawDecoder instance based on header values.

        Args:
            parsers (PARSERS_TYPE): A list of tuples where each tuple contains headers, header values, and a parser.
            stream_response (bool): A flag indicating whether the response should be streamed.
            fallback_parser (Parser): A parser to use if no matching header is found.

        Returns:
            CompositeRawDecoder: An instance of CompositeRawDecoder configured with the provided parsers.
        """
        parsers_by_header = {}
        for headers, header_values, parser in parsers:
            for header in headers:
                # Merge into any mapping already registered for this header: a plain
                # assignment would silently drop the values contributed by an earlier
                # tuple that names the same header.
                parsers_by_header.setdefault(header, {}).update(
                    dict.fromkeys(header_values, parser)
                )
        return cls(fallback_parser, stream_response, parsers_by_header)

    def is_stream_response(self) -> bool:
        """
        Return the `stream_response` flag given at construction time.
        """
        return self._stream_response

    def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE:
        """
        Parse the response with the parser selected from its headers.

        Args:
            response (requests.Response): The response to decode.

        Returns:
            Generator of whatever the selected parser yields.
        """
        parser = self._select_parser(response)
        if not self.is_stream_response():
            yield from parser.parse(data=io.BytesIO(response.content))
            return

        # urllib mentions that some interfaces don't play nice with auto_close
        # More info here: https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content
        # We have indeed observed some issues with CSV parsing.
        # Hence, we will manage the closing of the file ourselves until we find a better solution.
        response.raw.auto_close = False
        try:
            yield from parser.parse(
                data=response.raw,  # type: ignore[arg-type]
            )
        finally:
            # Since auto_close is disabled, we are the only ones closing the raw stream.
            # `finally` guarantees this also happens when the parser raises or when the
            # consumer stops iterating before the generator is exhausted.
            response.raw.close()

    def _select_parser(self, response: requests.Response) -> Parser:
        """
        Selects the appropriate parser based on the response headers.

        This method iterates through the `_parsers_by_header` dictionary to find a matching parser
        based on the headers in the response. If a matching header and header value are found,
        the corresponding parser is returned. If no match is found, the default parser is returned.

        Args:
            response (requests.Response): The HTTP response object containing headers to check.

        Returns:
            Parser: The parser corresponding to the matched header value, or the default parser if no match is found.
        """
        for header, parser_by_header_value in self._parsers_by_header.items():
            if header in response.headers:
                header_value = response.headers[header]
                if header_value in parser_by_header_value:
                    return parser_by_header_value[header_value]
        return self.parser
Decoder strategy to transform a requests.Response into a PARSER_OUTPUT_TYPE by passing response.raw to parser(s).
Note: response.raw is not decoded/decompressed by default. Parsers should be instantiated recursively.
Example:
composite_raw_decoder = CompositeRawDecoder( parser=GzipParser( inner_parser=JsonLineParser(encoding="iso-8859-1") ) )
def __init__(
    self,
    parser: Parser,
    stream_response: bool = True,
    parsers_by_header: PARSERS_BY_HEADER_TYPE = None,
) -> None:
    """
    Args:
        parser (Parser): Default parser, kept on the public `parser` attribute.
        stream_response (bool): Flag stored for `is_stream_response`.
        parsers_by_header (PARSERS_BY_HEADER_TYPE): Optional header-based parser mapping;
            a falsy value is replaced by an empty dict.
    """
    self._stream_response = stream_response
    self._parsers_by_header = parsers_by_header or {}

    # since we moved from using `dataclass` to `__init__` method,
    # we need to keep using the `parser` to be able to resolve the dependencies
    # between the parsers correctly.
    self.parser = parser
@classmethod
def by_headers(
    cls,
    parsers: PARSERS_TYPE,
    stream_response: bool,
    fallback_parser: Parser,
) -> "CompositeRawDecoder":
    """
    Create a CompositeRawDecoder instance based on header values.

    Args:
        parsers (PARSERS_TYPE): A list of tuples where each tuple contains headers, header values, and a parser.
        stream_response (bool): A flag indicating whether the response should be streamed.
        fallback_parser (Parser): A parser to use if no matching header is found.

    Returns:
        CompositeRawDecoder: An instance of CompositeRawDecoder configured with the provided parsers.
    """
    parsers_by_header = {}
    for headers, header_values, parser in parsers:
        for header in headers:
            # Merge into any mapping already registered for this header: a plain
            # assignment would silently drop the values contributed by an earlier
            # tuple that names the same header.
            parsers_by_header.setdefault(header, {}).update(
                dict.fromkeys(header_values, parser)
            )
    return cls(fallback_parser, stream_response, parsers_by_header)
Create a CompositeRawDecoder instance based on header values.
Arguments:
- parsers (PARSERS_TYPE): A list of tuples where each tuple contains headers, header values, and a parser.
- stream_response (bool): A flag indicating whether the response should be streamed.
- fallback_parser (Parser): A parser to use if no matching header is found.
Returns:
CompositeRawDecoder: An instance of CompositeRawDecoder configured with the provided parsers.
Set to True if you'd like to use stream=True option in http requester
def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE:
    """
    Parse the response with the parser selected from its headers.

    Args:
        response (requests.Response): The response to decode.

    Returns:
        Generator of whatever the selected parser yields.
    """
    parser = self._select_parser(response)
    if not self.is_stream_response():
        yield from parser.parse(data=io.BytesIO(response.content))
        return

    # urllib mentions that some interfaces don't play nice with auto_close
    # More info here: https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content
    # We have indeed observed some issues with CSV parsing.
    # Hence, we will manage the closing of the file ourselves until we find a better solution.
    response.raw.auto_close = False
    try:
        yield from parser.parse(
            data=response.raw,  # type: ignore[arg-type]
        )
    finally:
        # Since auto_close is disabled, we are the only ones closing the raw stream.
        # `finally` guarantees this also happens when the parser raises or when the
        # consumer stops iterating before the generator is exhausted.
        response.raw.close()
Decodes a requests.Response into a Mapping[str, Any] or an array
Parameters
- response: the response to decode
Returns
Generator of Mapping describing the response
class JsonDecoder(Decoder):
    """
    Decoder strategy that returns the json-encoded content of a response, if any.

    Usually, we would try to instantiate the equivalent `CompositeRawDecoder(parser=JsonParser(), stream_response=False)` but there were specific historical behaviors related to the JsonDecoder that we didn't know if we could remove like the fallback on {} in case of errors.
    """

    def __init__(self, parameters: Mapping[str, Any]):
        # `parameters` is not read by this constructor.
        self._decoder = CompositeRawDecoder(parser=JsonParser(), stream_response=False)

    def is_stream_response(self) -> bool:
        """
        Delegate to the wrapped CompositeRawDecoder.
        """
        return self._decoder.is_stream_response()

    def decode(
        self, response: requests.Response
    ) -> Generator[MutableMapping[str, Any], None, None]:
        """
        Given the response is an empty string or an empty list, the function will return a generator with an empty mapping.

        If the wrapped decoder raises, a single empty mapping is yielded as a fallback.
        """
        has_yielded = False
        try:
            for element in self._decoder.decode(response):
                yield element
                has_yielded = True
        except Exception:
            # Record the fallback before yielding it so that the "nothing was yielded"
            # check below does not emit a second empty mapping for the same failure.
            has_yielded = True
            yield {}

        if not has_yielded:
            yield {}
Decoder strategy that returns the json-encoded content of a response, if any.
Usually, we would try to instantiate the equivalent CompositeRawDecoder(parser=JsonParser(), stream_response=False)
but there were specific historical behaviors related to the JsonDecoder that we didn't know if we could remove like the fallback on {} in case of errors.
Set to True if you'd like to use stream=True option in http requester
def decode(
    self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
    """
    Given the response is an empty string or an empty list, the function will return a generator with an empty mapping.

    If the wrapped decoder raises, a single empty mapping is yielded as a fallback.
    """
    has_yielded = False
    try:
        for element in self._decoder.decode(response):
            yield element
            has_yielded = True
    except Exception:
        # Record the fallback before yielding it so that the "nothing was yielded"
        # check below does not emit a second empty mapping for the same failure.
        has_yielded = True
        yield {}

    if not has_yielded:
        yield {}
Given the response is an empty string or an empty list, the function will return a generator with an empty mapping.
@dataclass
class JsonParser(Parser):
    """
    Parser that reads the whole input and deserializes it as a single JSON document.
    """

    # Text encoding used to decode the raw bytes before they are handed to the JSON libraries.
    encoding: str = "utf-8"

    def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
        """
        Attempts to deserialize data using orjson library. As an extra layer of safety we fallback on the json library to deserialize the data.

        A top-level list is yielded element by element; any other value is yielded as is.
        Raises AirbyteTracedException when both libraries return None. Note that a
        document consisting of the JSON literal `null` also deserializes to None and is
        therefore reported as a failure.
        """
        raw_data = data.read()

        # Only fall back when orjson actually produced nothing. Testing truthiness
        # (`a or b`) would needlessly re-parse valid falsy documents such as [], {} or 0.
        body_json = self._parse_orjson(raw_data)
        if body_json is None:
            body_json = self._parse_json(raw_data)

        if body_json is None:
            raise AirbyteTracedException(
                message="Response JSON data failed to be parsed. See logs for more information.",
                internal_message="Response JSON data failed to be parsed.",
                failure_type=FailureType.system_error,
            )

        if isinstance(body_json, list):
            yield from body_json
        else:
            yield body_json

    def _parse_orjson(self, raw_data: bytes) -> Optional[Any]:
        """
        Deserialize with orjson; log at debug level and return None on any error.
        """
        try:
            return orjson.loads(raw_data.decode(self.encoding))
        except Exception as exc:
            logger.debug(
                f"Failed to parse JSON data using orjson library. Falling back to json library. {exc}"
            )
            return None

    def _parse_json(self, raw_data: bytes) -> Optional[Any]:
        """
        Deserialize with the standard json library; log at error level and return None on any error.
        """
        try:
            return json.loads(raw_data.decode(self.encoding))
        except Exception as exc:
            logger.error(f"Failed to parse JSON data using json library. {exc}")
            return None
def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
    """
    Attempts to deserialize data using orjson library. As an extra layer of safety we fallback on the json library to deserialize the data.

    A top-level list is yielded element by element; any other value is yielded as is.
    Raises AirbyteTracedException when neither attempt produces a value other than None.
    """
    raw_data = data.read()

    # Only fall back when orjson actually produced nothing. Testing truthiness
    # (`a or b`) would needlessly re-parse valid falsy documents such as [], {} or 0.
    body_json = self._parse_orjson(raw_data)
    if body_json is None:
        body_json = self._parse_json(raw_data)

    if body_json is None:
        raise AirbyteTracedException(
            message="Response JSON data failed to be parsed. See logs for more information.",
            internal_message="Response JSON data failed to be parsed.",
            failure_type=FailureType.system_error,
        )

    if isinstance(body_json, list):
        yield from body_json
    else:
        yield body_json
Attempts to deserialize data using orjson library. As an extra layer of safety we fallback on the json library to deserialize the data.
@dataclass
class IterableDecoder(Decoder):
    """
    Decoder strategy that returns the string content of the response, if any.

    Each line of the response becomes one mapping of the form {"record": <decoded line>}.
    """

    parameters: InitVar[Mapping[str, Any]]

    def is_stream_response(self) -> bool:
        """
        Always True for this decoder.
        """
        return True

    def decode(
        self, response: requests.Response
    ) -> Generator[MutableMapping[str, Any], None, None]:
        """
        Lazily yield one {"record": ...} mapping per line returned by response.iter_lines().
        """
        yield from ({"record": raw_line.decode()} for raw_line in response.iter_lines())
Decoder strategy that returns the string content of the response, if any.
Set to True if you'd like to use stream=True option in http requester
def decode(
    self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
    """
    Lazily yield one {"record": ...} mapping per line returned by response.iter_lines().
    """
    yield from ({"record": raw_line.decode()} for raw_line in response.iter_lines())
Decodes a requests.Response into a Mapping[str, Any] or an array
Parameters
- response: the response to decode
Returns
Generator of Mapping describing the response
class NoopDecoder(Decoder):
    """
    Decoder that ignores the response content and always produces a single empty mapping.
    """

    def is_stream_response(self) -> bool:
        """
        Always False for this decoder.
        """
        return False

    def decode(  # type: ignore[override]  # Signature doesn't match base class
        self,
        response: requests.Response,
    ) -> Generator[Mapping[str, Any], None, None]:
        """
        Yield exactly one empty mapping; `response` is not read.
        """
        yield {}
Decoder strategy to transform a requests.Response into a Mapping[str, Any]
Set to True if you'd like to use stream=True option in http requester
def decode(  # type: ignore[override]  # Signature doesn't match base class
    self,
    response: requests.Response,
) -> Generator[Mapping[str, Any], None, None]:
    """
    Yield exactly one empty mapping; `response` is not read.
    """
    yield {}
Decodes a requests.Response into a Mapping[str, Any] or an array
Parameters
- response: the response to decode
Returns
Generator of Mapping describing the response
@dataclass
class PaginationDecoderDecorator(Decoder):
    """
    Decoder to wrap other decoders when instantiating a DefaultPaginator in order to bypass decoding if the response is streamed.
    """

    def __init__(self, decoder: Decoder):
        self._decoder = decoder

    @property
    def decoder(self) -> Decoder:
        """
        The wrapped decoder.
        """
        return self._decoder

    def is_stream_response(self) -> bool:
        """
        Delegate to the wrapped decoder.
        """
        return self._decoder.is_stream_response()

    def decode(
        self, response: requests.Response
    ) -> Generator[MutableMapping[str, Any], None, None]:
        """
        Decode through the wrapped decoder unless it streams its response, in which case a
        warning is logged and a single empty mapping is yielded instead.
        """
        if not self._decoder.is_stream_response():
            yield from self._decoder.decode(response)
            return

        logger.warning("Response is streamed and therefore will not be decoded for pagination.")
        yield {}
Decoder to wrap other decoders when instantiating a DefaultPaginator in order to bypass decoding if the response is streamed.
Set to True if you'd like to use stream=True option in http requester
def decode(
    self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
    """
    Decode through the wrapped decoder unless it streams its response, in which case a
    warning is logged and a single empty mapping is yielded instead.
    """
    if not self._decoder.is_stream_response():
        yield from self._decoder.decode(response)
        return

    logger.warning("Response is streamed and therefore will not be decoded for pagination.")
    yield {}
Decodes a requests.Response into a Mapping[str, Any] or an array
Parameters
- response: the response to decode
Returns
Generator of Mapping describing the response
@dataclass
class XmlDecoder(Decoder):
    """
    XmlDecoder is a decoder strategy that parses the XML content of the response, and converts it to a dict.

    This class handles XML attributes by prefixing them with an '@' symbol and represents XML text content by using the '#text' key if the element has attributes or the element name/tag. It does not currently support XML namespace declarations.

    Example XML Input:
    <root>
        <location id="123">
            San Francisco
        </location>
        <item id="1" category="books">
            <name>Book Title 1</name>
            <price>10.99</price>
        </item>
        <item id="2" category="electronics">
            <name>Gadget</name>
            <price>299.99</price>
            <description>A useful gadget</description>
        </item>
    </root>

    Converted Output:
    {
        "root": {
            "location": {
                "@id": "123",
                "#text": "San Francisco"
            },
            "item": [
                {
                    "@id": "1",
                    "@category": "books",
                    "name": "Book Title 1",
                    "price": "10.99"
                },
                {
                    "@id": "2",
                    "@category": "electronics",
                    "name": "Gadget",
                    "price": "299.99",
                    "description": "A useful gadget"
                }
            ]
        }
    }

    Notes:
        - Attributes of an XML element are prefixed with an '@' symbol in the dictionary output.
        - Text content of an XML element is handled in two different ways, depending on whether
          the element has attributes.
            - If the element has attributes, the text content will be
              represented by the "#text" key.
            - If the element does not have any attributes, the text content will be
              represented by element name.
        - Namespace declarations are not supported in the current implementation.
    """

    parameters: InitVar[Mapping[str, Any]]

    def is_stream_response(self) -> bool:
        """
        Always False for this decoder.
        """
        return False

    def decode(
        self, response: requests.Response
    ) -> Generator[MutableMapping[str, Any], None, None]:
        """
        Parse response.text with xmltodict and yield the result.

        A non-list result is yielded as a single record; an empty list yields one empty
        mapping. When the body is not parseable (ExpatError), a warning is logged and one
        empty mapping is yielded.
        """
        try:
            parsed = xmltodict.parse(response.text)
            records = parsed if isinstance(parsed, list) else [parsed]
            if records:
                yield from records
            else:
                yield {}
        except ExpatError as exc:
            logger.warning(
                f"Response cannot be parsed from XML: {response.status_code=}, {response.text=}, {exc=}"
            )
            yield {}
XmlDecoder is a decoder strategy that parses the XML content of the response, and converts it to a dict.
This class handles XML attributes by prefixing them with an '@' symbol and represents XML text content by using the '#text' key if the element has attributes or the element name/tag. It does not currently support XML namespace declarations.
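Example XML Input: <root> <location id="123"> San Francisco </location> <item id="1" category="books"> <name>Book Title 1</name> <price>10.99</price> </item> <item id="2" category="electronics"> <name>Gadget</name> <price>299.99</price> <description>A useful gadget</description> </item> </root>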
Converted Output: { "root": { "location": { "@id": "123", "#text": "San Francisco" }, "item": [ { "@id": "1", "@category": "books", "name": "Book Title 1", "price": "10.99" }, { "@id": "2", "@category": "electronics", "name": "Gadget", "price": "299.99", "description": "A useful gadget" } ] } }
Notes:
- Attributes of an XML element are prefixed with an '@' symbol in the dictionary output.
- Text content of an XML element is handled in two different ways, depending on whether the element has attributes. - If the element has attributes, the text content will be represented by the "#text" key. - If the element does not have any attributes, the text content will be represented by element name.
- Namespace declarations are not supported in the current implementation.
Set to True if you'd like to use stream=True option in http requester
def decode(
    self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
    """
    Parse response.text with xmltodict and yield the result.

    A non-list result is yielded as a single record; an empty list yields one empty
    mapping. When the body is not parseable (ExpatError), a warning is logged and one
    empty mapping is yielded.
    """
    try:
        parsed = xmltodict.parse(response.text)
        records = parsed if isinstance(parsed, list) else [parsed]
        if records:
            yield from records
        else:
            yield {}
    except ExpatError as exc:
        logger.warning(
            f"Response cannot be parsed from XML: {response.status_code=}, {response.text=}, {exc=}"
        )
        yield {}
Decodes a requests.Response into a Mapping[str, Any] or an array
Parameters
- response: the response to decode
Returns
Generator of Mapping describing the response
@dataclass
class ZipfileDecoder(Decoder):
    """
    Decoder that treats the response body as a zip archive and feeds every file it
    contains to `parser`.
    """

    # Parser applied to the uncompressed content of each file in the archive.
    parser: Parser

    def is_stream_response(self) -> bool:
        """
        Always False: the archive is opened from the fully buffered response.content.
        """
        return False

    def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE:
        """
        Yield the records parsed from each file of the zip archive in the response body.

        :param response: the response whose content is a zip archive
        :return: Generator of whatever the parser yields, file after file
        :raises AirbyteTracedException: when the body is not a valid zip file, or when
            the parser fails on one of the files
        """
        try:
            with zipfile.ZipFile(BytesIO(response.content)) as zip_file:
                for member in zip_file.infolist():
                    # Directory entries carry no data; handing their empty payload to
                    # the parser can fail (e.g. empty JSON), so they are skipped.
                    if member.is_dir():
                        continue
                    file_name = member.filename
                    buffered_content = BytesIO(zip_file.read(member))
                    try:
                        yield from self.parser.parse(
                            buffered_content,
                        )
                    except Exception as e:
                        logger.error(
                            f"Failed to parse file: {file_name} from zip file: {response.request.url} with exception {e}."
                        )
                        raise AirbyteTracedException(
                            message=f"Failed to parse file: {file_name} from zip file.",
                            internal_message=f"Failed to parse file: {file_name} from zip file: {response.request.url}.",
                            failure_type=FailureType.system_error,
                        ) from e
        except zipfile.BadZipFile as e:
            logger.error(
                f"Received an invalid zip file in response to URL: {response.request.url}. "
                f"The size of the response body is: {len(response.content)}"
            )
            raise AirbyteTracedException(
                message="Received an invalid zip file in response.",
                internal_message=f"Received an invalid zip file in response to URL: {response.request.url}.",
                failure_type=FailureType.system_error,
            ) from e
Set to True if you'd like to use stream=True option in http requester
def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE:
    """
    Yield the records parsed from each file of the zip archive in the response body.

    :param response: the response whose content is a zip archive
    :return: Generator of whatever the parser yields, file after file
    :raises AirbyteTracedException: when the body is not a valid zip file, or when
        the parser fails on one of the files
    """
    try:
        with zipfile.ZipFile(BytesIO(response.content)) as zip_file:
            for member in zip_file.infolist():
                # Directory entries carry no data; handing their empty payload to
                # the parser can fail (e.g. empty JSON), so they are skipped.
                if member.is_dir():
                    continue
                file_name = member.filename
                buffered_content = BytesIO(zip_file.read(member))
                try:
                    yield from self.parser.parse(
                        buffered_content,
                    )
                except Exception as e:
                    logger.error(
                        f"Failed to parse file: {file_name} from zip file: {response.request.url} with exception {e}."
                    )
                    raise AirbyteTracedException(
                        message=f"Failed to parse file: {file_name} from zip file.",
                        internal_message=f"Failed to parse file: {file_name} from zip file: {response.request.url}.",
                        failure_type=FailureType.system_error,
                    ) from e
    except zipfile.BadZipFile as e:
        logger.error(
            f"Received an invalid zip file in response to URL: {response.request.url}. "
            f"The size of the response body is: {len(response.content)}"
        )
        raise AirbyteTracedException(
            message="Received an invalid zip file in response.",
            internal_message=f"Received an invalid zip file in response to URL: {response.request.url}.",
            failure_type=FailureType.system_error,
        ) from e
Decodes a requests.Response into a Mapping[str, Any] or an array
Parameters
- response: the response to decode
Returns
Generator of Mapping describing the response