airbyte_cdk.sources.declarative.decoders
1# 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 3# 4 5from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import ( 6 CompositeRawDecoder, 7 GzipParser, 8 JsonParser, 9 Parser, 10) 11from airbyte_cdk.sources.declarative.decoders.decoder import Decoder 12from airbyte_cdk.sources.declarative.decoders.json_decoder import ( 13 IterableDecoder, 14 JsonDecoder, 15) 16from airbyte_cdk.sources.declarative.decoders.noop_decoder import NoopDecoder 17from airbyte_cdk.sources.declarative.decoders.pagination_decoder_decorator import ( 18 PaginationDecoderDecorator, 19) 20from airbyte_cdk.sources.declarative.decoders.xml_decoder import XmlDecoder 21from airbyte_cdk.sources.declarative.decoders.zipfile_decoder import ZipfileDecoder 22 23__all__ = [ 24 "Decoder", 25 "CompositeRawDecoder", 26 "JsonDecoder", 27 "JsonParser", 28 "IterableDecoder", 29 "NoopDecoder", 30 "PaginationDecoderDecorator", 31 "XmlDecoder", 32 "ZipfileDecoder", 33]
15@dataclass 16class Decoder: 17 """ 18 Decoder strategy to transform a requests.Response into a Mapping[str, Any] 19 """ 20 21 @abstractmethod 22 def is_stream_response(self) -> bool: 23 """ 24 Set to True if you'd like to use stream=True option in http requester 25 """ 26 27 @abstractmethod 28 def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE: 29 """ 30 Decodes a requests.Response into a Mapping[str, Any] or an array 31 :param response: the response to decode 32 :return: Generator of Mapping describing the response 33 """
Decoder strategy to transform a requests.Response into a Mapping[str, Any]
21 @abstractmethod 22 def is_stream_response(self) -> bool: 23 """ 24 Set to True if you'd like to use stream=True option in http requester 25 """
Set to True if you'd like to use stream=True option in http requester
27 @abstractmethod 28 def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE: 29 """ 30 Decodes a requests.Response into a Mapping[str, Any] or an array 31 :param response: the response to decode 32 :return: Generator of Mapping describing the response 33 """
Decodes a requests.Response into a Mapping[str, Any] or an array
Parameters
- response: the response to decode
Returns
Generator of Mapping describing the response
187class CompositeRawDecoder(Decoder): 188 """ 189 Decoder strategy to transform a requests.Response into a PARSER_OUTPUT_TYPE 190 passed response.raw to parser(s). 191 192 Note: response.raw is not decoded/decompressed by default. Parsers should be instantiated recursively. 193 194 Example: 195 composite_raw_decoder = CompositeRawDecoder( 196 parser=GzipParser( 197 inner_parser=JsonLineParser(encoding="iso-8859-1") 198 ) 199 ) 200 """ 201 202 def __init__( 203 self, 204 parser: Parser, 205 stream_response: bool = True, 206 parsers_by_header: PARSERS_BY_HEADER_TYPE = None, 207 ) -> None: 208 # since we moved from using `dataclass` to `__init__` method, 209 # we need to keep using the `parser` to be able to resolve the depenencies 210 # between the parsers correctly. 211 self.parser = parser 212 213 self._parsers_by_header = parsers_by_header if parsers_by_header else {} 214 self._stream_response = stream_response 215 216 @classmethod 217 def by_headers( 218 cls, 219 parsers: PARSERS_TYPE, 220 stream_response: bool, 221 fallback_parser: Parser, 222 ) -> "CompositeRawDecoder": 223 """ 224 Create a CompositeRawDecoder instance based on header values. 225 226 Args: 227 parsers (PARSERS_TYPE): A list of tuples where each tuple contains headers, header values, and a parser. 228 stream_response (bool): A flag indicating whether the response should be streamed. 229 fallback_parser (Parser): A parser to use if no matching header is found. 230 231 Returns: 232 CompositeRawDecoder: An instance of CompositeRawDecoder configured with the provided parsers. 
233 """ 234 parsers_by_header = {} 235 for headers, header_values, parser in parsers: 236 for header in headers: 237 parsers_by_header[header] = {header_value: parser for header_value in header_values} 238 return cls(fallback_parser, stream_response, parsers_by_header) 239 240 def is_stream_response(self) -> bool: 241 return self._stream_response 242 243 def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE: 244 parser = self._select_parser(response) 245 if self.is_stream_response(): 246 # urllib mentions that some interfaces don't play nice with auto_close 247 # More info here: https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content 248 # We have indeed observed some issues with CSV parsing. 249 # Hence, we will manage the closing of the file ourselves until we find a better solution. 250 response.raw.auto_close = False 251 yield from parser.parse( 252 data=response.raw, # type: ignore[arg-type] 253 ) 254 response.raw.close() 255 else: 256 yield from parser.parse(data=io.BytesIO(response.content)) 257 258 def _select_parser(self, response: requests.Response) -> Parser: 259 """ 260 Selects the appropriate parser based on the response headers. 261 262 This method iterates through the `_parsers_by_header` dictionary to find a matching parser 263 based on the headers in the response. If a matching header and header value are found, 264 the corresponding parser is returned. If no match is found, the default parser is returned. 265 266 Args: 267 response (requests.Response): The HTTP response object containing headers to check. 268 269 Returns: 270 Parser: The parser corresponding to the matched header value, or the default parser if no match is found. 
271 """ 272 for header, parser_by_header_value in self._parsers_by_header.items(): 273 if ( 274 header in response.headers 275 and response.headers[header] in parser_by_header_value.keys() 276 ): 277 return parser_by_header_value[response.headers[header]] 278 return self.parser
Decoder strategy to transform a requests.Response into a PARSER_OUTPUT_TYPE by passing response.raw to the parser(s).
Note: response.raw is not decoded/decompressed by default. Parsers should be instantiated recursively, i.e. nested inside one another (for example, a GzipParser wrapping an inner parser).
Example:
composite_raw_decoder = CompositeRawDecoder( parser=GzipParser( inner_parser=JsonLineParser(encoding="iso-8859-1") ) )
202 def __init__( 203 self, 204 parser: Parser, 205 stream_response: bool = True, 206 parsers_by_header: PARSERS_BY_HEADER_TYPE = None, 207 ) -> None: 208 # since we moved from using `dataclass` to `__init__` method, 209 # we need to keep using the `parser` to be able to resolve the depenencies 210 # between the parsers correctly. 211 self.parser = parser 212 213 self._parsers_by_header = parsers_by_header if parsers_by_header else {} 214 self._stream_response = stream_response
216 @classmethod 217 def by_headers( 218 cls, 219 parsers: PARSERS_TYPE, 220 stream_response: bool, 221 fallback_parser: Parser, 222 ) -> "CompositeRawDecoder": 223 """ 224 Create a CompositeRawDecoder instance based on header values. 225 226 Args: 227 parsers (PARSERS_TYPE): A list of tuples where each tuple contains headers, header values, and a parser. 228 stream_response (bool): A flag indicating whether the response should be streamed. 229 fallback_parser (Parser): A parser to use if no matching header is found. 230 231 Returns: 232 CompositeRawDecoder: An instance of CompositeRawDecoder configured with the provided parsers. 233 """ 234 parsers_by_header = {} 235 for headers, header_values, parser in parsers: 236 for header in headers: 237 parsers_by_header[header] = {header_value: parser for header_value in header_values} 238 return cls(fallback_parser, stream_response, parsers_by_header)
Create a CompositeRawDecoder instance based on header values.
Arguments:
- parsers (PARSERS_TYPE): A list of tuples where each tuple contains headers, header values, and a parser.
- stream_response (bool): A flag indicating whether the response should be streamed.
- fallback_parser (Parser): A parser to use if no matching header is found.
Returns:
CompositeRawDecoder: An instance of CompositeRawDecoder configured with the provided parsers.
Set to True if you'd like to use stream=True option in http requester
243 def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE: 244 parser = self._select_parser(response) 245 if self.is_stream_response(): 246 # urllib mentions that some interfaces don't play nice with auto_close 247 # More info here: https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content 248 # We have indeed observed some issues with CSV parsing. 249 # Hence, we will manage the closing of the file ourselves until we find a better solution. 250 response.raw.auto_close = False 251 yield from parser.parse( 252 data=response.raw, # type: ignore[arg-type] 253 ) 254 response.raw.close() 255 else: 256 yield from parser.parse(data=io.BytesIO(response.content))
Decodes a requests.Response into a Mapping[str, Any] or an array
Parameters
- response: the response to decode
Returns
Generator of Mapping describing the response
20class JsonDecoder(Decoder): 21 """ 22 Decoder strategy that returns the json-encoded content of a response, if any. 23 24 Usually, we would try to instantiate the equivalent `CompositeRawDecoder(parser=JsonParser(), stream_response=False)` but there were specific historical behaviors related to the JsonDecoder that we didn't know if we could remove like the fallback on {} in case of errors. 25 """ 26 27 def __init__(self, parameters: Mapping[str, Any]): 28 self._decoder = CompositeRawDecoder(parser=JsonParser(), stream_response=False) 29 30 def is_stream_response(self) -> bool: 31 return self._decoder.is_stream_response() 32 33 def decode( 34 self, response: requests.Response 35 ) -> Generator[MutableMapping[str, Any], None, None]: 36 """ 37 Given the response is an empty string or an emtpy list, the function will return a generator with an empty mapping. 38 """ 39 has_yielded = False 40 try: 41 for element in self._decoder.decode(response): 42 yield element 43 has_yielded = True 44 except Exception: 45 yield {} 46 47 if not has_yielded: 48 yield {}
Decoder strategy that returns the json-encoded content of a response, if any.
Usually, we would instantiate the equivalent CompositeRawDecoder(parser=JsonParser(), stream_response=False), but the JsonDecoder has specific historical behaviors that we were not sure could be removed, such as the fallback on {} in case of errors.
Set to True if you'd like to use stream=True option in http requester
33 def decode( 34 self, response: requests.Response 35 ) -> Generator[MutableMapping[str, Any], None, None]: 36 """ 37 Given the response is an empty string or an emtpy list, the function will return a generator with an empty mapping. 38 """ 39 has_yielded = False 40 try: 41 for element in self._decoder.decode(response): 42 yield element 43 has_yielded = True 44 except Exception: 45 yield {} 46 47 if not has_yielded: 48 yield {}
If the response is an empty string or an empty list, the function returns a generator that yields an empty mapping.
52@dataclass 53class JsonParser(Parser): 54 encoding: str = "utf-8" 55 56 def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE: 57 """ 58 Attempts to deserialize data using orjson library. As an extra layer of safety we fallback on the json library to deserialize the data. 59 """ 60 raw_data = data.read() 61 body_json = self._parse_orjson(raw_data) or self._parse_json(raw_data) 62 63 if body_json is None: 64 raise AirbyteTracedException( 65 message="Response JSON data failed to be parsed. See logs for more information.", 66 internal_message=f"Response JSON data failed to be parsed.", 67 failure_type=FailureType.system_error, 68 ) 69 70 if isinstance(body_json, list): 71 yield from body_json 72 else: 73 yield from [body_json] 74 75 def _parse_orjson(self, raw_data: bytes) -> Optional[Any]: 76 try: 77 return orjson.loads(raw_data.decode(self.encoding)) 78 except Exception as exc: 79 logger.debug( 80 f"Failed to parse JSON data using orjson library. Falling back to json library. {exc}" 81 ) 82 return None 83 84 def _parse_json(self, raw_data: bytes) -> Optional[Any]: 85 try: 86 return json.loads(raw_data.decode(self.encoding)) 87 except Exception as exc: 88 logger.error(f"Failed to parse JSON data using json library. {exc}") 89 return None
56 def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE: 57 """ 58 Attempts to deserialize data using orjson library. As an extra layer of safety we fallback on the json library to deserialize the data. 59 """ 60 raw_data = data.read() 61 body_json = self._parse_orjson(raw_data) or self._parse_json(raw_data) 62 63 if body_json is None: 64 raise AirbyteTracedException( 65 message="Response JSON data failed to be parsed. See logs for more information.", 66 internal_message=f"Response JSON data failed to be parsed.", 67 failure_type=FailureType.system_error, 68 ) 69 70 if isinstance(body_json, list): 71 yield from body_json 72 else: 73 yield from [body_json]
Attempts to deserialize data using the orjson library. As an extra layer of safety, we fall back on the json library to deserialize the data.
51@dataclass 52class IterableDecoder(Decoder): 53 """ 54 Decoder strategy that returns the string content of the response, if any. 55 """ 56 57 parameters: InitVar[Mapping[str, Any]] 58 59 def is_stream_response(self) -> bool: 60 return True 61 62 def decode( 63 self, response: requests.Response 64 ) -> Generator[MutableMapping[str, Any], None, None]: 65 for line in response.iter_lines(): 66 yield {"record": line.decode()}
Decoder strategy that returns the string content of the response, if any, yielding one {"record": <line>} mapping per line of the response.
Set to True if you'd like to use stream=True option in http requester
62 def decode( 63 self, response: requests.Response 64 ) -> Generator[MutableMapping[str, Any], None, None]: 65 for line in response.iter_lines(): 66 yield {"record": line.decode()}
Decodes a requests.Response into a Mapping[str, Any] or an array
Parameters
- response: the response to decode
Returns
Generator of Mapping describing the response
14class NoopDecoder(Decoder): 15 def is_stream_response(self) -> bool: 16 return False 17 18 def decode( # type: ignore[override] # Signature doesn't match base class 19 self, 20 response: requests.Response, 21 ) -> Generator[Mapping[str, Any], None, None]: 22 yield from [{}]
Decoder strategy to transform a requests.Response into a Mapping[str, Any]
Set to True if you'd like to use stream=True option in http requester
18 def decode( # type: ignore[override] # Signature doesn't match base class 19 self, 20 response: requests.Response, 21 ) -> Generator[Mapping[str, Any], None, None]: 22 yield from [{}]
Decodes a requests.Response into a Mapping[str, Any] or an array
Parameters
- response: the response to decode
Returns
Generator of Mapping describing the response
17@dataclass 18class PaginationDecoderDecorator(Decoder): 19 """ 20 Decoder to wrap other decoders when instantiating a DefaultPaginator in order to bypass decoding if the response is streamed. 21 """ 22 23 def __init__(self, decoder: Decoder): 24 self._decoder = decoder 25 26 @property 27 def decoder(self) -> Decoder: 28 return self._decoder 29 30 def is_stream_response(self) -> bool: 31 return self._decoder.is_stream_response() 32 33 def decode( 34 self, response: requests.Response 35 ) -> Generator[MutableMapping[str, Any], None, None]: 36 if self._decoder.is_stream_response(): 37 logger.warning("Response is streamed and therefore will not be decoded for pagination.") 38 yield {} 39 else: 40 yield from self._decoder.decode(response)
Decoder to wrap other decoders when instantiating a DefaultPaginator in order to bypass decoding if the response is streamed.
Set to True if you'd like to use stream=True option in http requester
33 def decode( 34 self, response: requests.Response 35 ) -> Generator[MutableMapping[str, Any], None, None]: 36 if self._decoder.is_stream_response(): 37 logger.warning("Response is streamed and therefore will not be decoded for pagination.") 38 yield {} 39 else: 40 yield from self._decoder.decode(response)
Decodes a requests.Response into a Mapping[str, Any] or an array
Parameters
- response: the response to decode
Returns
Generator of Mapping describing the response
19@dataclass 20class XmlDecoder(Decoder): 21 """ 22 XmlDecoder is a decoder strategy that parses the XML content of the resopnse, and converts it to a dict. 23 24 This class handles XML attributes by prefixing them with an '@' symbol and represents XML text content by using the '#text' key if the element has attributes or the element name/tag. It does not currently support XML namespace declarations. 25 26 Example XML Input: 27 <root> 28 <location id="123"> 29 San Francisco 30 </location> 31 <item id="1" category="books"> 32 <name>Book Title 1</name> 33 <price>10.99</price> 34 </item> 35 <item id="2" category="electronics"> 36 <name>Gadget</name> 37 <price>299.99</price> 38 <description>A useful gadget</description> 39 </item> 40 </root> 41 42 Converted Output: 43 { 44 "root": { 45 "location: { 46 "@id": "123, 47 "#text": "San Francisco" 48 }, 49 "item": [ 50 { 51 "@id": "1", 52 "@category": "books", 53 "name": "Book Title 1", 54 "price": "10.99" 55 }, 56 { 57 "@id": "2", 58 "@category": "electronics", 59 "name": "Gadget", 60 "price": "299.99", 61 "description": "A useful gadget" 62 } 63 ] 64 } 65 } 66 67 Notes: 68 - Attributes of an XML element are prefixed with an '@' symbol in the dictionary output. 69 - Text content of an XML element is handled in two different ways, depending on whether 70 the element has attributes. 71 - If the element has attributes, the text content will be 72 represented by the "#text" key. 73 - If the element does not have any attributes, the text content will be 74 represented by element name. 75 - Namespace declarations are not supported in the current implementation. 
76 """ 77 78 parameters: InitVar[Mapping[str, Any]] 79 80 def is_stream_response(self) -> bool: 81 return False 82 83 def decode( 84 self, response: requests.Response 85 ) -> Generator[MutableMapping[str, Any], None, None]: 86 body_xml = response.text 87 try: 88 body_json = xmltodict.parse(body_xml) 89 if not isinstance(body_json, list): 90 body_json = [body_json] 91 if len(body_json) == 0: 92 yield {} 93 else: 94 yield from body_json 95 except ExpatError as exc: 96 logger.warning( 97 f"Response cannot be parsed from XML: {response.status_code=}, {response.text=}, {exc=}" 98 ) 99 yield {}
XmlDecoder is a decoder strategy that parses the XML content of the response, and converts it to a dict.
This class handles XML attributes by prefixing them with an '@' symbol and represents XML text content by using the '#text' key if the element has attributes or the element name/tag. It does not currently support XML namespace declarations.
Example XML Input:
Converted Output: { "root": { "location": { "@id": "123", "#text": "San Francisco" }, "item": [ { "@id": "1", "@category": "books", "name": "Book Title 1", "price": "10.99" }, { "@id": "2", "@category": "electronics", "name": "Gadget", "price": "299.99", "description": "A useful gadget" } ] } }
Notes:
- Attributes of an XML element are prefixed with an '@' symbol in the dictionary output.
- Text content of an XML element is handled in two different ways, depending on whether the element has attributes. - If the element has attributes, the text content will be represented by the "#text" key. - If the element does not have any attributes, the text content will be represented by element name.
- Namespace declarations are not supported in the current implementation.
Set to True if you'd like to use stream=True option in http requester
83 def decode( 84 self, response: requests.Response 85 ) -> Generator[MutableMapping[str, Any], None, None]: 86 body_xml = response.text 87 try: 88 body_json = xmltodict.parse(body_xml) 89 if not isinstance(body_json, list): 90 body_json = [body_json] 91 if len(body_json) == 0: 92 yield {} 93 else: 94 yield from body_json 95 except ExpatError as exc: 96 logger.warning( 97 f"Response cannot be parsed from XML: {response.status_code=}, {response.text=}, {exc=}" 98 ) 99 yield {}
Decodes a requests.Response into a Mapping[str, Any] or an array
Parameters
- response: the response to decode
Returns
Generator of Mapping describing the response
22@dataclass 23class ZipfileDecoder(Decoder): 24 parser: Parser 25 26 def is_stream_response(self) -> bool: 27 return False 28 29 def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE: 30 try: 31 with zipfile.ZipFile(BytesIO(response.content)) as zip_file: 32 for file_name in zip_file.namelist(): 33 unzipped_content = zip_file.read(file_name) 34 buffered_content = BytesIO(unzipped_content) 35 try: 36 yield from self.parser.parse( 37 buffered_content, 38 ) 39 except Exception as e: 40 logger.error( 41 f"Failed to parse file: {file_name} from zip file: {response.request.url} with exception {e}." 42 ) 43 raise AirbyteTracedException( 44 message=f"Failed to parse file: {file_name} from zip file.", 45 internal_message=f"Failed to parse file: {file_name} from zip file: {response.request.url}.", 46 failure_type=FailureType.system_error, 47 ) from e 48 except zipfile.BadZipFile as e: 49 logger.error( 50 f"Received an invalid zip file in response to URL: {response.request.url}. " 51 f"The size of the response body is: {len(response.content)}" 52 ) 53 raise AirbyteTracedException( 54 message="Received an invalid zip file in response.", 55 internal_message=f"Received an invalid zip file in response to URL: {response.request.url}.", 56 failure_type=FailureType.system_error, 57 ) from e
Set to True if you'd like to use stream=True option in http requester
29 def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE: 30 try: 31 with zipfile.ZipFile(BytesIO(response.content)) as zip_file: 32 for file_name in zip_file.namelist(): 33 unzipped_content = zip_file.read(file_name) 34 buffered_content = BytesIO(unzipped_content) 35 try: 36 yield from self.parser.parse( 37 buffered_content, 38 ) 39 except Exception as e: 40 logger.error( 41 f"Failed to parse file: {file_name} from zip file: {response.request.url} with exception {e}." 42 ) 43 raise AirbyteTracedException( 44 message=f"Failed to parse file: {file_name} from zip file.", 45 internal_message=f"Failed to parse file: {file_name} from zip file: {response.request.url}.", 46 failure_type=FailureType.system_error, 47 ) from e 48 except zipfile.BadZipFile as e: 49 logger.error( 50 f"Received an invalid zip file in response to URL: {response.request.url}. " 51 f"The size of the response body is: {len(response.content)}" 52 ) 53 raise AirbyteTracedException( 54 message="Received an invalid zip file in response.", 55 internal_message=f"Received an invalid zip file in response to URL: {response.request.url}.", 56 failure_type=FailureType.system_error, 57 ) from e
Decodes a requests.Response into a Mapping[str, Any] or an array
Parameters
- response: the response to decode
Returns
Generator of Mapping describing the response