airbyte_cdk.sources.declarative.extractors
```python
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector
from airbyte_cdk.sources.declarative.extractors.response_to_file_extractor import (
    ResponseToFileExtractor,
)
from airbyte_cdk.sources.declarative.extractors.type_transformer import TypeTransformer

__all__ = [
    "TypeTransformer",
    "HttpSelector",
    "DpathExtractor",
    "RecordFilter",
    "RecordSelector",
    "ResponseToFileExtractor",
]
```
```python
@dataclass
class TypeTransformer(ABC):
    """
    Abstract base class for implementing type transformation logic.

    This class provides a blueprint for defining custom transformations
    on data records based on a provided schema. Implementing classes
    must override the `transform` method to specify the transformation
    logic.

    Attributes:
        None explicitly defined, as this is a dataclass intended to be
        subclassed.

    Methods:
        transform(record: Dict[str, Any], schema: Mapping[str, Any]) -> None:
            Abstract method that must be implemented by subclasses.
            It performs a transformation on a given data record based
            on the provided schema.

    Usage:
        To use this class, create a subclass that implements the
        `transform` method with the desired transformation logic.
    """

    @abstractmethod
    def transform(
        self,
        record: Dict[str, Any],
        schema: Mapping[str, Any],
    ) -> None:
        """
        Perform a transformation on a data record based on a given schema.

        Args:
            record (Dict[str, Any]): The data record to be transformed.
            schema (Mapping[str, Any]): The schema that dictates how
                the record should be transformed.

        Returns:
            None

        Raises:
            NotImplementedError: If the method is not implemented
                by a subclass.
        """
```
Abstract base class for implementing type transformation logic.

This class provides a blueprint for defining custom transformations on data records based on a provided schema. Implementing classes must override the `transform` method to specify the transformation logic.

Attributes:
- None explicitly defined, as this is a dataclass intended to be subclassed.

Methods:
- transform(record: Dict[str, Any], schema: Mapping[str, Any]) -> None: Abstract method that must be implemented by subclasses. It performs a transformation on a given data record based on the provided schema.

Usage:
To use this class, create a subclass that implements the `transform` method with the desired transformation logic.
```python
    @abstractmethod
    def transform(
        self,
        record: Dict[str, Any],
        schema: Mapping[str, Any],
    ) -> None:
        """
        Perform a transformation on a data record based on a given schema.

        Args:
            record (Dict[str, Any]): The data record to be transformed.
            schema (Mapping[str, Any]): The schema that dictates how
                the record should be transformed.

        Returns:
            None

        Raises:
            NotImplementedError: If the method is not implemented
                by a subclass.
        """
```
Perform a transformation on a data record based on a given schema.
Arguments:
- record (Dict[str, Any]): The data record to be transformed.
- schema (Mapping[str, Any]): The schema that dictates how the record should be transformed.
Returns:
None
Raises:
- NotImplementedError: If the method is not implemented by a subclass.
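A minimal sketch of a concrete subclass, assuming we want to cast record values to str wherever the schema declares "type": "string" (the subclass name and casting rule are illustrative, not part of the CDK):

```python
from dataclasses import dataclass
from typing import Any, Dict, Mapping

from airbyte_cdk.sources.declarative.extractors.type_transformer import TypeTransformer


@dataclass
class StringCastingTransformer(TypeTransformer):
    """Hypothetical transformer: casts values to str wherever the schema says "string"."""

    def transform(self, record: Dict[str, Any], schema: Mapping[str, Any]) -> None:
        properties = schema.get("properties", {})
        for field_name, field_schema in properties.items():
            if field_name in record and field_schema.get("type") == "string":
                # Mutate the record in place; transform() returns None by contract.
                record[field_name] = str(record[field_name])


record = {"id": 42}
StringCastingTransformer().transform(record, {"properties": {"id": {"type": "string"}}})
assert record == {"id": "42"}
```

Because `transform` mutates the record in place and returns None, callers keep a reference to the dict they passed in, which is how `RecordSelector._normalize_by_schema` below consumes it.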
```python
class HttpSelector:
    """
    Responsible for translating an HTTP response into a list of records by extracting records from the response and optionally filtering
    records based on a heuristic.
    """

    @abstractmethod
    def select_records(
        self,
        response: requests.Response,
        stream_state: StreamState,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Record]:
        """
        Selects records from the response
        :param response: The response to select the records from
        :param stream_state: The stream state
        :param records_schema: json schema of records to return
        :param stream_slice: The stream slice
        :param next_page_token: The paginator token
        :return: List of Records selected from the response
        """
        pass
```
Responsible for translating an HTTP response into a list of records by extracting records from the response and optionally filtering records based on a heuristic.
```python
    @abstractmethod
    def select_records(
        self,
        response: requests.Response,
        stream_state: StreamState,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Record]:
        """
        Selects records from the response
        :param response: The response to select the records from
        :param stream_state: The stream state
        :param records_schema: json schema of records to return
        :param stream_slice: The stream slice
        :param next_page_token: The paginator token
        :return: List of Records selected from the response
        """
        pass
```
Selects records from the response
Parameters
- response: The response to select the records from
- stream_state: The stream state
- records_schema: JSON schema of the records to return
- stream_slice: The stream slice
- next_page_token: The paginator token
Returns
List of Records selected from the response
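A minimal sketch of a custom selector, assuming a response whose body is a bare JSON array (the class and its constructor are illustrative; `RecordSelector` below is the standard implementation, and the `airbyte_cdk.sources.types` import path is assumed to match recent CDK versions):

```python
from typing import Any, Iterable, Mapping, Optional

import requests

from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector
from airbyte_cdk.sources.types import Record, StreamSlice, StreamState


class JsonArraySelector(HttpSelector):
    """Hypothetical selector: treats the response body as a JSON array of records."""

    def __init__(self, stream_name: str) -> None:
        self._stream_name = stream_name

    def select_records(
        self,
        response: requests.Response,
        stream_state: StreamState,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Record]:
        # No filtering or normalization here; each array element becomes a Record.
        for data in response.json():
            yield Record(data=data, stream_name=self._stream_name, associated_slice=stream_slice)
```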
````python
@dataclass
class DpathExtractor(RecordExtractor):
    """
    Record extractor that searches a decoded response over a path defined as an array of fields.

    If the field path points to an array, that array is returned.
    If the field path points to an object, that object is returned wrapped as an array.
    If the field path points to an empty object, an empty array is returned.
    If the field path points to a non-existing path, an empty array is returned.

    Examples of instantiating this transform:
    ```
    extractor:
      type: DpathExtractor
      field_path:
        - "root"
        - "data"
    ```

    ```
    extractor:
      type: DpathExtractor
      field_path:
        - "root"
        - "{{ parameters['field'] }}"
    ```

    ```
    extractor:
      type: DpathExtractor
      field_path: []
    ```

    Attributes:
        field_path (List[Union[InterpolatedString, str]]): Path to the field that should be extracted
        config (Config): The user-provided configuration as specified by the source's spec
        decoder (Decoder): The decoder responsible for transforming the response into a Mapping
    """

    field_path: List[Union[InterpolatedString, str]]
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    decoder: Decoder = field(default_factory=lambda: JsonDecoder(parameters={}))

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._field_path = [
            InterpolatedString.create(path, parameters=parameters) for path in self.field_path
        ]
        for path_index in range(len(self.field_path)):
            if isinstance(self.field_path[path_index], str):
                self._field_path[path_index] = InterpolatedString.create(
                    self.field_path[path_index], parameters=parameters
                )

    def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
        for body in self.decoder.decode(response):
            if len(self._field_path) == 0:
                extracted = body
            else:
                path = [path.eval(self.config) for path in self._field_path]
                if "*" in path:
                    extracted = dpath.values(body, path)
                else:
                    extracted = dpath.get(body, path, default=[])  # type: ignore # extracted will be a MutableMapping, given input data structure
            if isinstance(extracted, list):
                yield from extracted
            elif extracted:
                yield extracted
            else:
                yield from []
````
Record extractor that searches a decoded response over a path defined as an array of fields.
- If the field path points to an array, that array is returned.
- If the field path points to an object, that object is returned wrapped as an array.
- If the field path points to an empty object, an empty array is returned.
- If the field path points to a non-existing path, an empty array is returned.
Examples of instantiating this transform:

```yaml
extractor:
  type: DpathExtractor
  field_path:
    - "root"
    - "data"
```

```yaml
extractor:
  type: DpathExtractor
  field_path:
    - "root"
    - "{{ parameters['field'] }}"
```

```yaml
extractor:
  type: DpathExtractor
  field_path: []
```
Attributes:
- field_path (List[Union[InterpolatedString, str]]): Path to the field that should be extracted
- config (Config): The user-provided configuration as specified by the source's spec
- decoder (Decoder): The decoder responsible for transforming the response into a Mapping
```python
    def extract_records(self, response: requests.Response) -> Iterable[MutableMapping[Any, Any]]:
        for body in self.decoder.decode(response):
            if len(self._field_path) == 0:
                extracted = body
            else:
                path = [path.eval(self.config) for path in self._field_path]
                if "*" in path:
                    extracted = dpath.values(body, path)
                else:
                    extracted = dpath.get(body, path, default=[])  # type: ignore # extracted will be a MutableMapping, given input data structure
            if isinstance(extracted, list):
                yield from extracted
            elif extracted:
                yield extracted
            else:
                yield from []
```
Extracts records from the response
Parameters
- response: The response to extract the records from
Returns
List of Records extracted from the response
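A sketch of the extraction semantics above, using a hand-built requests.Response (the `fake_response` helper is illustrative; setting `_content` directly is a common test shortcut, not a public API):

```python
import json
from typing import Any

import requests

from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor


def fake_response(payload: Any) -> requests.Response:
    # Illustrative helper: build a Response with a JSON body.
    response = requests.Response()
    response._content = json.dumps(payload).encode("utf-8")
    response.status_code = 200
    return response


extractor = DpathExtractor(field_path=["root", "data"], config={}, parameters={})

# Field path points to an array -> records are yielded one by one.
body = {"root": {"data": [{"id": 1}, {"id": 2}]}}
assert list(extractor.extract_records(fake_response(body))) == [{"id": 1}, {"id": 2}]

# Field path points to an object -> the object is yielded as a single record.
body = {"root": {"data": {"id": 1}}}
assert list(extractor.extract_records(fake_response(body))) == [{"id": 1}]

# Empty object at the path, or a missing path -> no records.
assert list(extractor.extract_records(fake_response({"root": {"data": {}}}))) == []
assert list(extractor.extract_records(fake_response({"root": {}}))) == []
```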
```python
@dataclass
class RecordFilter:
    """
    Filter applied on a list of Records

    config (Config): The user-provided configuration as specified by the source's spec
    condition (str): The string representing the predicate to filter a record. Records will be removed if evaluated to False
    """

    parameters: InitVar[Mapping[str, Any]]
    config: Config
    condition: str = ""

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._filter_interpolator = InterpolatedBoolean(
            condition=self.condition, parameters=parameters
        )

    def filter_records(
        self,
        records: Iterable[Mapping[str, Any]],
        stream_state: StreamState,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
        kwargs = {
            "stream_state": stream_state,
            "stream_slice": stream_slice,
            "next_page_token": next_page_token,
            "stream_slice.extra_fields": stream_slice.extra_fields if stream_slice else {},
        }
        for record in records:
            if self._filter_interpolator.eval(self.config, record=record, **kwargs):
                yield record
```
Filter applied on a list of Records
Attributes:
- config (Config): The user-provided configuration as specified by the source's spec
- condition (str): The string representing the predicate to filter a record. Records will be removed if it evaluates to False
```python
    def filter_records(
        self,
        records: Iterable[Mapping[str, Any]],
        stream_state: StreamState,
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Mapping[str, Any]]:
        kwargs = {
            "stream_state": stream_state,
            "stream_slice": stream_slice,
            "next_page_token": next_page_token,
            "stream_slice.extra_fields": stream_slice.extra_fields if stream_slice else {},
        }
        for record in records:
            if self._filter_interpolator.eval(self.config, record=record, **kwargs):
                yield record
```
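A short sketch, assuming a condition that keeps only records whose status field equals "active" (the condition string and records are illustrative):

```python
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter

record_filter = RecordFilter(
    config={},
    parameters={},
    condition="{{ record['status'] == 'active' }}",
)

records = [{"id": 1, "status": "active"}, {"id": 2, "status": "archived"}]
# The interpolated condition is evaluated once per record; falsy results drop the record.
filtered = list(record_filter.filter_records(records, stream_state={}))
assert filtered == [{"id": 1, "status": "active"}]
```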
```python
@dataclass
class RecordSelector(HttpSelector):
    """
    Responsible for translating an HTTP response into a list of records by extracting records from the response and optionally filtering
    records based on a heuristic.

    Attributes:
        extractor (RecordExtractor): The record extractor responsible for extracting records from a response
        schema_normalization (TypeTransformer): The record normalizer responsible for casting record values to stream schema types
        record_filter (RecordFilter): The record filter responsible for filtering extracted records
        transformations (List[RecordTransformation]): The transformations to be done on the records
    """

    extractor: RecordExtractor
    config: Config
    parameters: InitVar[Mapping[str, Any]]
    schema_normalization: Union[TypeTransformer, DeclarativeTypeTransformer]
    name: str
    _name: Union[InterpolatedString, str] = field(init=False, repr=False, default="")
    record_filter: Optional[RecordFilter] = None
    transformations: List[RecordTransformation] = field(default_factory=lambda: [])
    transform_before_filtering: bool = False

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self._parameters = parameters
        self._name = (
            InterpolatedString(self._name, parameters=parameters)
            if isinstance(self._name, str)
            else self._name
        )

    @property  # type: ignore
    def name(self) -> str:
        """
        :return: Stream name
        """
        return (
            str(self._name.eval(self.config))
            if isinstance(self._name, InterpolatedString)
            else self._name
        )

    @name.setter
    def name(self, value: str) -> None:
        if not isinstance(value, property):
            self._name = value

    def select_records(
        self,
        response: requests.Response,
        stream_state: StreamState,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Record]:
        """
        Selects records from the response
        :param response: The response to select the records from
        :param stream_state: The stream state
        :param records_schema: json schema of records to return
        :param stream_slice: The stream slice
        :param next_page_token: The paginator token
        :return: List of Records selected from the response
        """
        all_data: Iterable[Mapping[str, Any]] = self.extractor.extract_records(response)
        yield from self.filter_and_transform(
            all_data, stream_state, records_schema, stream_slice, next_page_token
        )

    def filter_and_transform(
        self,
        all_data: Iterable[Mapping[str, Any]],
        stream_state: StreamState,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Record]:
        """
        There is an issue with the selector as of 2024-08-30: it does technology-agnostic processing like filtering, transformation and
        normalization with an API that is technology-specific (as requests.Response is only for HTTP communication using the requests
        library).

        Until we decide to move this logic away from the selector, we made this method public so that users like AsyncJobRetriever could
        share the logic of doing transformations on a set of records.
        """
        if self.transform_before_filtering:
            transformed_data = self._transform(all_data, stream_state, stream_slice)
            transformed_filtered_data = self._filter(
                transformed_data, stream_state, stream_slice, next_page_token
            )
        else:
            filtered_data = self._filter(all_data, stream_state, stream_slice, next_page_token)
            transformed_filtered_data = self._transform(filtered_data, stream_state, stream_slice)
        normalized_data = self._normalize_by_schema(
            transformed_filtered_data, schema=records_schema
        )
        for data in normalized_data:
            yield Record(data=data, stream_name=self.name, associated_slice=stream_slice)

    def _normalize_by_schema(
        self, records: Iterable[Mapping[str, Any]], schema: Optional[Mapping[str, Any]]
    ) -> Iterable[Mapping[str, Any]]:
        if schema:
            # record has type Mapping[str, Any], but dict[str, Any] expected
            for record in records:
                normalized_record = dict(record)
                self.schema_normalization.transform(normalized_record, schema)
                yield normalized_record
        else:
            yield from records

    def _filter(
        self,
        records: Iterable[Mapping[str, Any]],
        stream_state: StreamState,
        stream_slice: Optional[StreamSlice],
        next_page_token: Optional[Mapping[str, Any]],
    ) -> Iterable[Mapping[str, Any]]:
        if self.record_filter:
            yield from self.record_filter.filter_records(
                records,
                stream_state=stream_state,
                stream_slice=stream_slice,
                next_page_token=next_page_token,
            )
        else:
            yield from records

    def _transform(
        self,
        records: Iterable[Mapping[str, Any]],
        stream_state: StreamState,
        stream_slice: Optional[StreamSlice] = None,
    ) -> Iterable[Mapping[str, Any]]:
        for record in records:
            for transformation in self.transformations:
                transformation.transform(
                    record,  # type: ignore # record has type Mapping[str, Any], but Dict[str, Any] expected
                    config=self.config,
                    stream_state=stream_state,
                    stream_slice=stream_slice,
                )
            yield record
```
Responsible for translating an HTTP response into a list of records by extracting records from the response and optionally filtering records based on a heuristic.
Attributes:
- extractor (RecordExtractor): The record extractor responsible for extracting records from a response
- schema_normalization (TypeTransformer): The record normalizer responsible for casting record values to stream schema types
- record_filter (RecordFilter): The record filter responsible for filtering extracted records
- transformations (List[RecordTransformation]): The transformations to be done on the records
```python
    @property  # type: ignore
    def name(self) -> str:
        """
        :return: Stream name
        """
        return (
            str(self._name.eval(self.config))
            if isinstance(self._name, InterpolatedString)
            else self._name
        )
```
Returns
Stream name
```python
    def select_records(
        self,
        response: requests.Response,
        stream_state: StreamState,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Record]:
        """
        Selects records from the response
        :param response: The response to select the records from
        :param stream_state: The stream state
        :param records_schema: json schema of records to return
        :param stream_slice: The stream slice
        :param next_page_token: The paginator token
        :return: List of Records selected from the response
        """
        all_data: Iterable[Mapping[str, Any]] = self.extractor.extract_records(response)
        yield from self.filter_and_transform(
            all_data, stream_state, records_schema, stream_slice, next_page_token
        )
```
Selects records from the response
Parameters
- response: The response to select the records from
- stream_state: The stream state
- records_schema: JSON schema of the records to return
- stream_slice: The stream slice
- next_page_token: The paginator token
Returns
List of Records selected from the response
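Putting the pieces together: a sketch that wires a DpathExtractor and a RecordFilter into a RecordSelector and selects records from a hand-built response (the payload is illustrative, and the schema normalizer is assumed to be the no-op `TypeTransformer(TransformConfig.NoTransform)` from `airbyte_cdk.sources.utils.transform`):

```python
import json

import requests

from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer

config: dict = {}
parameters: dict = {}

selector = RecordSelector(
    extractor=DpathExtractor(field_path=["data"], config=config, parameters=parameters),
    config=config,
    parameters=parameters,
    schema_normalization=TypeTransformer(TransformConfig.NoTransform),
    name="users",
    record_filter=RecordFilter(
        config=config, parameters=parameters, condition="{{ record['active'] }}"
    ),
    transformations=[],
)

# Hand-built response for illustration; records are nested under "data".
response = requests.Response()
response._content = json.dumps(
    {"data": [{"id": 1, "active": True}, {"id": 2, "active": False}]}
).encode("utf-8")
response.status_code = 200

# Extraction, then filtering (transform_before_filtering defaults to False),
# then transformations, then schema normalization.
records = list(selector.select_records(response, stream_state={}, records_schema={}))
assert [record.data for record in records] == [{"id": 1, "active": True}]
```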
```python
    def filter_and_transform(
        self,
        all_data: Iterable[Mapping[str, Any]],
        stream_state: StreamState,
        records_schema: Mapping[str, Any],
        stream_slice: Optional[StreamSlice] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> Iterable[Record]:
        """
        There is an issue with the selector as of 2024-08-30: it does technology-agnostic processing like filtering, transformation and
        normalization with an API that is technology-specific (as requests.Response is only for HTTP communication using the requests
        library).

        Until we decide to move this logic away from the selector, we made this method public so that users like AsyncJobRetriever could
        share the logic of doing transformations on a set of records.
        """
        if self.transform_before_filtering:
            transformed_data = self._transform(all_data, stream_state, stream_slice)
            transformed_filtered_data = self._filter(
                transformed_data, stream_state, stream_slice, next_page_token
            )
        else:
            filtered_data = self._filter(all_data, stream_state, stream_slice, next_page_token)
            transformed_filtered_data = self._transform(filtered_data, stream_state, stream_slice)
        normalized_data = self._normalize_by_schema(
            transformed_filtered_data, schema=records_schema
        )
        for data in normalized_data:
            yield Record(data=data, stream_name=self.name, associated_slice=stream_slice)
```
There is an issue with the selector as of 2024-08-30: it does technology-agnostic processing like filtering, transformation and normalization with an API that is technology-specific (as requests.Response is only for HTTP communication using the requests library).
Until we decide to move this logic away from the selector, we made this method public so that users like AsyncJobRetriever could share the logic of doing transformations on a set of records.
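Continuing the selector from the sketch above: because the method is public, records obtained without a requests.Response (for example, rows read back from a file by an async job retriever) can reuse the same post-processing.

```python
already_extracted = [{"id": 1, "active": True}, {"id": 2, "active": False}]

# Filtering, transformations and schema normalization run exactly as in select_records.
records = list(
    selector.filter_and_transform(
        all_data=already_extracted,
        stream_state={},
        records_schema={},
    )
)
assert [record.data for record in records] == [{"id": 1, "active": True}]
```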
```python
@dataclass
class ResponseToFileExtractor(RecordExtractor):
    """
    This class is used when having very big HTTP responses (usually streamed) which would require too much memory so we use disk space as
    a tradeoff.

    Eventually, we want to support multiple file types by re-using the file-based CDK parsers if possible. However, the lift is too high for
    a first iteration so we will only support CSV parsing using pandas as salesforce and sendgrid were doing.
    """

    parameters: InitVar[Mapping[str, Any]]

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        self.logger = logging.getLogger("airbyte")

    def _get_response_encoding(self, headers: Dict[str, Any]) -> str:
        """
        Get the encoding of the response based on the provided headers. This method is heavily inspired by the requests library
        implementation.

        Args:
            headers (Dict[str, Any]): The headers of the response.
        Returns:
            str: The encoding of the response.
        """

        content_type = headers.get("content-type")

        if not content_type:
            return DEFAULT_ENCODING

        content_type, params = requests.utils.parse_header_links(content_type)

        if "charset" in params:
            return params["charset"].strip("'\"")  # type: ignore # we assume headers are returned as str

        return DEFAULT_ENCODING

    def _filter_null_bytes(self, b: bytes) -> bytes:
        """
        Filter out null bytes from a bytes object.

        Args:
            b (bytes): The input bytes object.
        Returns:
            bytes: The filtered bytes object with null bytes removed.

        Referenced Issue:
            https://github.com/airbytehq/airbyte/issues/8300
        """

        res = b.replace(b"\x00", b"")
        if len(res) < len(b):
            self.logger.warning(
                "Filter 'null' bytes from string, size reduced %d -> %d chars", len(b), len(res)
            )
        return res

    def _save_to_file(self, response: requests.Response) -> Tuple[str, str]:
        """
        Saves the binary data from the given response to a temporary file and returns the filepath and response encoding.

        Args:
            response (Optional[requests.Response]): The response object containing the binary data. Defaults to None.

        Returns:
            Tuple[str, str]: A tuple containing the filepath of the temporary file and the response encoding.

        Raises:
            ValueError: If the temporary file does not exist after saving the binary data.
        """
        # set filepath for binary data from response
        decompressor = zlib.decompressobj(zlib.MAX_WBITS | 32)
        needs_decompression = True  # we will assume at first that the response is compressed and change the flag if not

        tmp_file = str(uuid.uuid4())
        with closing(response) as response, open(tmp_file, "wb") as data_file:
            response_encoding = self._get_response_encoding(dict(response.headers or {}))
            for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
                try:
                    if needs_decompression:
                        data_file.write(decompressor.decompress(chunk))
                        needs_decompression = True
                    else:
                        data_file.write(self._filter_null_bytes(chunk))
                except zlib.error:
                    data_file.write(self._filter_null_bytes(chunk))
                    needs_decompression = False

        # check the file exists
        if os.path.isfile(tmp_file):
            return tmp_file, response_encoding
        else:
            raise ValueError(
                f"The IO/Error occured while verifying binary data. Tmp file {tmp_file} doesn't exist."
            )

    def _read_with_chunks(
        self, path: str, file_encoding: str, chunk_size: int = 100
    ) -> Iterable[Mapping[str, Any]]:
        """
        Reads data from a file in chunks and yields each row as a dictionary.

        Args:
            path (str): The path to the file to be read.
            file_encoding (str): The encoding of the file.
            chunk_size (int, optional): The size of each chunk to be read. Defaults to 100.

        Yields:
            Mapping[str, Any]: A dictionary representing each row of data.

        Raises:
            ValueError: If an IO error occurs while reading the temporary data.
        """

        try:
            with open(path, "r", encoding=file_encoding) as data:
                chunks = pd.read_csv(
                    data, chunksize=chunk_size, iterator=True, dialect="unix", dtype=object
                )
                for chunk in chunks:
                    chunk = chunk.replace({nan: None}).to_dict(orient="records")
                    for row in chunk:
                        yield row
        except pd.errors.EmptyDataError as e:
            self.logger.info(f"Empty data received. {e}")
            yield from []
        except IOError as ioe:
            raise ValueError(f"The IO/Error occured while reading tmp data. Called: {path}", ioe)
        finally:
            # remove binary tmp file, after data is read
            os.remove(path)

    def extract_records(
        self, response: Optional[requests.Response] = None
    ) -> Iterable[Mapping[str, Any]]:
        """
        Extracts records from the given response by:
        1) Saving the result to a tmp file
        2) Reading from saved file by chunks to avoid OOM

        Args:
            response (Optional[requests.Response]): The response object containing the data. Defaults to None.

        Yields:
            Iterable[Mapping[str, Any]]: An iterable of mappings representing the extracted records.

        Returns:
            None
        """
        if response:
            file_path, encoding = self._save_to_file(response)
            yield from self._read_with_chunks(file_path, encoding)
        else:
            yield from []
```
This class is used for very large HTTP responses (usually streamed) that would require too much memory to hold at once, so it uses disk space as a tradeoff.

Eventually, we want to support multiple file types by reusing the file-based CDK parsers if possible. However, the lift is too high for a first iteration, so only CSV parsing using pandas is supported, as the Salesforce and Sendgrid connectors were doing.
```python
    def extract_records(
        self, response: Optional[requests.Response] = None
    ) -> Iterable[Mapping[str, Any]]:
        """
        Extracts records from the given response by:
        1) Saving the result to a tmp file
        2) Reading from saved file by chunks to avoid OOM

        Args:
            response (Optional[requests.Response]): The response object containing the data. Defaults to None.

        Yields:
            Iterable[Mapping[str, Any]]: An iterable of mappings representing the extracted records.

        Returns:
            None
        """
        if response:
            file_path, encoding = self._save_to_file(response)
            yield from self._read_with_chunks(file_path, encoding)
        else:
            yield from []
```
Extracts records from the given response by:
1) Saving the result to a tmp file
2) Reading from the saved file in chunks to avoid OOM
Arguments:
- response (Optional[requests.Response]): The response object containing the data. Defaults to None.
Yields:
Iterable[Mapping[str, Any]]: An iterable of mappings representing the extracted records.
Returns:
None
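A usage sketch, assuming an endpoint that streams a large CSV export (the URL is illustrative):

```python
import requests

from airbyte_cdk.sources.declarative.extractors.response_to_file_extractor import (
    ResponseToFileExtractor,
)

extractor = ResponseToFileExtractor(parameters={})

# Illustrative endpoint returning a large CSV export; stream=True avoids
# buffering the whole body in memory before it is written to disk.
response = requests.get("https://api.example.com/export.csv", stream=True)

# The body is saved to a temporary file, then read back 100 rows at a time,
# so memory usage stays bounded regardless of the response size.
for record in extractor.extract_records(response):
    print(record)
```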