airbyte_cdk.sources.declarative.extractors

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector
from airbyte_cdk.sources.declarative.extractors.response_to_file_extractor import (
    ResponseToFileExtractor,
)
from airbyte_cdk.sources.declarative.extractors.type_transformer import TypeTransformer

__all__ = [
    "TypeTransformer",
    "HttpSelector",
    "DpathExtractor",
    "RecordFilter",
    "RecordSelector",
    "ResponseToFileExtractor",
]
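
These six names form the package's public API. A minimal import sketch:

```python
# Import the extractor primitives re-exported by this package.
from airbyte_cdk.sources.declarative.extractors import (
    DpathExtractor,
    HttpSelector,
    RecordFilter,
    RecordSelector,
    ResponseToFileExtractor,
    TypeTransformer,
)
```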
@dataclass
class TypeTransformer(abc.ABC):
@dataclass
class TypeTransformer(ABC):
    """
    Abstract base class for implementing type transformation logic.

    This class provides a blueprint for defining custom transformations
    on data records based on a provided schema. Implementing classes
    must override the `transform` method to specify the transformation
    logic.

    Attributes:
        None explicitly defined, as this is a dataclass intended to be
        subclassed.

    Methods:
        transform(record: Dict[str, Any], schema: Mapping[str, Any]) -> None:
            Abstract method that must be implemented by subclasses.
            It performs a transformation on a given data record based
            on the provided schema.

    Usage:
        To use this class, create a subclass that implements the
        `transform` method with the desired transformation logic.
    """

    @abstractmethod
    def transform(
        self,
        record: Dict[str, Any],
        schema: Mapping[str, Any],
    ) -> None:
        """
        Perform a transformation on a data record based on a given schema.

        Args:
            record (Dict[str, Any]): The data record to be transformed.
            schema (Mapping[str, Any]): The schema that dictates how
                the record should be transformed.

        Returns:
            None

        Raises:
            NotImplementedError: If the method is not implemented
                by a subclass.
        """

Abstract base class for implementing type transformation logic.

This class provides a blueprint for defining custom transformations on data records based on a provided schema. Implementing classes must override the transform method to specify the transformation logic.

Attributes:
  • None explicitly defined, as this is a dataclass intended to be subclassed.

Methods:
  • transform(record: Dict[str, Any], schema: Mapping[str, Any]) -> None: Abstract method that must be implemented by subclasses. It performs a transformation on a given data record based on the provided schema.

Usage:

To use this class, create a subclass that implements the transform method with the desired transformation logic.
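
For illustration, a minimal subclass sketch (the lowercasing behavior and field handling are hypothetical, not part of the CDK):

```python
from dataclasses import dataclass
from typing import Any, Dict, Mapping

from airbyte_cdk.sources.declarative.extractors import TypeTransformer


@dataclass
class LowercaseStringTransformer(TypeTransformer):
    """Hypothetical transformer: lowercases every string-typed field in place."""

    def transform(self, record: Dict[str, Any], schema: Mapping[str, Any]) -> None:
        for field_name, field_schema in schema.get("properties", {}).items():
            if field_schema.get("type") == "string" and isinstance(record.get(field_name), str):
                record[field_name] = record[field_name].lower()
```

Note that `transform` mutates `record` in place and returns `None`, matching the abstract contract below.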

@abstractmethod
def transform(self, record: Dict[str, Any], schema: Mapping[str, Any]) -> None:

Perform a transformation on a data record based on a given schema.

Arguments:
  • record (Dict[str, Any]): The data record to be transformed.
  • schema (Mapping[str, Any]): The schema that dictates how the record should be transformed.
Returns:

None

Raises:
  • NotImplementedError: If the method is not implemented by a subclass.
class HttpSelector:
class HttpSelector:
    """
    Responsible for translating an HTTP response into a list of records by extracting records from the response and optionally filtering
    records based on a heuristic.
    """

    @abstractmethod
    def select_records(
        self,
        response: requests.Response,
        stream_state: StreamState,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Record]:
        """
        Selects records from the response
        :param response: The response to select the records from
        :param stream_state: The stream state
        :param records_schema: json schema of records to return
        :param stream_slice: The stream slice
        :param next_page_token: The paginator token
        :return: List of Records selected from the response
        """
        pass

Responsible for translating an HTTP response into a list of records by extracting records from the response and optionally filtering records based on a heuristic.

@abstractmethod
def select_records( self, response: requests.models.Response, stream_state: Mapping[str, Any], records_schema: Mapping[str, Any], stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Iterable[airbyte_cdk.Record]:

Selects records from the response

Parameters
  • response: The response to select the records from
  • stream_state: The stream state
  • records_schema: json schema of records to return
  • stream_slice: The stream slice
  • next_page_token: The paginator token
Returns

List of Records selected from the response
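
A concrete implementation might look like the following sketch (simplified and hypothetical: the top-level `results` key and the hardcoded stream name are assumptions for illustration):

```python
from typing import Any, Iterable, Mapping, Optional

import requests

from airbyte_cdk.sources.declarative.extractors import HttpSelector
from airbyte_cdk.sources.types import Record, StreamSlice, StreamState


class JsonResultsSelector(HttpSelector):
    """Hypothetical selector: yields each element of the response's 'results' array."""

    def select_records(
        self,
        response: requests.Response,
        stream_state: StreamState,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Record]:
        for data in response.json().get("results", []):
            yield Record(data=data, stream_name="my_stream", associated_slice=stream_slice)
```

In practice the built-in RecordSelector below covers this case; a custom selector is only needed when extraction or filtering cannot be expressed declaratively.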

@dataclass
class DpathExtractor(RecordExtractor):
    """
    Record extractor that searches a decoded response over a path defined as an array of fields.

    If the field path points to an array, that array is returned.
    If the field path points to an object, that object is returned wrapped as an array.
    If the field path points to an empty object, an empty array is returned.
    If the field path points to a non-existing path, an empty array is returned.

    Examples of instantiating this transform:
    ```
      extractor:
        type: DpathExtractor
        field_path:
          - "root"
          - "data"
    ```

    ```
      extractor:
        type: DpathExtractor
        field_path:
          - "root"
          - "{{ parameters['field'] }}"
    ```

    ```
      extractor:
        type: DpathExtractor
        field_path: []
    ```

    Attributes:
        field_path (Union[InterpolatedString, str]): Path to the field that should be extracted
        config (Config): The user-provided configuration as specified by the source's spec
        decoder (Decoder): The decoder responsible for transforming the response into a Mapping
    """

    field_path: List[Union[InterpolatedString, str]]
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._field_path = [
            InterpolatedString.create(path, parameters=parameters) for path in self.field_path
        ]
        for path_index in range(len(self.field_path)):
            if isinstance(self.field_path[path_index], str):
                self._field_path[path_index] = InterpolatedString.create(
                    self.field_path[path_index], parameters=parameters
                )

    def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
        for body in self.decoder.decode(response):
            if len(self._field_path) == 0:
                extracted = body
            else:
                path = [path.eval(self.config) for path in self._field_path]
                if "*" in path:
                    extracted = dpath.values(body, path)
                else:
                    extracted = dpath.get(body, path, default=[])  # type: ignore # extracted will be a MutableMapping, given input data structure
            if isinstance(extracted, list):
                yield from extracted
            elif extracted:
                yield extracted
            else:
                yield from []

Record extractor that searches a decoded response over a path defined as an array of fields.

If the field path points to an array, that array is returned. If the field path points to an object, that object is returned wrapped as an array. If the field path points to an empty object, an empty array is returned. If the field path points to a non-existing path, an empty array is returned.

Examples of instantiating this transform:

  extractor:
    type: DpathExtractor
    field_path:
      - "root"
      - "data"

  extractor:
    type: DpathExtractor
    field_path:
      - "root"
      - "{{ parameters['field'] }}"

  extractor:
    type: DpathExtractor
    field_path: []

Attributes:
  • field_path (Union[InterpolatedString, str]): Path to the field that should be extracted
  • config (Config): The user-provided configuration as specified by the source's spec
  • decoder (Decoder): The decoder responsible for transforming the response into a Mapping
DpathExtractor( field_path: List[Union[airbyte_cdk.InterpolatedString, str]], config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], decoder: airbyte_cdk.Decoder = <factory>)
field_path: List[Union[airbyte_cdk.InterpolatedString, str]]
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def extract_records( self, response: requests.models.Response) -> Iterable[MutableMapping[Any, Any]]:

Extracts records from the response

Parameters
  • response: The response to extract the records from
Returns

List of Records extracted from the response
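
A quick usage sketch (the payload and field path are made up for illustration; setting `_content` is a test-style shortcut for building a fake response):

```python
import requests

from airbyte_cdk.sources.declarative.extractors import DpathExtractor

# Fabricate a response carrying {"root": {"data": [...]}} (note: _content is a private attribute).
response = requests.Response()
response._content = b'{"root": {"data": [{"id": 1}, {"id": 2}]}}'

extractor = DpathExtractor(field_path=["root", "data"], config={}, parameters={})
print(list(extractor.extract_records(response)))  # [{'id': 1}, {'id': 2}]
```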

@dataclass
class RecordFilter:
@dataclass
class RecordFilter:
    """
    Filter applied on a list of Records

    config (Config): The user-provided configuration as specified by the source's spec
    condition (str): The string representing the predicate to filter a record. Records will be removed if evaluated to False
    """

    parameters: InitVar[Mapping[str, Any]]
    config: Config
    condition: str = ""

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._filter_interpolator = InterpolatedBoolean(
            condition=self.condition, parameters=parameters
        )

    def filter_records(
        self,
        records: Iterable[Mapping[str, Any]],
        stream_state: StreamState,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
        kwargs = {
            "stream_state": stream_state,
            "stream_slice": stream_slice,
            "next_page_token": next_page_token,
            "stream_slice.extra_fields": stream_slice.extra_fields if stream_slice else {},
        }
        for record in records:
            if self._filter_interpolator.eval(self.config, record=record, **kwargs):
                yield record

Filter applied on a list of Records

  • config (Config): The user-provided configuration as specified by the source's spec
  • condition (str): The string representing the predicate to filter a record. Records will be removed if evaluated to False

RecordFilter( parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], config: Mapping[str, Any], condition: str = '')
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
config: Mapping[str, Any]
condition: str = ''
def filter_records( self, records: Iterable[Mapping[str, Any]], stream_state: Mapping[str, Any], stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Iterable[Mapping[str, Any]]:
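
For example, a filter that keeps only records created on or after a configured start date might be sketched as follows (the `created_at` field and `start_date` config key are illustrative; ISO date strings compare correctly as plain strings):

```python
from airbyte_cdk.sources.declarative.extractors import RecordFilter

# The interpolated condition has access to `record`, `config`, `stream_state`, etc.
record_filter = RecordFilter(
    parameters={},
    config={"start_date": "2023-01-01"},
    condition="{{ record['created_at'] >= config['start_date'] }}",
)

records = [
    {"id": 1, "created_at": "2022-12-31"},
    {"id": 2, "created_at": "2023-06-15"},
]
print(list(record_filter.filter_records(records, stream_state={})))
# [{'id': 2, 'created_at': '2023-06-15'}]
```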
@dataclass
class RecordSelector(airbyte_cdk.sources.declarative.extractors.HttpSelector):
@dataclass
class RecordSelector(HttpSelector):
    """
    Responsible for translating an HTTP response into a list of records by extracting records from the response and optionally filtering
    records based on a heuristic.

    Attributes:
        extractor (RecordExtractor): The record extractor responsible for extracting records from a response
        schema_normalization (TypeTransformer): The record normalizer responsible for casting record values to stream schema types
        record_filter (RecordFilter): The record filter responsible for filtering extracted records
        transformations (List[RecordTransformation]): The transformations to be done on the records
    """

    extractor: RecordExtractor
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    schema_normalization: Union[TypeTransformer, DeclarativeTypeTransformer]
    name: str
    _name: Union[InterpolatedString, str] = field(init=False, repr=False, default="")
    record_filter: Optional[RecordFilter] = None
    transformations: List[RecordTransformation] = field(default_factory=lambda: [])
    transform_before_filtering: bool = False

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters
        self._name = (
            InterpolatedString(self._name, parameters=parameters)
            if isinstance(self._name, str)
            else self._name
        )

    @property  # type: ignore
    def name(self) -> str:
        """
        :return: Stream name
        """
        return (
            str(self._name.eval(self.config))
            if isinstance(self._name, InterpolatedString)
            else self._name
        )

    @name.setter
    def name(self, value: str) -> None:
        if not isinstance(value, property):
            self._name = value

    def select_records(
        self,
        response: requests.Response,
        stream_state: StreamState,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Record]:
        """
        Selects records from the response
        :param response: The response to select the records from
        :param stream_state: The stream state
        :param records_schema: json schema of records to return
        :param stream_slice: The stream slice
        :param next_page_token: The paginator token
        :return: List of Records selected from the response
        """
        all_data: Iterable[Mapping[str, Any]] = self.extractor.extract_records(response)
        yield from self.filter_and_transform(
            all_data, stream_state, records_schema, stream_slice, next_page_token
        )

    def filter_and_transform(
        self,
        all_data: Iterable[Mapping[str, Any]],
        stream_state: StreamState,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Record]:
        """
        There is an issue with the selector as of 2024-08-30: it does technology-agnostic processing like filtering, transformation and
        normalization with an API that is technology-specific (as requests.Response is only for HTTP communication using the requests
        library).

        Until we decide to move this logic away from the selector, we made this method public so that users like AsyncJobRetriever could
        share the logic of doing transformations on a set of records.
        """
        if self.transform_before_filtering:
            transformed_data = self._transform(all_data, stream_state, stream_slice)
            transformed_filtered_data = self._filter(
                transformed_data, stream_state, stream_slice, next_page_token
            )
        else:
            filtered_data = self._filter(all_data, stream_state, stream_slice, next_page_token)
            transformed_filtered_data = self._transform(filtered_data, stream_state, stream_slice)
        normalized_data = self._normalize_by_schema(
            transformed_filtered_data, schema=records_schema
        )
        for data in normalized_data:
            yield Record(data=data, stream_name=self.name, associated_slice=stream_slice)

    def _normalize_by_schema(
        self, records: Iterable[Mapping[str, Any]], schema: Optional[Mapping[str, Any]]
    ) -> Iterable[Mapping[str, Any]]:
        if schema:
            # record has type Mapping[str, Any], but dict[str, Any] expected
            for record in records:
                normalized_record = dict(record)
                self.schema_normalization.transform(normalized_record, schema)
                yield normalized_record
        else:
            yield from records

    def _filter(
        self,
        records: Iterable[Mapping[str, Any]],
        stream_state: StreamState,
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
    ) -> Iterable[Mapping[str, Any]]:
        if self.record_filter:
            yield from self.record_filter.filter_records(
                records,
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            )
        else:
            yield from records

    def _transform(
        self,
        records: Iterable[Mapping[str, Any]],
        stream_state: StreamState,
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[Mapping[str, Any]]:
        for record in records:
            for transformation in self.transformations:
                transformation.transform(
                    record,  # type: ignore  # record has type Mapping[str, Any], but Dict[str, Any] expected
                    config=self.config,
                    stream_state=stream_state,
                    stream_slice=stream_slice,
                )
            yield record

Responsible for translating an HTTP response into a list of records by extracting records from the response and optionally filtering records based on a heuristic.

Attributes:
  • extractor (RecordExtractor): The record extractor responsible for extracting records from a response
  • schema_normalization (TypeTransformer): The record normalizer responsible for casting record values to stream schema types
  • record_filter (RecordFilter): The record filter responsible for filtering extracted records
  • transformations (List[RecordTransformation]): The transformations to be done on the records
RecordSelector( extractor: airbyte_cdk.RecordExtractor, config: Mapping[str, Any], parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]], schema_normalization: Union[airbyte_cdk.TypeTransformer, TypeTransformer], name: str = <property object>, record_filter: Optional[RecordFilter] = None, transformations: List[airbyte_cdk.RecordTransformation] = <factory>, transform_before_filtering: bool = False)
config: Mapping[str, Any]
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
schema_normalization: Union[airbyte_cdk.TypeTransformer, TypeTransformer]
name: str
Returns

Stream name

record_filter: Optional[RecordFilter] = None
transformations: List[airbyte_cdk.RecordTransformation]
transform_before_filtering: bool = False
def select_records( self, response: requests.models.Response, stream_state: Mapping[str, Any], records_schema: Mapping[str, Any], stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Iterable[airbyte_cdk.Record]:

Selects records from the response

Parameters
  • response: The response to select the records from
  • stream_state: The stream state
  • records_schema: json schema of records to return
  • stream_slice: The stream slice
  • next_page_token: The paginator token
Returns

List of Records selected from the response

def filter_and_transform( self, all_data: Iterable[Mapping[str, Any]], stream_state: Mapping[str, Any], records_schema: Mapping[str, Any], stream_slice: Optional[airbyte_cdk.StreamSlice] = None, next_page_token: Optional[Mapping[str, Any]] = None) -> Iterable[airbyte_cdk.Record]:

There is an issue with the selector as of 2024-08-30: it does technology-agnostic processing like filtering, transformation and normalization with an API that is technology-specific (as requests.Response is only for HTTP communication using the requests library).

Until we decide to move this logic away from the selector, we made this method public so that users like AsyncJobRetriever could share the logic of doing transformations on a set of records.
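
To tie the pieces together, a selector can be assembled programmatically. This sketch stubs schema normalization with a hypothetical no-op transformer; the stream name, field path, and `active` field are made up:

```python
from dataclasses import dataclass
from typing import Any, Dict, Mapping

from airbyte_cdk.sources.declarative.extractors import (
    DpathExtractor,
    RecordFilter,
    RecordSelector,
    TypeTransformer,
)


@dataclass
class NoopTransformer(TypeTransformer):
    """Hypothetical no-op normalizer: leaves record values untouched."""

    def transform(self, record: Dict[str, Any], schema: Mapping[str, Any]) -> None:
        pass


config: Mapping[str, Any] = {}
selector = RecordSelector(
    extractor=DpathExtractor(field_path=["data"], config=config, parameters={}),
    config=config,
    parameters={},
    schema_normalization=NoopTransformer(),
    name="my_stream",  # hypothetical stream name
    record_filter=RecordFilter(config=config, parameters={}, condition="{{ record['active'] }}"),
)
# selector.select_records(response, stream_state={}, records_schema={}) yields a Record
# for every element under "data" whose 'active' field is truthy.
```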

@dataclass
class ResponseToFileExtractor(airbyte_cdk.sources.declarative.extractors.record_extractor.RecordExtractor):
@dataclass
class ResponseToFileExtractor(RecordExtractor):
    """
    This class is used for very large HTTP responses (usually streamed) that would require too much memory, so we use disk space as
    a tradeoff.

    Eventually, we want to support multiple file types by re-using the file-based CDK parsers if possible. However, the lift is too high for
    a first iteration, so we will only support CSV parsing using pandas, as salesforce and sendgrid were doing.
    """

    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self.logger = logging.getLogger("airbyte")

    def _get_response_encoding(self, headers: Dict[str, Any]) -> str:
        """
        Get the encoding of the response based on the provided headers. This method is heavily inspired by the requests library
        implementation.

        Args:
            headers (Dict[str, Any]): The headers of the response.
        Returns:
            str: The encoding of the response.
        """

        content_type = headers.get("content-type")

        if not content_type:
            return DEFAULT_ENCODING

        # parse the content-type header into the media type and its parameters (the charset lives in the parameters)
        content_type, params = requests.utils._parse_content_type_header(content_type)

        if "charset" in params:
            return params["charset"].strip("'\"")  # type: ignore  # we assume headers are returned as str

        return DEFAULT_ENCODING

    def _filter_null_bytes(self, b: bytes) -> bytes:
        """
        Filter out null bytes from a bytes object.

        Args:
            b (bytes): The input bytes object.
        Returns:
            bytes: The filtered bytes object with null bytes removed.

        Referenced Issue:
            https://github.com/airbytehq/airbyte/issues/8300
        """

        res = b.replace(b"\x00", b"")
        if len(res) < len(b):
            self.logger.warning(
                "Filter 'null' bytes from string, size reduced %d -> %d bytes", len(b), len(res)
            )
        return res

    def _save_to_file(self, response: requests.Response) -> Tuple[str, str]:
        """
        Saves the binary data from the given response to a temporary file and returns the filepath and response encoding.

        Args:
            response (requests.Response): The response object containing the binary data.

        Returns:
            Tuple[str, str]: A tuple containing the filepath of the temporary file and the response encoding.

        Raises:
            ValueError: If the temporary file does not exist after saving the binary data.
        """
        # set filepath for binary data from response
        decompressor = zlib.decompressobj(zlib.MAX_WBITS | 32)
        needs_decompression = True  # we will assume at first that the response is compressed and change the flag if not

        tmp_file = str(uuid.uuid4())
        with closing(response) as response, open(tmp_file, "wb") as data_file:
            response_encoding = self._get_response_encoding(dict(response.headers or {}))
            for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
                try:
                    if needs_decompression:
                        data_file.write(decompressor.decompress(chunk))
                    else:
                        data_file.write(self._filter_null_bytes(chunk))
                except zlib.error:
                    # the body is not compressed after all: fall back to writing raw chunks
                    data_file.write(self._filter_null_bytes(chunk))
                    needs_decompression = False

        # check the file exists
        if os.path.isfile(tmp_file):
            return tmp_file, response_encoding
        else:
            raise ValueError(
                f"An I/O error occurred while verifying the binary data. Tmp file {tmp_file} doesn't exist."
            )

    def _read_with_chunks(
        self, path: str, file_encoding: str, chunk_size: int = 100
    ) -> Iterable[Mapping[str, Any]]:
        """
        Reads data from a file in chunks and yields each row as a dictionary.

        Args:
            path (str): The path to the file to be read.
            file_encoding (str): The encoding of the file.
            chunk_size (int, optional): The size of each chunk to be read. Defaults to 100.

        Yields:
            Mapping[str, Any]: A dictionary representing each row of data.

        Raises:
            ValueError: If an I/O error occurs while reading the temporary data.
        """

        try:
            with open(path, "r", encoding=file_encoding) as data:
                chunks = pd.read_csv(
                    data, chunksize=chunk_size, iterator=True, dialect="unix", dtype=object
                )
                for chunk in chunks:
                    chunk = chunk.replace({nan: None}).to_dict(orient="records")
                    for row in chunk:
                        yield row
        except pd.errors.EmptyDataError as e:
            self.logger.info(f"Empty data received. {e}")
            yield from []
        except IOError as ioe:
            raise ValueError(f"An I/O error occurred while reading tmp data. Called: {path}", ioe)
        finally:
            # remove binary tmp file, after data is read
            os.remove(path)

    def extract_records(
        self, response: Optional[requests.Response] = None
    ) -> Iterable[Mapping[str, Any]]:
        """
        Extracts records from the given response by:
            1) Saving the result to a tmp file
            2) Reading from saved file by chunks to avoid OOM

        Args:
            response (Optional[requests.Response]): The response object containing the data. Defaults to None.

        Yields:
            Iterable[Mapping[str, Any]]: An iterable of mappings representing the extracted records.

        Returns:
            None
        """
        if response:
            file_path, encoding = self._save_to_file(response)
            yield from self._read_with_chunks(file_path, encoding)
        else:
            yield from []

This class is used for very large HTTP responses (usually streamed) that would require too much memory, so we use disk space as a tradeoff.

Eventually, we want to support multiple file types by re-using the file-based CDK parsers if possible. However, the lift is too high for a first iteration, so we will only support CSV parsing using pandas, as salesforce and sendgrid were doing.

ResponseToFileExtractor(parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]])
parameters: dataclasses.InitVar[typing.Mapping[str, typing.Any]]
def extract_records( self, response: Optional[requests.models.Response] = None) -> Iterable[Mapping[str, Any]]:
Extracts records from the given response by:

  1) Saving the result to a tmp file
  2) Reading from saved file by chunks to avoid OOM

Arguments:
  • response (Optional[requests.Response]): The response object containing the data. Defaults to None.
Yields:

Iterable[Mapping[str, Any]]: An iterable of mappings representing the extracted records.

Returns:

None
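
A usage sketch (the bulk-export URL is hypothetical; `stream=True` keeps the body from being buffered in memory before it is written to disk):

```python
import requests

from airbyte_cdk.sources.declarative.extractors import ResponseToFileExtractor

extractor = ResponseToFileExtractor(parameters={})

# Hypothetical endpoint that streams a large CSV export.
response = requests.get("https://api.example.com/bulk/export.csv", stream=True)
for record in extractor.extract_records(response):
    print(record)  # one dict per CSV row; values are strings (pandas dtype=object)
```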