airbyte_cdk.sources.declarative.decoders
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
"""Decoders that turn HTTP responses into records for declarative sources."""

from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import (
    CompositeRawDecoder,
    GzipParser,
    JsonParser,
    Parser,
)
from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
from airbyte_cdk.sources.declarative.decoders.json_decoder import (
    IterableDecoder,
    JsonDecoder,
)
from airbyte_cdk.sources.declarative.decoders.noop_decoder import NoopDecoder
from airbyte_cdk.sources.declarative.decoders.pagination_decoder_decorator import (
    PaginationDecoderDecorator,
)
from airbyte_cdk.sources.declarative.decoders.xml_decoder import XmlDecoder
from airbyte_cdk.sources.declarative.decoders.zipfile_decoder import ZipfileDecoder

# Public API of the package. `GzipParser` and `Parser` are imported above for
# re-export, so they are listed too; otherwise `from ... import *` would omit
# them and linters would report the imports as unused.
__all__ = [
    "Decoder",
    "CompositeRawDecoder",
    "GzipParser",
    "JsonDecoder",
    "JsonParser",
    "IterableDecoder",
    "NoopDecoder",
    "PaginationDecoderDecorator",
    "Parser",
    "XmlDecoder",
    "ZipfileDecoder",
]
# NOTE(review): `Decoder` has no `abc.ABC` base (and no ABCMeta metaclass), so the
# `@abstractmethod` markers below document intent but are not enforced when a
# subclass forgets to override them.
@dataclass
class Decoder:
    """
    Strategy interface for turning a requests.Response into decoded records.

    Concrete decoders override `is_stream_response` and `decode`.
    """

    @abstractmethod
    def is_stream_response(self) -> bool:
        """
        Tell the HTTP requester whether to send the request with ``stream=True``.

        :return: True if the requester should use ``stream=True``, False otherwise
        """

    @abstractmethod
    def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE:
        """
        Decode a requests.Response into mappings (or an array of them).

        :param response: the HTTP response whose body is decoded
        :return: a generator of mappings describing the response
        """
Decoder strategy to transform a requests.Response into a Mapping[str, Any]
@abstractmethod
def is_stream_response(self) -> bool:
    """
    Tell the HTTP requester whether to send the request with ``stream=True``.

    :return: True if the requester should use ``stream=True``, False otherwise
    """
Set to True if you'd like to use stream=True option in http requester
@abstractmethod
def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE:
    """
    Decode a requests.Response into mappings (or an array of them).

    :param response: the HTTP response whose body is decoded
    :return: a generator of mappings describing the response
    """
Decodes a requests.Response into a Mapping[str, Any] or an array
Parameters
- response: the response to decode
Returns
Generator of Mapping describing the response
class CompositeRawDecoder(Decoder):
    """
    Decoder strategy to transform a requests.Response into PARSER_OUTPUT_TYPE records
    by passing the response body (response.raw when streaming) to the parser(s).

    Note: response.raw is not decoded/decompressed by default. Parsers can be nested:
    an outer parser (e.g. GzipParser) wraps an inner parser, as in the example below.

    Example:
        composite_raw_decoder = CompositeRawDecoder(
            parser=GzipParser(
                inner_parser=JsonLineParser(encoding="iso-8859-1")
            )
        )
    """

    def __init__(
        self,
        parser: Parser,
        stream_response: bool = True,
        parsers_by_header: PARSERS_BY_HEADER_TYPE = None,
    ) -> None:
        # Since we moved from using `dataclass` to an `__init__` method, we need to keep
        # exposing the `parser` attribute to be able to resolve the dependencies between
        # the parsers correctly.
        self.parser = parser

        self._parsers_by_header = parsers_by_header if parsers_by_header else {}
        self._stream_response = stream_response

    @classmethod
    def by_headers(
        cls,
        parsers: PARSERS_TYPE,
        stream_response: bool,
        fallback_parser: Parser,
    ) -> "CompositeRawDecoder":
        """
        Create a CompositeRawDecoder instance based on header values.

        Entries sharing a header name are merged, so different parsers can be registered
        for different values of the same header. If the same header/value pair appears
        more than once, the last entry wins.

        Args:
            parsers (PARSERS_TYPE): A list of tuples where each tuple contains headers, header values, and a parser.
            stream_response (bool): A flag indicating whether the response should be streamed.
            fallback_parser (Parser): A parser to use if no matching header is found.

        Returns:
            CompositeRawDecoder: An instance of CompositeRawDecoder configured with the provided parsers.
        """
        parsers_by_header: dict[str, dict[str, Parser]] = {}
        for headers, header_values, parser in parsers:
            for header in headers:
                # Merge instead of overwrite: a later tuple naming the same header must
                # not discard the values registered by an earlier tuple.
                parser_by_header_value = parsers_by_header.setdefault(header, {})
                for header_value in header_values:
                    parser_by_header_value[header_value] = parser
        return cls(fallback_parser, stream_response, parsers_by_header)

    def is_stream_response(self) -> bool:
        return self._stream_response

    def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE:
        """
        Parse the response body with the parser selected from the response headers.

        When streaming, response.raw is handed to the parser directly and is closed once
        parsing finishes, fails, or the caller stops iterating.
        """
        parser = self._select_parser(response)
        if self.is_stream_response():
            # urllib mentions that some interfaces don't play nice with auto_close
            # More info here: https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content
            # We have indeed observed some issues with CSV parsing.
            # Hence, we will manage the closing of the file ourselves until we find a better solution.
            response.raw.auto_close = False
            try:
                yield from parser.parse(
                    data=response.raw,  # type: ignore[arg-type]
                )
            finally:
                # auto_close is disabled above, so close explicitly even when the parser
                # raises or the generator is closed before being exhausted.
                response.raw.close()
        else:
            yield from parser.parse(data=io.BytesIO(response.content))

    def _select_parser(self, response: requests.Response) -> Parser:
        """
        Selects the appropriate parser based on the response headers.

        This method iterates through the `_parsers_by_header` dictionary to find a matching parser
        based on the headers in the response. If a matching header and header value are found,
        the corresponding parser is returned. If no match is found, the default parser is returned.

        Args:
            response (requests.Response): The HTTP response object containing headers to check.

        Returns:
            Parser: The parser corresponding to the matched header value, or the default parser if no match is found.
        """
        for header, parser_by_header_value in self._parsers_by_header.items():
            if (
                header in response.headers
                and response.headers[header] in parser_by_header_value.keys()
            ):
                return parser_by_header_value[response.headers[header]]
        return self.parser
Decoder strategy to transform a requests.Response into a PARSER_OUTPUT_TYPE by passing the response body (response.raw when streaming) to the parser(s).
Note: response.raw is not decoded/decompressed by default. Parsers can be nested: an outer parser (e.g. GzipParser) wraps an inner parser, as in the example below.
Example:
composite_raw_decoder = CompositeRawDecoder( parser=GzipParser( inner_parser=JsonLineParser(encoding="iso-8859-1") ) )
def __init__(
    self,
    parser: Parser,
    stream_response: bool = True,
    parsers_by_header: PARSERS_BY_HEADER_TYPE = None,
) -> None:
    # `parser` remains a public attribute (as it was when this class was a dataclass)
    # so that dependencies between nested parsers can still be resolved.
    self.parser = parser

    self._stream_response = stream_response
    # Fall back on an empty mapping when no header-specific parsers are given.
    self._parsers_by_header = parsers_by_header or {}
@classmethod
def by_headers(
    cls,
    parsers: PARSERS_TYPE,
    stream_response: bool,
    fallback_parser: Parser,
) -> "CompositeRawDecoder":
    """
    Create a CompositeRawDecoder instance based on header values.

    Entries sharing a header name are merged, so different parsers can be registered
    for different values of the same header. If the same header/value pair appears
    more than once, the last entry wins.

    Args:
        parsers (PARSERS_TYPE): A list of tuples where each tuple contains headers, header values, and a parser.
        stream_response (bool): A flag indicating whether the response should be streamed.
        fallback_parser (Parser): A parser to use if no matching header is found.

    Returns:
        CompositeRawDecoder: An instance of CompositeRawDecoder configured with the provided parsers.
    """
    parsers_by_header: dict[str, dict[str, Parser]] = {}
    for headers, header_values, parser in parsers:
        for header in headers:
            # Merge instead of overwrite: a later tuple naming the same header must
            # not discard the values registered by an earlier tuple.
            parser_by_header_value = parsers_by_header.setdefault(header, {})
            for header_value in header_values:
                parser_by_header_value[header_value] = parser
    return cls(fallback_parser, stream_response, parsers_by_header)
Create a CompositeRawDecoder instance based on header values.
Arguments:
- parsers (PARSERS_TYPE): A list of tuples where each tuple contains headers, header values, and a parser.
- stream_response (bool): A flag indicating whether the response should be streamed.
- fallback_parser (Parser): A parser to use if no matching header is found.
Returns:
CompositeRawDecoder: An instance of CompositeRawDecoder configured with the provided parsers.
Set to True if you'd like to use stream=True option in http requester
def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE:
    """
    Parse the response body with the parser selected from the response headers.

    When streaming, response.raw is handed to the parser directly and is closed once
    parsing finishes, fails, or the caller stops iterating.
    """
    parser = self._select_parser(response)
    if self.is_stream_response():
        # urllib mentions that some interfaces don't play nice with auto_close
        # More info here: https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content
        # We have indeed observed some issues with CSV parsing.
        # Hence, we will manage the closing of the file ourselves until we find a better solution.
        response.raw.auto_close = False
        try:
            yield from parser.parse(
                data=response.raw,  # type: ignore[arg-type]
            )
        finally:
            # auto_close is disabled above, so close explicitly even when the parser
            # raises or the generator is closed before being exhausted.
            response.raw.close()
    else:
        yield from parser.parse(data=io.BytesIO(response.content))
Decodes a requests.Response into a Mapping[str, Any] or an array
Parameters
- response: the response to decode
Returns
Generator of Mapping describing the response
class JsonDecoder(Decoder):
    """
    Decoder strategy that returns the JSON-decoded content of a response, if any.

    Usually, we would try to instantiate the equivalent
    `CompositeRawDecoder(parser=JsonParser(), stream_response=False)`, but there were
    specific historical behaviors related to the JsonDecoder that we didn't know whether
    we could remove, like the fallback on {} in case of errors.
    """

    def __init__(self, parameters: Mapping[str, Any]):
        # `parameters` is accepted for interface compatibility; it is not used here.
        self._decoder = CompositeRawDecoder(parser=JsonParser(), stream_response=False)

    def is_stream_response(self) -> bool:
        return self._decoder.is_stream_response()

    def decode(
        self, response: requests.Response
    ) -> Generator[MutableMapping[str, Any], None, None]:
        """
        Yield every element decoded from the JSON body of `response`.

        If the response is an empty string or an empty list, a single empty mapping is
        yielded. If decoding raises, a single empty mapping is yielded and iteration stops.
        """
        has_yielded = False
        try:
            for element in self._decoder.decode(response):
                yield element
                has_yielded = True
        except Exception:
            # Historical behavior: fall back on {} when decoding fails. Return right away
            # so the `not has_yielded` fallback below does not yield a second {}.
            yield {}
            return

        if not has_yielded:
            yield {}
Decoder strategy that returns the JSON-decoded content of a response, if any.
Usually, we would try to instantiate the equivalent CompositeRawDecoder(parser=JsonParser(), stream_response=False)
but there were specific historical behaviors related to the JsonDecoder that we didn't know whether we could remove, like the fallback on {} in case of errors.
Set to True if you'd like to use stream=True option in http requester
def decode(
    self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
    """
    Yield every element decoded from the JSON body of `response`.

    If the response is an empty string or an empty list, a single empty mapping is
    yielded. If decoding raises, a single empty mapping is yielded and iteration stops.
    """
    has_yielded = False
    try:
        for element in self._decoder.decode(response):
            yield element
            has_yielded = True
    except Exception:
        # Historical behavior: fall back on {} when decoding fails. Return right away
        # so the `not has_yielded` fallback below does not yield a second {}.
        yield {}
        return

    if not has_yielded:
        yield {}
If the response is an empty string or an empty list, the function returns a generator that yields a single empty mapping.
@dataclass
class JsonParser(Parser):
    # Text encoding used to decode the raw bytes before JSON parsing.
    encoding: str = "utf-8"

    def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
        """
        Attempts to deserialize data using the orjson library. As an extra layer of safety
        we fall back on the json library to deserialize the data.

        A top-level JSON array is yielded element by element; any other value is yielded as is.

        :raises AirbyteTracedException: if neither library can parse the data (this includes
            a body that decodes to JSON `null`).
        """
        raw_data = data.read()
        # Only fall back when orjson actually failed (signalled by None). A plain `or`
        # would also re-parse valid but falsy documents such as `[]`, `{}`, `0` or `false`.
        body_json = self._parse_orjson(raw_data)
        if body_json is None:
            body_json = self._parse_json(raw_data)

        if body_json is None:
            raise AirbyteTracedException(
                message="Response JSON data failed to be parsed. See logs for more information.",
                internal_message="Response JSON data failed to be parsed.",
                failure_type=FailureType.system_error,
            )

        if isinstance(body_json, list):
            yield from body_json
        else:
            yield body_json

    def _parse_orjson(self, raw_data: bytes) -> Optional[Any]:
        """Return the orjson-decoded value, or None if decoding fails."""
        try:
            return orjson.loads(raw_data.decode(self.encoding))
        except Exception as exc:
            logger.debug(
                f"Failed to parse JSON data using orjson library. Falling back to json library. {exc}"
            )
            return None

    def _parse_json(self, raw_data: bytes) -> Optional[Any]:
        """Return the json-decoded value, or None if decoding fails."""
        try:
            return json.loads(raw_data.decode(self.encoding))
        except Exception as exc:
            logger.error(f"Failed to parse JSON data using json library. {exc}")
            return None
def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
    """
    Attempts to deserialize data using the orjson library. As an extra layer of safety
    we fall back on the json library to deserialize the data.

    A top-level JSON array is yielded element by element; any other value is yielded as is.

    :raises AirbyteTracedException: if neither library can parse the data (this includes
        a body that decodes to JSON `null`).
    """
    raw_data = data.read()
    # Only fall back when orjson actually failed (signalled by None). A plain `or`
    # would also re-parse valid but falsy documents such as `[]`, `{}`, `0` or `false`.
    body_json = self._parse_orjson(raw_data)
    if body_json is None:
        body_json = self._parse_json(raw_data)

    if body_json is None:
        raise AirbyteTracedException(
            message="Response JSON data failed to be parsed. See logs for more information.",
            internal_message="Response JSON data failed to be parsed.",
            failure_type=FailureType.system_error,
        )

    if isinstance(body_json, list):
        yield from body_json
    else:
        yield body_json
Attempts to deserialize data using the orjson library. As an extra layer of safety we fall back on the json library to deserialize the data.
@dataclass
class IterableDecoder(Decoder):
    """
    Decoder strategy that yields the response body line by line, wrapping each
    decoded line in a ``{"record": <line>}`` mapping.
    """

    parameters: InitVar[Mapping[str, Any]]

    def is_stream_response(self) -> bool:
        # This decoder always asks the requester for a streamed response.
        return True

    def decode(
        self, response: requests.Response
    ) -> Generator[MutableMapping[str, Any], None, None]:
        yield from ({"record": raw_line.decode()} for raw_line in response.iter_lines())
Decoder strategy that yields the response body line by line, wrapping each decoded line in a {"record": <line>} mapping.
Set to True if you'd like to use stream=True option in http requester
def decode(
    self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
    """Yield one ``{"record": <decoded line>}`` mapping per line of the response body."""
    yield from ({"record": raw_line.decode()} for raw_line in response.iter_lines())
Decodes a requests.Response into a Mapping[str, Any] or an array
Parameters
- response: the response to decode
Returns
Generator of Mapping describing the response
class NoopDecoder(Decoder):
    """Decoder that ignores the response body and yields a single empty mapping."""

    def is_stream_response(self) -> bool:
        return False

    def decode(  # type: ignore[override] # Signature doesn't match base class
        self,
        response: requests.Response,
    ) -> Generator[Mapping[str, Any], None, None]:
        # The response is intentionally not inspected.
        yield {}
Decoder strategy to transform a requests.Response into a Mapping[str, Any]
Set to True if you'd like to use stream=True option in http requester
def decode(  # type: ignore[override] # Signature doesn't match base class
    self,
    response: requests.Response,
) -> Generator[Mapping[str, Any], None, None]:
    """Ignore the response body and yield exactly one empty mapping."""
    yield {}
Decodes a requests.Response into a Mapping[str, Any] or an array
Parameters
- response: the response to decode
Returns
Generator of Mapping describing the response
# NOTE(review): @dataclass generates __eq__ over the (empty) set of dataclass fields,
# so any two instances of this class compare equal regardless of the wrapped decoder.
# The explicit __init__ below is kept because dataclass does not replace a
# user-defined __init__.
@dataclass
class PaginationDecoderDecorator(Decoder):
    """
    Decoder to wrap other decoders when instantiating a DefaultPaginator in order to
    bypass decoding if the response is streamed.
    """

    def __init__(self, decoder: Decoder):
        self._decoder = decoder

    @property
    def decoder(self) -> Decoder:
        """The wrapped decoder."""
        return self._decoder

    def is_stream_response(self) -> bool:
        return self._decoder.is_stream_response()

    def decode(
        self, response: requests.Response
    ) -> Generator[MutableMapping[str, Any], None, None]:
        # Non-streamed responses are delegated to the wrapped decoder unchanged.
        if not self._decoder.is_stream_response():
            yield from self._decoder.decode(response)
            return
        # Streamed responses are not decoded for pagination; a lone {} is yielded instead.
        logger.warning("Response is streamed and therefore will not be decoded for pagination.")
        yield {}
Decoder to wrap other decoders when instantiating a DefaultPaginator in order to bypass decoding if the response is streamed.
Set to True if you'd like to use stream=True option in http requester
def decode(
    self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
    """Delegate to the wrapped decoder unless it works on streamed responses."""
    if not self._decoder.is_stream_response():
        yield from self._decoder.decode(response)
        return
    # Streamed responses are not decoded for pagination; a lone {} is yielded instead.
    logger.warning("Response is streamed and therefore will not be decoded for pagination.")
    yield {}
Decodes a requests.Response into a Mapping[str, Any] or an array
Parameters
- response: the response to decode
Returns
Generator of Mapping describing the response
@dataclass
class XmlDecoder(Decoder):
    """
    XmlDecoder is a decoder strategy that parses the XML content of the response, and converts it to a dict.

    This class handles XML attributes by prefixing them with an '@' symbol and represents XML text content by using the '#text' key if the element has attributes or the element name/tag. It does not currently support XML namespace declarations.

    Example XML Input:
        <root>
            <location id="123">
                San Francisco
            </location>
            <item id="1" category="books">
                <name>Book Title 1</name>
                <price>10.99</price>
            </item>
            <item id="2" category="electronics">
                <name>Gadget</name>
                <price>299.99</price>
                <description>A useful gadget</description>
            </item>
        </root>

    Converted Output:
        {
            "root": {
                "location": {
                    "@id": "123",
                    "#text": "San Francisco"
                },
                "item": [
                    {
                        "@id": "1",
                        "@category": "books",
                        "name": "Book Title 1",
                        "price": "10.99"
                    },
                    {
                        "@id": "2",
                        "@category": "electronics",
                        "name": "Gadget",
                        "price": "299.99",
                        "description": "A useful gadget"
                    }
                ]
            }
        }

    Notes:
        - Attributes of an XML element are prefixed with an '@' symbol in the dictionary output.
        - Text content of an XML element is handled in two different ways, depending on whether
          the element has attributes.
            - If the element has attributes, the text content will be
              represented by the "#text" key.
            - If the element does not have any attributes, the text content will be
              stored directly under the element's name.
        - Namespace declarations are not supported in the current implementation.
    """

    parameters: InitVar[Mapping[str, Any]]

    def is_stream_response(self) -> bool:
        return False

    def decode(
        self, response: requests.Response
    ) -> Generator[MutableMapping[str, Any], None, None]:
        """
        Yield the dict produced by xmltodict from the response text.

        A single empty mapping is yielded when the body cannot be parsed as XML.
        """
        body_xml = response.text
        # Keep the try limited to the parse call so the generator's own yields are not
        # covered by the ExpatError handler.
        try:
            body_json = xmltodict.parse(body_xml)
        except ExpatError as exc:
            logger.warning(
                f"Response cannot be parsed from XML: {response.status_code=}, {response.text=}, {exc=}"
            )
            yield {}
            return

        # Normalize to a list so a single document and a list of documents are handled alike.
        if not isinstance(body_json, list):
            body_json = [body_json]
        if len(body_json) == 0:
            yield {}
        else:
            yield from body_json
XmlDecoder is a decoder strategy that parses the XML content of the response, and converts it to a dict.
This class handles XML attributes by prefixing them with an '@' symbol and represents XML text content by using the '#text' key if the element has attributes or the element name/tag. It does not currently support XML namespace declarations.
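Example XML Input:
<root>
    <location id="123">
        San Francisco
    </location>
    <item id="1" category="books">
        <name>Book Title 1</name>
        <price>10.99</price>
    </item>
    <item id="2" category="electronics">
        <name>Gadget</name>
        <price>299.99</price>
        <description>A useful gadget</description>
    </item>
</root>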
Converted Output: { "root": { "location": { "@id": "123", "#text": "San Francisco" }, "item": [ { "@id": "1", "@category": "books", "name": "Book Title 1", "price": "10.99" }, { "@id": "2", "@category": "electronics", "name": "Gadget", "price": "299.99", "description": "A useful gadget" } ] } }
Notes:
- Attributes of an XML element are prefixed with an '@' symbol in the dictionary output.
- Text content of an XML element is handled in two different ways, depending on whether the element has attributes.
  - If the element has attributes, the text content will be represented by the "#text" key.
  - If the element does not have any attributes, the text content will be stored directly under the element's name.
- Namespace declarations are not supported in the current implementation.
Set to True if you'd like to use stream=True option in http requester
def decode(
    self, response: requests.Response
) -> Generator[MutableMapping[str, Any], None, None]:
    """
    Yield the dict produced by xmltodict from the response text.

    A single empty mapping is yielded when the body cannot be parsed as XML.
    """
    body_xml = response.text
    # Keep the try limited to the parse call so the generator's own yields are not
    # covered by the ExpatError handler.
    try:
        body_json = xmltodict.parse(body_xml)
    except ExpatError as exc:
        logger.warning(
            f"Response cannot be parsed from XML: {response.status_code=}, {response.text=}, {exc=}"
        )
        yield {}
        return

    # Normalize to a list so a single document and a list of documents are handled alike.
    if not isinstance(body_json, list):
        body_json = [body_json]
    if len(body_json) == 0:
        yield {}
    else:
        yield from body_json
Decodes a requests.Response into a Mapping[str, Any] or an array
Parameters
- response: the response to decode
Returns
Generator of Mapping describing the response
@dataclass
class ZipfileDecoder(Decoder):
    """
    Decoder that treats the response body as a zip archive and runs `parser` over
    every file it contains.
    """

    # Parser applied to the content of each file in the archive.
    parser: Parser

    def is_stream_response(self) -> bool:
        return False

    def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE:
        """
        Decode every file of the zipped response body with `self.parser`.

        Directory entries are skipped: they have no content, and an empty buffer can make
        a parser fail (e.g. JsonParser raises on empty input).

        :raises AirbyteTracedException: if the body is not a valid zip archive or a file
            fails to parse.
        """
        try:
            with zipfile.ZipFile(BytesIO(response.content)) as zip_file:
                for zip_info in zip_file.infolist():
                    if zip_info.is_dir():
                        continue
                    file_name = zip_info.filename
                    unzipped_content = zip_file.read(zip_info)
                    buffered_content = BytesIO(unzipped_content)
                    try:
                        yield from self.parser.parse(
                            buffered_content,
                        )
                    except Exception as e:
                        logger.error(
                            f"Failed to parse file: {file_name} from zip file: {response.request.url} with exception {e}."
                        )
                        raise AirbyteTracedException(
                            message=f"Failed to parse file: {file_name} from zip file.",
                            internal_message=f"Failed to parse file: {file_name} from zip file: {response.request.url}.",
                            failure_type=FailureType.system_error,
                        ) from e
        except zipfile.BadZipFile as e:
            logger.error(
                f"Received an invalid zip file in response to URL: {response.request.url}. "
                f"The size of the response body is: {len(response.content)}"
            )
            raise AirbyteTracedException(
                message="Received an invalid zip file in response.",
                internal_message=f"Received an invalid zip file in response to URL: {response.request.url}.",
                failure_type=FailureType.system_error,
            ) from e
Set to True if you'd like to use stream=True option in http requester
def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE:
    """
    Decode every file of the zipped response body with `self.parser`.

    Directory entries are skipped: they have no content, and an empty buffer can make
    a parser fail (e.g. JsonParser raises on empty input).

    :raises AirbyteTracedException: if the body is not a valid zip archive or a file
        fails to parse.
    """
    try:
        with zipfile.ZipFile(BytesIO(response.content)) as zip_file:
            for zip_info in zip_file.infolist():
                if zip_info.is_dir():
                    continue
                file_name = zip_info.filename
                unzipped_content = zip_file.read(zip_info)
                buffered_content = BytesIO(unzipped_content)
                try:
                    yield from self.parser.parse(
                        buffered_content,
                    )
                except Exception as e:
                    logger.error(
                        f"Failed to parse file: {file_name} from zip file: {response.request.url} with exception {e}."
                    )
                    raise AirbyteTracedException(
                        message=f"Failed to parse file: {file_name} from zip file.",
                        internal_message=f"Failed to parse file: {file_name} from zip file: {response.request.url}.",
                        failure_type=FailureType.system_error,
                    ) from e
    except zipfile.BadZipFile as e:
        logger.error(
            f"Received an invalid zip file in response to URL: {response.request.url}. "
            f"The size of the response body is: {len(response.content)}"
        )
        raise AirbyteTracedException(
            message="Received an invalid zip file in response.",
            internal_message=f"Received an invalid zip file in response to URL: {response.request.url}.",
            failure_type=FailureType.system_error,
        ) from e
Decodes a requests.Response into a Mapping[str, Any] or an array
Parameters
- response: the response to decode
Returns
Generator of Mapping describing the response