airbyte_cdk.sources.declarative.decoders

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import (
 6    CompositeRawDecoder,
 7    GzipParser,
 8    JsonParser,
 9    Parser,
10)
11from airbyte_cdk.sources.declarative.decoders.decoder import Decoder
12from airbyte_cdk.sources.declarative.decoders.json_decoder import (
13    IterableDecoder,
14    JsonDecoder,
15)
16from airbyte_cdk.sources.declarative.decoders.noop_decoder import NoopDecoder
17from airbyte_cdk.sources.declarative.decoders.pagination_decoder_decorator import (
18    PaginationDecoderDecorator,
19)
20from airbyte_cdk.sources.declarative.decoders.xml_decoder import XmlDecoder
21from airbyte_cdk.sources.declarative.decoders.zipfile_decoder import ZipfileDecoder
22
23__all__ = [
24    "Decoder",
25    "CompositeRawDecoder",
26    "JsonDecoder",
27    "JsonParser",
28    "IterableDecoder",
29    "NoopDecoder",
30    "PaginationDecoderDecorator",
31    "XmlDecoder",
32    "ZipfileDecoder",
33]
@dataclass
class Decoder:
15@dataclass
16class Decoder:
17    """
18    Decoder strategy to transform a requests.Response into a Mapping[str, Any]
19    """
20
21    @abstractmethod
22    def is_stream_response(self) -> bool:
23        """
24        Set to True if you'd like to use stream=True option in http requester
25        """
26
27    @abstractmethod
28    def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE:
29        """
30        Decodes a requests.Response into a Mapping[str, Any] or an array
31        :param response: the response to decode
32        :return: Generator of Mapping describing the response
33        """

Decoder strategy to transform a requests.Response into a Mapping[str, Any]

@abstractmethod
def is_stream_response(self) -> bool:
21    @abstractmethod
22    def is_stream_response(self) -> bool:
23        """
24        Set to True if you'd like to use stream=True option in http requester
25        """

Return True if the HTTP requester should send the request with the stream=True option.

@abstractmethod
def decode( self, response: requests.models.Response) -> Generator[MutableMapping[str, Any], NoneType, NoneType]:
27    @abstractmethod
28    def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE:
29        """
30        Decodes a requests.Response into a Mapping[str, Any] or an array
31        :param response: the response to decode
32        :return: Generator of Mapping describing the response
33        """

Decodes a requests.Response into a Mapping[str, Any] or an array

Parameters
  • response: the response to decode
Returns

Generator of Mapping describing the response

class CompositeRawDecoder(airbyte_cdk.sources.declarative.decoders.Decoder):
128class CompositeRawDecoder(Decoder):
129    """
130    Decoder strategy to transform a requests.Response into a PARSER_OUTPUT_TYPE
131    passed response.raw to parser(s).
132
133    Note: response.raw is not decoded/decompressed by default. Parsers should be instantiated recursively.
134
135    Example:
136        composite_raw_decoder = CompositeRawDecoder(
137            parser=GzipParser(
138                inner_parser=JsonLineParser(encoding="iso-8859-1")
139            )
140        )
141    """
142
143    def __init__(
144        self,
145        parser: Parser,
146        stream_response: bool = True,
147        parsers_by_header: PARSERS_BY_HEADER_TYPE = None,
148    ) -> None:
149        # since we moved from using `dataclass` to `__init__` method,
150        # we need to keep using the `parser` to be able to resolve the depenencies
151        # between the parsers correctly.
152        self.parser = parser
153
154        self._parsers_by_header = parsers_by_header if parsers_by_header else {}
155        self._stream_response = stream_response
156
157    @classmethod
158    def by_headers(
159        cls,
160        parsers: PARSERS_TYPE,
161        stream_response: bool,
162        fallback_parser: Parser,
163    ) -> "CompositeRawDecoder":
164        """
165        Create a CompositeRawDecoder instance based on header values.
166
167        Args:
168            parsers (PARSERS_TYPE): A list of tuples where each tuple contains headers, header values, and a parser.
169            stream_response (bool): A flag indicating whether the response should be streamed.
170            fallback_parser (Parser): A parser to use if no matching header is found.
171
172        Returns:
173            CompositeRawDecoder: An instance of CompositeRawDecoder configured with the provided parsers.
174        """
175        parsers_by_header = {}
176        for headers, header_values, parser in parsers:
177            for header in headers:
178                parsers_by_header[header] = {header_value: parser for header_value in header_values}
179        return cls(fallback_parser, stream_response, parsers_by_header)
180
181    def is_stream_response(self) -> bool:
182        return self._stream_response
183
184    def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE:
185        parser = self._select_parser(response)
186        if self.is_stream_response():
187            # urllib mentions that some interfaces don't play nice with auto_close
188            # More info here: https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content
189            # We have indeed observed some issues with CSV parsing.
190            # Hence, we will manage the closing of the file ourselves until we find a better solution.
191            response.raw.auto_close = False
192            yield from parser.parse(
193                data=response.raw,  # type: ignore[arg-type]
194            )
195            response.raw.close()
196        else:
197            yield from parser.parse(data=io.BytesIO(response.content))
198
199    def _select_parser(self, response: requests.Response) -> Parser:
200        """
201        Selects the appropriate parser based on the response headers.
202
203        This method iterates through the `_parsers_by_header` dictionary to find a matching parser
204        based on the headers in the response. If a matching header and header value are found,
205        the corresponding parser is returned. If no match is found, the default parser is returned.
206
207        Args:
208            response (requests.Response): The HTTP response object containing headers to check.
209
210        Returns:
211            Parser: The parser corresponding to the matched header value, or the default parser if no match is found.
212        """
213        for header, parser_by_header_value in self._parsers_by_header.items():
214            if (
215                header in response.headers
216                and response.headers[header] in parser_by_header_value.keys()
217            ):
218                return parser_by_header_value[response.headers[header]]
219        return self.parser

Decoder strategy that transforms a requests.Response into a PARSER_OUTPUT_TYPE by passing response.raw to the parser(s).

Note: response.raw is not decoded/decompressed by default. Parsers can be nested: an outer parser (e.g. GzipParser) wraps an inner_parser, as in the example below.

Example:

composite_raw_decoder = CompositeRawDecoder(
    parser=GzipParser(
        inner_parser=JsonLineParser(encoding="iso-8859-1")
    )
)

CompositeRawDecoder( parser: airbyte_cdk.sources.declarative.decoders.decoder_parser.Parser, stream_response: bool = True, parsers_by_header: Optional[Dict[str, Dict[str, airbyte_cdk.sources.declarative.decoders.decoder_parser.Parser]]] = None)
143    def __init__(
144        self,
145        parser: Parser,
146        stream_response: bool = True,
147        parsers_by_header: PARSERS_BY_HEADER_TYPE = None,
148    ) -> None:
149        # since we moved from using `dataclass` to `__init__` method,
150        # we need to keep using the `parser` to be able to resolve the depenencies
151        # between the parsers correctly.
152        self.parser = parser
153
154        self._parsers_by_header = parsers_by_header if parsers_by_header else {}
155        self._stream_response = stream_response
parser
@classmethod
def by_headers( cls, parsers: List[Tuple[Set[str], Set[str], airbyte_cdk.sources.declarative.decoders.decoder_parser.Parser]], stream_response: bool, fallback_parser: airbyte_cdk.sources.declarative.decoders.decoder_parser.Parser) -> CompositeRawDecoder:
157    @classmethod
158    def by_headers(
159        cls,
160        parsers: PARSERS_TYPE,
161        stream_response: bool,
162        fallback_parser: Parser,
163    ) -> "CompositeRawDecoder":
164        """
165        Create a CompositeRawDecoder instance based on header values.
166
167        Args:
168            parsers (PARSERS_TYPE): A list of tuples where each tuple contains headers, header values, and a parser.
169            stream_response (bool): A flag indicating whether the response should be streamed.
170            fallback_parser (Parser): A parser to use if no matching header is found.
171
172        Returns:
173            CompositeRawDecoder: An instance of CompositeRawDecoder configured with the provided parsers.
174        """
175        parsers_by_header = {}
176        for headers, header_values, parser in parsers:
177            for header in headers:
178                parsers_by_header[header] = {header_value: parser for header_value in header_values}
179        return cls(fallback_parser, stream_response, parsers_by_header)

Create a CompositeRawDecoder instance based on header values.

Arguments:
  • parsers (PARSERS_TYPE): A list of tuples where each tuple contains headers, header values, and a parser.
  • stream_response (bool): A flag indicating whether the response should be streamed.
  • fallback_parser (Parser): A parser to use if no matching header is found.
Returns:

CompositeRawDecoder: An instance of CompositeRawDecoder configured with the provided parsers.

def is_stream_response(self) -> bool:
181    def is_stream_response(self) -> bool:
182        return self._stream_response

Set to True if you'd like to use stream=True option in http requester

def decode( self, response: requests.models.Response) -> Generator[MutableMapping[str, Any], NoneType, NoneType]:
184    def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE:
185        parser = self._select_parser(response)
186        if self.is_stream_response():
187            # urllib mentions that some interfaces don't play nice with auto_close
188            # More info here: https://urllib3.readthedocs.io/en/stable/user-guide.html#using-io-wrappers-with-response-content
189            # We have indeed observed some issues with CSV parsing.
190            # Hence, we will manage the closing of the file ourselves until we find a better solution.
191            response.raw.auto_close = False
192            yield from parser.parse(
193                data=response.raw,  # type: ignore[arg-type]
194            )
195            response.raw.close()
196        else:
197            yield from parser.parse(data=io.BytesIO(response.content))

Decodes a requests.Response into a Mapping[str, Any] or an array

Parameters
  • response: the response to decode
Returns

Generator of Mapping describing the response

class JsonDecoder(airbyte_cdk.sources.declarative.decoders.Decoder):
20class JsonDecoder(Decoder):
21    """
22    Decoder strategy that returns the json-encoded content of a response, if any.
23
24    Usually, we would try to instantiate the equivalent `CompositeRawDecoder(parser=JsonParser(), stream_response=False)` but there were specific historical behaviors related to the JsonDecoder that we didn't know if we could remove like the fallback on {} in case of errors.
25    """
26
27    def __init__(self, parameters: Mapping[str, Any]):
28        self._decoder = CompositeRawDecoder(parser=JsonParser(), stream_response=False)
29
30    def is_stream_response(self) -> bool:
31        return self._decoder.is_stream_response()
32
33    def decode(
34        self, response: requests.Response
35    ) -> Generator[MutableMapping[str, Any], None, None]:
36        """
37        Given the response is an empty string or an emtpy list, the function will return a generator with an empty mapping.
38        """
39        has_yielded = False
40        try:
41            for element in self._decoder.decode(response):
42                yield element
43                has_yielded = True
44        except Exception:
45            yield {}
46
47        if not has_yielded:
48            yield {}

Decoder strategy that returns the json-encoded content of a response, if any.

Ideally, this decoder would simply be the equivalent CompositeRawDecoder(parser=JsonParser(), stream_response=False). However, JsonDecoder keeps some historical behaviors that we were not sure could be safely removed, such as falling back on {} in case of errors.

JsonDecoder(parameters: Mapping[str, Any])
27    def __init__(self, parameters: Mapping[str, Any]):
28        self._decoder = CompositeRawDecoder(parser=JsonParser(), stream_response=False)
def is_stream_response(self) -> bool:
30    def is_stream_response(self) -> bool:
31        return self._decoder.is_stream_response()

Set to True if you'd like to use stream=True option in http requester

def decode( self, response: requests.models.Response) -> Generator[MutableMapping[str, Any], NoneType, NoneType]:
33    def decode(
34        self, response: requests.Response
35    ) -> Generator[MutableMapping[str, Any], None, None]:
36        """
37        Given the response is an empty string or an emtpy list, the function will return a generator with an empty mapping.
38        """
39        has_yielded = False
40        try:
41            for element in self._decoder.decode(response):
42                yield element
43                has_yielded = True
44        except Exception:
45            yield {}
46
47        if not has_yielded:
48            yield {}

If the response is an empty string or an empty list, the function returns a generator that yields a single empty mapping.

@dataclass
class JsonParser(airbyte_cdk.sources.declarative.decoders.decoder_parser.Parser):
50@dataclass
51class JsonParser(Parser):
52    encoding: str = "utf-8"
53
54    def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
55        """
56        Attempts to deserialize data using orjson library. As an extra layer of safety we fallback on the json library to deserialize the data.
57        """
58        raw_data = data.read()
59        body_json = self._parse_orjson(raw_data) or self._parse_json(raw_data)
60
61        if body_json is None:
62            raise AirbyteTracedException(
63                message="Response JSON data failed to be parsed. See logs for more information.",
64                internal_message=f"Response JSON data failed to be parsed.",
65                failure_type=FailureType.system_error,
66            )
67
68        if isinstance(body_json, list):
69            yield from body_json
70        else:
71            yield from [body_json]
72
73    def _parse_orjson(self, raw_data: bytes) -> Optional[Any]:
74        try:
75            return orjson.loads(raw_data.decode(self.encoding))
76        except Exception as exc:
77            logger.debug(
78                f"Failed to parse JSON data using orjson library. Falling back to json library. {exc}"
79            )
80            return None
81
82    def _parse_json(self, raw_data: bytes) -> Optional[Any]:
83        try:
84            return json.loads(raw_data.decode(self.encoding))
85        except Exception as exc:
86            logger.error(f"Failed to parse JSON data using json library. {exc}")
87            return None
JsonParser(encoding: str = 'utf-8')
encoding: str = 'utf-8'
def parse( self, data: io.BufferedIOBase) -> Generator[MutableMapping[str, Any], NoneType, NoneType]:
54    def parse(self, data: BufferedIOBase) -> PARSER_OUTPUT_TYPE:
55        """
56        Attempts to deserialize data using orjson library. As an extra layer of safety we fallback on the json library to deserialize the data.
57        """
58        raw_data = data.read()
59        body_json = self._parse_orjson(raw_data) or self._parse_json(raw_data)
60
61        if body_json is None:
62            raise AirbyteTracedException(
63                message="Response JSON data failed to be parsed. See logs for more information.",
64                internal_message=f"Response JSON data failed to be parsed.",
65                failure_type=FailureType.system_error,
66            )
67
68        if isinstance(body_json, list):
69            yield from body_json
70        else:
71            yield from [body_json]

Attempts to deserialize the data using the orjson library. As an extra layer of safety, it falls back on the standard json library to deserialize the data.

@dataclass
class IterableDecoder(airbyte_cdk.sources.declarative.decoders.Decoder):
51@dataclass
52class IterableDecoder(Decoder):
53    """
54    Decoder strategy that returns the string content of the response, if any.
55    """
56
57    parameters: InitVar[Mapping[str, Any]]
58
59    def is_stream_response(self) -> bool:
60        return True
61
62    def decode(
63        self, response: requests.Response
64    ) -> Generator[MutableMapping[str, Any], None, None]:
65        for line in response.iter_lines():
66            yield {"record": line.decode()}

Decoder strategy that iterates over the lines of the response and yields each decoded line as a mapping of the form {"record": <line>}.

IterableDecoder(parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def is_stream_response(self) -> bool:
59    def is_stream_response(self) -> bool:
60        return True

Set to True if you'd like to use stream=True option in http requester

def decode( self, response: requests.models.Response) -> Generator[MutableMapping[str, Any], NoneType, NoneType]:
62    def decode(
63        self, response: requests.Response
64    ) -> Generator[MutableMapping[str, Any], None, None]:
65        for line in response.iter_lines():
66            yield {"record": line.decode()}

Decodes a requests.Response into a Mapping[str, Any] or an array

Parameters
  • response: the response to decode
Returns

Generator of Mapping describing the response

class NoopDecoder(airbyte_cdk.sources.declarative.decoders.Decoder):
14class NoopDecoder(Decoder):
15    def is_stream_response(self) -> bool:
16        return False
17
18    def decode(  # type: ignore[override]  # Signature doesn't match base class
19        self,
20        response: requests.Response,
21    ) -> Generator[Mapping[str, Any], None, None]:
22        yield from [{}]

Decoder strategy to transform a requests.Response into a Mapping[str, Any]

def is_stream_response(self) -> bool:
15    def is_stream_response(self) -> bool:
16        return False

Set to True if you'd like to use stream=True option in http requester

def decode( self, response: requests.models.Response) -> Generator[Mapping[str, Any], NoneType, NoneType]:
18    def decode(  # type: ignore[override]  # Signature doesn't match base class
19        self,
20        response: requests.Response,
21    ) -> Generator[Mapping[str, Any], None, None]:
22        yield from [{}]

Decodes a requests.Response into a Mapping[str, Any] or an array

Parameters
  • response: the response to decode
Returns

Generator of Mapping describing the response

@dataclass
class PaginationDecoderDecorator(airbyte_cdk.sources.declarative.decoders.Decoder):
17@dataclass
18class PaginationDecoderDecorator(Decoder):
19    """
20    Decoder to wrap other decoders when instantiating a DefaultPaginator in order to bypass decoding if the response is streamed.
21    """
22
23    def __init__(self, decoder: Decoder):
24        self._decoder = decoder
25
26    @property
27    def decoder(self) -> Decoder:
28        return self._decoder
29
30    def is_stream_response(self) -> bool:
31        return self._decoder.is_stream_response()
32
33    def decode(
34        self, response: requests.Response
35    ) -> Generator[MutableMapping[str, Any], None, None]:
36        if self._decoder.is_stream_response():
37            logger.warning("Response is streamed and therefore will not be decoded for pagination.")
38            yield {}
39        else:
40            yield from self._decoder.decode(response)

Decoder to wrap other decoders when instantiating a DefaultPaginator in order to bypass decoding if the response is streamed.

PaginationDecoderDecorator(decoder: Decoder)
23    def __init__(self, decoder: Decoder):
24        self._decoder = decoder
decoder: Decoder
26    @property
27    def decoder(self) -> Decoder:
28        return self._decoder
def is_stream_response(self) -> bool:
30    def is_stream_response(self) -> bool:
31        return self._decoder.is_stream_response()

Set to True if you'd like to use stream=True option in http requester

def decode( self, response: requests.models.Response) -> Generator[MutableMapping[str, Any], NoneType, NoneType]:
33    def decode(
34        self, response: requests.Response
35    ) -> Generator[MutableMapping[str, Any], None, None]:
36        if self._decoder.is_stream_response():
37            logger.warning("Response is streamed and therefore will not be decoded for pagination.")
38            yield {}
39        else:
40            yield from self._decoder.decode(response)

Decodes a requests.Response into a Mapping[str, Any] or an array

Parameters
  • response: the response to decode
Returns

Generator of Mapping describing the response

@dataclass
class XmlDecoder(airbyte_cdk.sources.declarative.decoders.Decoder):
19@dataclass
20class XmlDecoder(Decoder):
21    """
22    XmlDecoder is a decoder strategy that parses the XML content of the resopnse, and converts it to a dict.
23
24    This class handles XML attributes by prefixing them with an '@' symbol and represents XML text content by using the '#text' key if the element has attributes or the element name/tag. It does not currently support XML namespace declarations.
25
26    Example XML Input:
27    <root>
28        <location id="123">
29            San Francisco
30        </location>
31        <item id="1" category="books">
32            <name>Book Title 1</name>
33            <price>10.99</price>
34        </item>
35        <item id="2" category="electronics">
36            <name>Gadget</name>
37            <price>299.99</price>
38            <description>A useful gadget</description>
39        </item>
40    </root>
41
42    Converted Output:
43    {
44        "root": {
45            "location: {
46                "@id": "123,
47                "#text": "San Francisco"
48            },
49            "item": [
50              {
51                "@id": "1",
52                "@category": "books",
53                "name": "Book Title 1",
54                "price": "10.99"
55              },
56              {
57                "@id": "2",
58                "@category": "electronics",
59                "name": "Gadget",
60                "price": "299.99",
61                "description": "A useful gadget"
62              }
63            ]
64        }
65    }
66
67    Notes:
68        - Attributes of an XML element are prefixed with an '@' symbol in the dictionary output.
69        - Text content of an XML element is handled in two different ways, depending on whether
70          the element has attributes.
71                - If the element has attributes, the text content will be
72                  represented by the "#text" key.
73                - If the element does not have any attributes, the text content will be
74                  represented by element name.
75        - Namespace declarations are not supported in the current implementation.
76    """
77
78    parameters: InitVar[Mapping[str, Any]]
79
80    def is_stream_response(self) -> bool:
81        return False
82
83    def decode(
84        self, response: requests.Response
85    ) -> Generator[MutableMapping[str, Any], None, None]:
86        body_xml = response.text
87        try:
88            body_json = xmltodict.parse(body_xml)
89            if not isinstance(body_json, list):
90                body_json = [body_json]
91            if len(body_json) == 0:
92                yield {}
93            else:
94                yield from body_json
95        except ExpatError as exc:
96            logger.warning(
97                f"Response cannot be parsed from XML: {response.status_code=}, {response.text=}, {exc=}"
98            )
99            yield {}

XmlDecoder is a decoder strategy that parses the XML content of the response and converts it to a dict.

This class handles XML attributes by prefixing them with an '@' symbol and represents XML text content by using the '#text' key if the element has attributes or the element name/tag. It does not currently support XML namespace declarations.

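Example XML Input:

<root>
    <location id="123">
        San Francisco
    </location>
    <item id="1" category="books">
        <name>Book Title 1</name>
        <price>10.99</price>
    </item>
    <item id="2" category="electronics">
        <name>Gadget</name>
        <price>299.99</price>
        <description>A useful gadget</description>
    </item>
</root>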

Converted Output:

{
    "root": {
        "location": {
            "@id": "123",
            "#text": "San Francisco"
        },
        "item": [
            {
                "@id": "1",
                "@category": "books",
                "name": "Book Title 1",
                "price": "10.99"
            },
            {
                "@id": "2",
                "@category": "electronics",
                "name": "Gadget",
                "price": "299.99",
                "description": "A useful gadget"
            }
        ]
    }
}

Notes:
  • Attributes of an XML element are prefixed with an '@' symbol in the dictionary output.
  • Text content of an XML element is handled in one of two ways, depending on whether the element has attributes:
      ◦ If the element has attributes, the text content is represented by the "#text" key.
      ◦ If the element has no attributes, the text content is stored directly under the element name.
  • Namespace declarations are not supported in the current implementation.
XmlDecoder(parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def is_stream_response(self) -> bool:
80    def is_stream_response(self) -> bool:
81        return False

Set to True if you'd like to use stream=True option in http requester

def decode( self, response: requests.models.Response) -> Generator[MutableMapping[str, Any], NoneType, NoneType]:
83    def decode(
84        self, response: requests.Response
85    ) -> Generator[MutableMapping[str, Any], None, None]:
86        body_xml = response.text
87        try:
88            body_json = xmltodict.parse(body_xml)
89            if not isinstance(body_json, list):
90                body_json = [body_json]
91            if len(body_json) == 0:
92                yield {}
93            else:
94                yield from body_json
95        except ExpatError as exc:
96            logger.warning(
97                f"Response cannot be parsed from XML: {response.status_code=}, {response.text=}, {exc=}"
98            )
99            yield {}

Decodes a requests.Response into a Mapping[str, Any] or an array

Parameters
  • response: the response to decode
Returns

Generator of Mapping describing the response

@dataclass
class ZipfileDecoder(airbyte_cdk.sources.declarative.decoders.Decoder):
22@dataclass
23class ZipfileDecoder(Decoder):
24    parser: Parser
25
26    def is_stream_response(self) -> bool:
27        return False
28
29    def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE:
30        try:
31            with zipfile.ZipFile(BytesIO(response.content)) as zip_file:
32                for file_name in zip_file.namelist():
33                    unzipped_content = zip_file.read(file_name)
34                    buffered_content = BytesIO(unzipped_content)
35                    try:
36                        yield from self.parser.parse(
37                            buffered_content,
38                        )
39                    except Exception as e:
40                        logger.error(
41                            f"Failed to parse file: {file_name} from zip file: {response.request.url} with exception {e}."
42                        )
43                        raise AirbyteTracedException(
44                            message=f"Failed to parse file: {file_name} from zip file.",
45                            internal_message=f"Failed to parse file: {file_name} from zip file: {response.request.url}.",
46                            failure_type=FailureType.system_error,
47                        ) from e
48        except zipfile.BadZipFile as e:
49            logger.error(
50                f"Received an invalid zip file in response to URL: {response.request.url}. "
51                f"The size of the response body is: {len(response.content)}"
52            )
53            raise AirbyteTracedException(
54                message="Received an invalid zip file in response.",
55                internal_message=f"Received an invalid zip file in response to URL: {response.request.url}.",
56                failure_type=FailureType.system_error,
57            ) from e
def is_stream_response(self) -> bool:
26    def is_stream_response(self) -> bool:
27        return False

Set to True if you'd like to use stream=True option in http requester

def decode( self, response: requests.models.Response) -> Generator[MutableMapping[str, Any], NoneType, NoneType]:
29    def decode(self, response: requests.Response) -> DECODER_OUTPUT_TYPE:
30        try:
31            with zipfile.ZipFile(BytesIO(response.content)) as zip_file:
32                for file_name in zip_file.namelist():
33                    unzipped_content = zip_file.read(file_name)
34                    buffered_content = BytesIO(unzipped_content)
35                    try:
36                        yield from self.parser.parse(
37                            buffered_content,
38                        )
39                    except Exception as e:
40                        logger.error(
41                            f"Failed to parse file: {file_name} from zip file: {response.request.url} with exception {e}."
42                        )
43                        raise AirbyteTracedException(
44                            message=f"Failed to parse file: {file_name} from zip file.",
45                            internal_message=f"Failed to parse file: {file_name} from zip file: {response.request.url}.",
46                            failure_type=FailureType.system_error,
47                        ) from e
48        except zipfile.BadZipFile as e:
49            logger.error(
50                f"Received an invalid zip file in response to URL: {response.request.url}. "
51                f"The size of the response body is: {len(response.content)}"
52            )
53            raise AirbyteTracedException(
54                message="Received an invalid zip file in response.",
55                internal_message=f"Received an invalid zip file in response to URL: {response.request.url}.",
56                failure_type=FailureType.system_error,
57            ) from e

Decodes a requests.Response into a Mapping[str, Any] or an array

Parameters
  • response: the response to decode
Returns

Generator of Mapping describing the response