airbyte_cdk.sources.file_based.file_types
from typing import Any, Mapping, Type

from airbyte_cdk.sources.file_based.config.avro_format import AvroFormat
from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat
from airbyte_cdk.sources.file_based.config.excel_format import ExcelFormat
from airbyte_cdk.sources.file_based.config.jsonl_format import JsonlFormat
from airbyte_cdk.sources.file_based.config.parquet_format import ParquetFormat
from airbyte_cdk.sources.file_based.config.unstructured_format import UnstructuredFormat

from .avro_parser import AvroParser
from .csv_parser import CsvParser
from .excel_parser import ExcelParser
from .file_transfer import FileTransfer
from .file_type_parser import FileTypeParser
from .jsonl_parser import JsonlParser
from .parquet_parser import ParquetParser
from .unstructured_parser import UnstructuredParser

default_parsers: Mapping[Type[Any], FileTypeParser] = {
    AvroFormat: AvroParser(),
    CsvFormat: CsvParser(),
    ExcelFormat: ExcelParser(),
    JsonlFormat: JsonlParser(),
    ParquetFormat: ParquetParser(),
    UnstructuredFormat: UnstructuredParser(),
}

__all__ = [
    "AvroParser",
    "CsvParser",
    "ExcelParser",
    "JsonlParser",
    "ParquetParser",
    "UnstructuredParser",
    "FileTransfer",
    "default_parsers",
]
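As a quick illustration (a minimal sketch, not part of the module above), the default_parsers mapping can be used to look up the parser instance registered for a given format class:

from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat
from airbyte_cdk.sources.file_based.file_types import default_parsers

# Each format class maps to a ready-to-use parser instance.
csv_parser = default_parsers[CsvFormat]
print(type(csv_parser).__name__)  # CsvParser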
class AvroParser(FileTypeParser):
    ENCODING = None

    def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
        """
        AvroParser does not require config checks, implicit pydantic validation is enough.
        """
        return True, None

    async def infer_schema(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
    ) -> SchemaType:
        avro_format = config.format
        if not isinstance(avro_format, AvroFormat):
            raise ValueError(f"Expected AvroFormat, got {avro_format}")

        with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
            avro_reader = fastavro.reader(fp)  # type: ignore [arg-type]
            avro_schema = avro_reader.writer_schema
        if not avro_schema["type"] == "record":  # type: ignore [index, call-overload]
            unsupported_type = avro_schema["type"]  # type: ignore [index, call-overload]
            raise ValueError(
                f"Only record based avro files are supported. Found {unsupported_type}"
            )
        json_schema = {
            field["name"]: AvroParser._convert_avro_type_to_json(  # type: ignore [index]
                avro_format,
                field["name"],  # type: ignore [index]
                field["type"],  # type: ignore [index]
            )
            for field in avro_schema["fields"]  # type: ignore [index, call-overload]
        }
        return json_schema

    @classmethod
    def _convert_avro_type_to_json(
        cls, avro_format: AvroFormat, field_name: str, avro_field: str
    ) -> Mapping[str, Any]:
        if isinstance(avro_field, str) and avro_field in AVRO_TYPE_TO_JSON_TYPE:
            # Legacy behavior to retain backwards compatibility. Long term we should always represent doubles as strings
            if avro_field == "double" and not avro_format.double_as_string:
                return {"type": "number"}
            return {"type": AVRO_TYPE_TO_JSON_TYPE[avro_field]}
        if isinstance(avro_field, Mapping):
            if avro_field["type"] == "record":
                return {
                    "type": "object",
                    "properties": {
                        object_field["name"]: AvroParser._convert_avro_type_to_json(
                            avro_format, object_field["name"], object_field["type"]
                        )
                        for object_field in avro_field["fields"]
                    },
                }
            elif avro_field["type"] == "array":
                if "items" not in avro_field:
                    raise ValueError(
                        f"{field_name} array type does not have a required field items"
                    )
                return {
                    "type": "array",
                    "items": AvroParser._convert_avro_type_to_json(
                        avro_format, "", avro_field["items"]
                    ),
                }
            elif avro_field["type"] == "enum":
                if "symbols" not in avro_field:
                    raise ValueError(
                        f"{field_name} enum type does not have a required field symbols"
                    )
                if "name" not in avro_field:
                    raise ValueError(f"{field_name} enum type does not have a required field name")
                return {"type": "string", "enum": avro_field["symbols"]}
            elif avro_field["type"] == "map":
                if "values" not in avro_field:
                    raise ValueError(f"{field_name} map type does not have a required field values")
                return {
                    "type": "object",
                    "additionalProperties": AvroParser._convert_avro_type_to_json(
                        avro_format, "", avro_field["values"]
                    ),
                }
            elif avro_field["type"] == "fixed" and avro_field.get("logicalType") != "duration":
                if "size" not in avro_field:
                    raise ValueError(f"{field_name} fixed type does not have a required field size")
                if not isinstance(avro_field["size"], int):
                    raise ValueError(f"{field_name} fixed type size value is not an integer")
                return {
                    "type": "string",
                    "pattern": f"^[0-9A-Fa-f]{{{avro_field['size'] * 2}}}$",
                }
            elif avro_field.get("logicalType") == "decimal":
                if "precision" not in avro_field:
                    raise ValueError(
                        f"{field_name} decimal type does not have a required field precision"
                    )
                if "scale" not in avro_field:
                    raise ValueError(
                        f"{field_name} decimal type does not have a required field scale"
                    )
                max_whole_number_range = avro_field["precision"] - avro_field["scale"]
                decimal_range = avro_field["scale"]

                # This regex looks like a mess, but it is validation for at least one whole number and optional fractional numbers
                # For example: ^-?\d{1,5}(?:\.\d{1,3})?$ would accept 12345.123 and 123456.12345 would be rejected
                return {
                    "type": "string",
                    "pattern": f"^-?\\d{{1,{max_whole_number_range}}}(?:\\.\\d{{1,{decimal_range}}})?$",
                }
            elif "logicalType" in avro_field:
                if avro_field["logicalType"] not in AVRO_LOGICAL_TYPE_TO_JSON:
                    raise ValueError(
                        f"{avro_field['logicalType']} is not a valid Avro logical type"
                    )
                return AVRO_LOGICAL_TYPE_TO_JSON[avro_field["logicalType"]]
            else:
                raise ValueError(f"Unsupported avro type: {avro_field}")
        else:
            raise ValueError(f"Unsupported avro type: {avro_field}")

    def parse_records(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
        discovered_schema: Optional[Mapping[str, SchemaType]],
    ) -> Iterable[Dict[str, Any]]:
        avro_format = config.format or AvroFormat(filetype="avro")
        if not isinstance(avro_format, AvroFormat):
            raise ValueError(f"Expected AvroFormat, got {avro_format}")

        line_no = 0
        try:
            with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
                avro_reader = fastavro.reader(fp)  # type: ignore [arg-type]
                schema = avro_reader.writer_schema
                schema_field_name_to_type = {
                    field["name"]: cast(dict[str, Any], field["type"])  # type: ignore [index]
                    for field in schema["fields"]  # type: ignore [index, call-overload]  # If schema is not dict, it is not subscriptable by strings
                }
                for record in avro_reader:
                    line_no += 1
                    yield {
                        record_field: self._to_output_value(
                            avro_format,
                            schema_field_name_to_type[record_field],  # type: ignore [index]  # Any not subscriptable
                            record[record_field],  # type: ignore [index]  # Any not subscriptable
                        )
                        for record_field, record_value in schema_field_name_to_type.items()
                    }
        except Exception as exc:
            raise RecordParseError(
                FileBasedSourceError.ERROR_PARSING_RECORD, filename=file.uri, lineno=line_no
            ) from exc

    @property
    def file_read_mode(self) -> FileReadMode:
        return FileReadMode.READ_BINARY

    @staticmethod
    def _to_output_value(
        avro_format: AvroFormat, record_type: Mapping[str, Any], record_value: Any
    ) -> Any:
        if isinstance(record_value, bytes):
            return record_value.decode()
        elif not isinstance(record_type, Mapping):
            if record_type == "double" and avro_format.double_as_string:
                return str(record_value)
            return record_value
        if record_type.get("logicalType") in ("decimal", "uuid"):
            return str(record_value)
        elif record_type.get("logicalType") == "date":
            return record_value.isoformat()
        elif record_type.get("logicalType") == "timestamp-millis":
            return record_value.isoformat(sep="T", timespec="milliseconds")
        elif record_type.get("logicalType") == "timestamp-micros":
            return record_value.isoformat(sep="T", timespec="microseconds")
        elif record_type.get("logicalType") == "local-timestamp-millis":
            return record_value.isoformat(sep="T", timespec="milliseconds")
        elif record_type.get("logicalType") == "local-timestamp-micros":
            return record_value.isoformat(sep="T", timespec="microseconds")
        else:
            return record_value
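A minimal sketch of the Avro-to-JSON-schema mapping performed above, calling the private helper directly for illustration (the AvroFormat(filetype="avro") default mirrors the fallback used in parse_records; the expected output is indicative only):

from airbyte_cdk.sources.file_based.config.avro_format import AvroFormat
from airbyte_cdk.sources.file_based.file_types import AvroParser

avro_format = AvroFormat(filetype="avro")
# A nested record field converts to a JSON-schema object with per-field properties.
nested = {
    "type": "record",
    "name": "address",
    "fields": [{"name": "city", "type": "string"}, {"name": "zip", "type": "string"}],
}
print(AvroParser._convert_avro_type_to_json(avro_format, "address", nested))
# expected shape: {"type": "object", "properties": {"city": {"type": "string"}, "zip": {"type": "string"}}}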
An abstract class containing methods that must be implemented for each supported file type.
    def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
        """
        AvroParser does not require config checks, implicit pydantic validation is enough.
        """
        return True, None
AvroParser does not require config checks, implicit pydantic validation is enough.
    async def infer_schema(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
    ) -> SchemaType:
        avro_format = config.format
        if not isinstance(avro_format, AvroFormat):
            raise ValueError(f"Expected AvroFormat, got {avro_format}")

        with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
            avro_reader = fastavro.reader(fp)  # type: ignore [arg-type]
            avro_schema = avro_reader.writer_schema
        if not avro_schema["type"] == "record":  # type: ignore [index, call-overload]
            unsupported_type = avro_schema["type"]  # type: ignore [index, call-overload]
            raise ValueError(
                f"Only record based avro files are supported. Found {unsupported_type}"
            )
        json_schema = {
            field["name"]: AvroParser._convert_avro_type_to_json(  # type: ignore [index]
                avro_format,
                field["name"],  # type: ignore [index]
                field["type"],  # type: ignore [index]
            )
            for field in avro_schema["fields"]  # type: ignore [index, call-overload]
        }
        return json_schema
Infer the JSON Schema for this file.
    def parse_records(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
        discovered_schema: Optional[Mapping[str, SchemaType]],
    ) -> Iterable[Dict[str, Any]]:
        avro_format = config.format or AvroFormat(filetype="avro")
        if not isinstance(avro_format, AvroFormat):
            raise ValueError(f"Expected AvroFormat, got {avro_format}")

        line_no = 0
        try:
            with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
                avro_reader = fastavro.reader(fp)  # type: ignore [arg-type]
                schema = avro_reader.writer_schema
                schema_field_name_to_type = {
                    field["name"]: cast(dict[str, Any], field["type"])  # type: ignore [index]
                    for field in schema["fields"]  # type: ignore [index, call-overload]  # If schema is not dict, it is not subscriptable by strings
                }
                for record in avro_reader:
                    line_no += 1
                    yield {
                        record_field: self._to_output_value(
                            avro_format,
                            schema_field_name_to_type[record_field],  # type: ignore [index]  # Any not subscriptable
                            record[record_field],  # type: ignore [index]  # Any not subscriptable
                        )
                        for record_field, record_value in schema_field_name_to_type.items()
                    }
        except Exception as exc:
            raise RecordParseError(
                FileBasedSourceError.ERROR_PARSING_RECORD, filename=file.uri, lineno=line_no
            ) from exc
Parse and emit each record.
The mode in which the file should be opened for reading.
class CsvParser(FileTypeParser):
    _MAX_BYTES_PER_FILE_FOR_SCHEMA_INFERENCE = 1_000_000

    def __init__(self, csv_reader: Optional[_CsvReader] = None, csv_field_max_bytes: int = 2**31):
        # Increase the maximum length of data that can be parsed in a single CSV field. The default is 128k, which is typically sufficient
        # but given the use of Airbyte in loading a large variety of data it is best to allow for a larger maximum field size to avoid
        # skipping data on load. https://stackoverflow.com/questions/15063936/csv-error-field-larger-than-field-limit-131072
        csv.field_size_limit(csv_field_max_bytes)
        self._csv_reader = csv_reader if csv_reader else _CsvReader()

    def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
        """
        CsvParser does not require config checks, implicit pydantic validation is enough.
        """
        return True, None

    async def infer_schema(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
    ) -> SchemaType:
        input_schema = config.get_input_schema()
        if input_schema:
            return input_schema

        # todo: the existing InMemoryFilesSource.open_file() test source doesn't currently require an encoding, but actual
        # sources will likely require one. Rather than modify the interface now we can wait until the real use case
        config_format = _extract_format(config)
        type_inferrer_by_field: Dict[str, _TypeInferrer] = defaultdict(
            lambda: _JsonTypeInferrer(
                config_format.true_values, config_format.false_values, config_format.null_values
            )
            if config_format.inference_type != InferenceType.NONE
            else _DisabledTypeInferrer()
        )
        data_generator = self._csv_reader.read_data(
            config, file, stream_reader, logger, self.file_read_mode
        )
        read_bytes = 0
        for row in data_generator:
            for header, value in row.items():
                type_inferrer_by_field[header].add_value(value)
                # This is not accurate as a representation of how many bytes were read because csv does some processing on the actual value
                # before returning. Given we would like to be more accurate, we could wrap the IO file using a decorator
                read_bytes += len(value)
            read_bytes += len(row) - 1  # for separators
            if read_bytes >= self._MAX_BYTES_PER_FILE_FOR_SCHEMA_INFERENCE:
                break

        if not type_inferrer_by_field:
            raise AirbyteTracedException(
                message=f"Could not infer schema as there are no rows in {file.uri}. If having an empty CSV file is expected, ignore this. "
                f"Else, please contact Airbyte.",
                failure_type=FailureType.config_error,
            )
        schema = {
            header.strip(): {"type": type_inferred.infer()}
            for header, type_inferred in type_inferrer_by_field.items()
        }
        data_generator.close()
        return schema

    def parse_records(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
        discovered_schema: Optional[Mapping[str, SchemaType]],
    ) -> Iterable[Dict[str, Any]]:
        line_no = 0
        try:
            config_format = _extract_format(config)
            if discovered_schema:
                property_types = {
                    col: prop["type"] for col, prop in discovered_schema["properties"].items()
                }
                deduped_property_types = CsvParser._pre_propcess_property_types(property_types)
            else:
                deduped_property_types = {}
            cast_fn = CsvParser._get_cast_function(
                deduped_property_types, config_format, logger, config.schemaless
            )
            data_generator = self._csv_reader.read_data(
                config, file, stream_reader, logger, self.file_read_mode
            )
            for row in data_generator:
                line_no += 1
                yield CsvParser._to_nullable(
                    cast_fn(row),
                    deduped_property_types,
                    config_format.null_values,
                    config_format.strings_can_be_null,
                )
        except RecordParseError as parse_err:
            raise RecordParseError(
                FileBasedSourceError.ERROR_PARSING_RECORD, filename=file.uri, lineno=line_no
            ) from parse_err
        finally:
            data_generator.close()

    @property
    def file_read_mode(self) -> FileReadMode:
        return FileReadMode.READ

    @staticmethod
    def _get_cast_function(
        deduped_property_types: Mapping[str, str],
        config_format: CsvFormat,
        logger: logging.Logger,
        schemaless: bool,
    ) -> Callable[[Mapping[str, str]], Mapping[str, str]]:
        # Only cast values if the schema is provided
        if deduped_property_types and not schemaless:
            return partial(
                CsvParser._cast_types,
                deduped_property_types=deduped_property_types,
                config_format=config_format,
                logger=logger,
            )
        else:
            # If no schema is provided, yield the rows as they are
            return _no_cast

    @staticmethod
    def _to_nullable(
        row: Mapping[str, str],
        deduped_property_types: Mapping[str, str],
        null_values: Set[str],
        strings_can_be_null: bool,
    ) -> Dict[str, Optional[str]]:
        nullable = {
            k: None
            if CsvParser._value_is_none(
                v, deduped_property_types.get(k), null_values, strings_can_be_null
            )
            else v
            for k, v in row.items()
        }
        return nullable

    @staticmethod
    def _value_is_none(
        value: Any,
        deduped_property_type: Optional[str],
        null_values: Set[str],
        strings_can_be_null: bool,
    ) -> bool:
        return value in null_values and (strings_can_be_null or deduped_property_type != "string")

    @staticmethod
    def _pre_propcess_property_types(property_types: Dict[str, Any]) -> Mapping[str, str]:
        """
        Transform the property types to be non-nullable and remove duplicate types if any.

        Sample input:
        {
            "col1": ["string", "null"],
            "col2": ["string", "string", "null"],
            "col3": "integer"
        }

        Sample output:
        {
            "col1": "string",
            "col2": "string",
            "col3": "integer",
        }
        """
        output = {}
        for prop, prop_type in property_types.items():
            if isinstance(prop_type, list):
                prop_type_distinct = set(prop_type)
                prop_type_distinct.remove("null")
                if len(prop_type_distinct) != 1:
                    raise ValueError(f"Could not get non nullable type from {prop_type}")
                output[prop] = next(iter(prop_type_distinct))
            else:
                output[prop] = prop_type
        return output

    @staticmethod
    def _cast_types(
        row: Dict[str, str],
        deduped_property_types: Mapping[str, str],
        config_format: CsvFormat,
        logger: logging.Logger,
    ) -> Dict[str, Any]:
        """
        Casts the values in the input 'row' dictionary according to the types defined in the JSON schema.

        Array and object types are only handled if they can be deserialized as JSON.

        If any errors are encountered, the value will be emitted as a string.
        """
        warnings = []
        result = {}

        for key, value in row.items():
            prop_type = deduped_property_types.get(key)
            cast_value: Any = value

            if prop_type in TYPE_PYTHON_MAPPING and prop_type is not None:
                _, python_type = TYPE_PYTHON_MAPPING[prop_type]

                if python_type is None:
                    if value == "":
                        cast_value = None
                    else:
                        warnings.append(_format_warning(key, value, prop_type))

                elif python_type is bool:
                    try:
                        cast_value = _value_to_bool(
                            value, config_format.true_values, config_format.false_values
                        )
                    except ValueError:
                        warnings.append(_format_warning(key, value, prop_type))

                elif python_type is dict:
                    try:
                        # we don't re-use _value_to_object here because we type the column as object as long as there is only one object
                        cast_value = orjson.loads(value)
                    except orjson.JSONDecodeError:
                        warnings.append(_format_warning(key, value, prop_type))

                elif python_type is list:
                    try:
                        cast_value = _value_to_list(value)
                    except (ValueError, json.JSONDecodeError):
                        warnings.append(_format_warning(key, value, prop_type))

                elif python_type:
                    try:
                        cast_value = _value_to_python_type(value, python_type)
                    except ValueError:
                        warnings.append(_format_warning(key, value, prop_type))

            result[key] = cast_value

        if warnings:
            logger.warning(
                f"{FileBasedSourceError.ERROR_CASTING_VALUE.value}: {','.join([w for w in warnings])}",
            )
        return result
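The sample input/output in the _pre_propcess_property_types docstring can be exercised directly (a small sketch calling the staticmethod documented above for illustration):

from airbyte_cdk.sources.file_based.file_types import CsvParser

property_types = {
    "col1": ["string", "null"],
    "col2": ["string", "string", "null"],
    "col3": "integer",
}
# Nullable/duplicated type lists collapse to a single non-null type per column.
print(CsvParser._pre_propcess_property_types(property_types))
# {'col1': 'string', 'col2': 'string', 'col3': 'integer'}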
An abstract class containing methods that must be implemented for each supported file type.
    def __init__(self, csv_reader: Optional[_CsvReader] = None, csv_field_max_bytes: int = 2**31):
        # Increase the maximum length of data that can be parsed in a single CSV field. The default is 128k, which is typically sufficient
        # but given the use of Airbyte in loading a large variety of data it is best to allow for a larger maximum field size to avoid
        # skipping data on load. https://stackoverflow.com/questions/15063936/csv-error-field-larger-than-field-limit-131072
        csv.field_size_limit(csv_field_max_bytes)
        self._csv_reader = csv_reader if csv_reader else _CsvReader()
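For example (a small sketch based on the constructor signature above), the field-size ceiling applied via csv.field_size_limit can be overridden at instantiation time:

from airbyte_cdk.sources.file_based.file_types import CsvParser

# Pass a custom ceiling instead of the default 2**31 bytes per CSV field.
parser = CsvParser(csv_field_max_bytes=10_000_000)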
    def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
        """
        CsvParser does not require config checks, implicit pydantic validation is enough.
        """
        return True, None
CsvParser does not require config checks, implicit pydantic validation is enough.
    async def infer_schema(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
    ) -> SchemaType:
        input_schema = config.get_input_schema()
        if input_schema:
            return input_schema

        # todo: the existing InMemoryFilesSource.open_file() test source doesn't currently require an encoding, but actual
        # sources will likely require one. Rather than modify the interface now we can wait until the real use case
        config_format = _extract_format(config)
        type_inferrer_by_field: Dict[str, _TypeInferrer] = defaultdict(
            lambda: _JsonTypeInferrer(
                config_format.true_values, config_format.false_values, config_format.null_values
            )
            if config_format.inference_type != InferenceType.NONE
            else _DisabledTypeInferrer()
        )
        data_generator = self._csv_reader.read_data(
            config, file, stream_reader, logger, self.file_read_mode
        )
        read_bytes = 0
        for row in data_generator:
            for header, value in row.items():
                type_inferrer_by_field[header].add_value(value)
                # This is not accurate as a representation of how many bytes were read because csv does some processing on the actual value
                # before returning. Given we would like to be more accurate, we could wrap the IO file using a decorator
                read_bytes += len(value)
            read_bytes += len(row) - 1  # for separators
            if read_bytes >= self._MAX_BYTES_PER_FILE_FOR_SCHEMA_INFERENCE:
                break

        if not type_inferrer_by_field:
            raise AirbyteTracedException(
                message=f"Could not infer schema as there are no rows in {file.uri}. If having an empty CSV file is expected, ignore this. "
                f"Else, please contact Airbyte.",
                failure_type=FailureType.config_error,
            )
        schema = {
            header.strip(): {"type": type_inferred.infer()}
            for header, type_inferred in type_inferrer_by_field.items()
        }
        data_generator.close()
        return schema
Infer the JSON Schema for this file.
    def parse_records(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
        discovered_schema: Optional[Mapping[str, SchemaType]],
    ) -> Iterable[Dict[str, Any]]:
        line_no = 0
        try:
            config_format = _extract_format(config)
            if discovered_schema:
                property_types = {
                    col: prop["type"] for col, prop in discovered_schema["properties"].items()
                }
                deduped_property_types = CsvParser._pre_propcess_property_types(property_types)
            else:
                deduped_property_types = {}
            cast_fn = CsvParser._get_cast_function(
                deduped_property_types, config_format, logger, config.schemaless
            )
            data_generator = self._csv_reader.read_data(
                config, file, stream_reader, logger, self.file_read_mode
            )
            for row in data_generator:
                line_no += 1
                yield CsvParser._to_nullable(
                    cast_fn(row),
                    deduped_property_types,
                    config_format.null_values,
                    config_format.strings_can_be_null,
                )
        except RecordParseError as parse_err:
            raise RecordParseError(
                FileBasedSourceError.ERROR_PARSING_RECORD, filename=file.uri, lineno=line_no
            ) from parse_err
        finally:
            data_generator.close()
Parse and emit each record.
The mode in which the file should be opened for reading.
class ExcelParser(FileTypeParser):
    ENCODING = None

    def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
        """
        ExcelParser does not require config checks, implicit pydantic validation is enough.
        """
        return True, None

    async def infer_schema(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
    ) -> SchemaType:
        """
        Infers the schema of the Excel file by examining its contents.

        Args:
            config (FileBasedStreamConfig): Configuration for the file-based stream.
            file (RemoteFile): The remote file to be read.
            stream_reader (AbstractFileBasedStreamReader): Reader to read the file.
            logger (logging.Logger): Logger for logging information and errors.

        Returns:
            SchemaType: Inferred schema of the Excel file.
        """

        # Validate the format of the config
        self.validate_format(config.format, logger)

        fields: Dict[str, str] = {}

        with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
            df = self.open_and_parse_file(fp)
            for column, df_type in df.dtypes.items():
                # Choose the broadest data type if the column's data type differs in dataframes
                prev_frame_column_type = fields.get(column)  # type: ignore [call-overload]
                fields[column] = self.dtype_to_json_type(  # type: ignore [index]
                    prev_frame_column_type,
                    df_type,
                )

        schema = {
            field: (
                {"type": "string", "format": "date-time"}
                if fields[field] == "date-time"
                else {"type": fields[field]}
            )
            for field in fields
        }
        return schema

    def parse_records(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
        discovered_schema: Optional[Mapping[str, SchemaType]] = None,
    ) -> Iterable[Dict[str, Any]]:
        """
        Parses records from an Excel file based on the provided configuration.

        Args:
            config (FileBasedStreamConfig): Configuration for the file-based stream.
            file (RemoteFile): The remote file to be read.
            stream_reader (AbstractFileBasedStreamReader): Reader to read the file.
            logger (logging.Logger): Logger for logging information and errors.
            discovered_schema (Optional[Mapping[str, SchemaType]]): Discovered schema for validation.

        Yields:
            Iterable[Dict[str, Any]]: Parsed records from the Excel file.
        """

        # Validate the format of the config
        self.validate_format(config.format, logger)

        try:
            # Open and parse the file using the stream reader
            with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
                df = self.open_and_parse_file(fp)
                # Yield records as dictionaries
                # DataFrame.to_dict() method returns datetime values in pandas.Timestamp values, which are not serializable by orjson
                # DataFrame.to_json() returns string with datetime values serialized to iso8601 with microseconds to align with pydantic behavior
                # see PR description: https://github.com/airbytehq/airbyte/pull/44444/
                yield from orjson.loads(
                    df.to_json(orient="records", date_format="iso", date_unit="us")
                )

        except Exception as exc:
            # Raise a RecordParseError if any exception occurs during parsing
            raise RecordParseError(
                FileBasedSourceError.ERROR_PARSING_RECORD, filename=file.uri
            ) from exc

    @property
    def file_read_mode(self) -> FileReadMode:
        """
        Returns the file read mode for the Excel file.

        Returns:
            FileReadMode: The file read mode (binary).
        """
        return FileReadMode.READ_BINARY

    @staticmethod
    def dtype_to_json_type(
        current_type: Optional[str],
        dtype: dtype_,  # type: ignore [type-arg]
    ) -> str:
        """
        Convert Pandas DataFrame types to Airbyte Types.

        Args:
            current_type (Optional[str]): One of the previous types based on earlier dataframes.
            dtype: Pandas DataFrame type.

        Returns:
            str: Corresponding Airbyte Type.
        """
        number_types = ("int64", "float64")
        if current_type == "string":
            # Previous column values were of the string type, no need to look further.
            return current_type
        if dtype is object:
            return "string"
        if dtype in number_types and (not current_type or current_type == "number"):
            return "number"
        if dtype == "bool" and (not current_type or current_type == "boolean"):
            return "boolean"
        if issubdtype(dtype, datetime64):
            return "date-time"
        return "string"

    @staticmethod
    def validate_format(excel_format: BaseModel, logger: logging.Logger) -> None:
        """
        Validates if the given format is of type ExcelFormat.

        Args:
            excel_format (Any): The format to be validated.

        Raises:
            ConfigValidationError: If the format is not ExcelFormat.
        """
        if not isinstance(excel_format, ExcelFormat):
            logger.info(f"Expected ExcelFormat, got {excel_format}")
            raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR)

    @staticmethod
    def open_and_parse_file(fp: Union[IOBase, str, Path]) -> pd.DataFrame:
        """
        Opens and parses the Excel file.

        Args:
            fp: File pointer to the Excel file.

        Returns:
            pd.DataFrame: Parsed data from the Excel file.
        """
        return pd.ExcelFile(fp, engine="calamine").parse()  # type: ignore [arg-type, call-overload, no-any-return]
An abstract class containing methods that must be implemented for each supported file type.
    def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
        """
        ExcelParser does not require config checks, implicit pydantic validation is enough.
        """
        return True, None
ExcelParser does not require config checks, implicit pydantic validation is enough.
    async def infer_schema(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
    ) -> SchemaType:
        """
        Infers the schema of the Excel file by examining its contents.

        Args:
            config (FileBasedStreamConfig): Configuration for the file-based stream.
            file (RemoteFile): The remote file to be read.
            stream_reader (AbstractFileBasedStreamReader): Reader to read the file.
            logger (logging.Logger): Logger for logging information and errors.

        Returns:
            SchemaType: Inferred schema of the Excel file.
        """

        # Validate the format of the config
        self.validate_format(config.format, logger)

        fields: Dict[str, str] = {}

        with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
            df = self.open_and_parse_file(fp)
            for column, df_type in df.dtypes.items():
                # Choose the broadest data type if the column's data type differs in dataframes
                prev_frame_column_type = fields.get(column)  # type: ignore [call-overload]
                fields[column] = self.dtype_to_json_type(  # type: ignore [index]
                    prev_frame_column_type,
                    df_type,
                )

        schema = {
            field: (
                {"type": "string", "format": "date-time"}
                if fields[field] == "date-time"
                else {"type": fields[field]}
            )
            for field in fields
        }
        return schema
Infers the schema of the Excel file by examining its contents.
Arguments:
- config (FileBasedStreamConfig): Configuration for the file-based stream.
- file (RemoteFile): The remote file to be read.
- stream_reader (AbstractFileBasedStreamReader): Reader to read the file.
- logger (logging.Logger): Logger for logging information and errors.
Returns:
SchemaType: Inferred schema of the Excel file.
    def parse_records(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
        discovered_schema: Optional[Mapping[str, SchemaType]] = None,
    ) -> Iterable[Dict[str, Any]]:
        """
        Parses records from an Excel file based on the provided configuration.

        Args:
            config (FileBasedStreamConfig): Configuration for the file-based stream.
            file (RemoteFile): The remote file to be read.
            stream_reader (AbstractFileBasedStreamReader): Reader to read the file.
            logger (logging.Logger): Logger for logging information and errors.
            discovered_schema (Optional[Mapping[str, SchemaType]]): Discovered schema for validation.

        Yields:
            Iterable[Dict[str, Any]]: Parsed records from the Excel file.
        """

        # Validate the format of the config
        self.validate_format(config.format, logger)

        try:
            # Open and parse the file using the stream reader
            with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
                df = self.open_and_parse_file(fp)
                # Yield records as dictionaries
                # DataFrame.to_dict() method returns datetime values in pandas.Timestamp values, which are not serializable by orjson
                # DataFrame.to_json() returns string with datetime values serialized to iso8601 with microseconds to align with pydantic behavior
                # see PR description: https://github.com/airbytehq/airbyte/pull/44444/
                yield from orjson.loads(
                    df.to_json(orient="records", date_format="iso", date_unit="us")
                )

        except Exception as exc:
            # Raise a RecordParseError if any exception occurs during parsing
            raise RecordParseError(
                FileBasedSourceError.ERROR_PARSING_RECORD, filename=file.uri
            ) from exc
Parses records from an Excel file based on the provided configuration.
Arguments:
- config (FileBasedStreamConfig): Configuration for the file-based stream.
- file (RemoteFile): The remote file to be read.
- stream_reader (AbstractFileBasedStreamReader): Reader to read the file.
- logger (logging.Logger): Logger for logging information and errors.
- discovered_schema (Optional[Mapping[str, SchemaType]]): Discovered schema for validation.
Yields:
Iterable[Dict[str, Any]]: Parsed records from the Excel file.
    @property
    def file_read_mode(self) -> FileReadMode:
        """
        Returns the file read mode for the Excel file.

        Returns:
            FileReadMode: The file read mode (binary).
        """
        return FileReadMode.READ_BINARY
Returns the file read mode for the Excel file.
Returns:
FileReadMode: The file read mode (binary).
    @staticmethod
    def dtype_to_json_type(
        current_type: Optional[str],
        dtype: dtype_,  # type: ignore [type-arg]
    ) -> str:
        """
        Convert Pandas DataFrame types to Airbyte Types.

        Args:
            current_type (Optional[str]): One of the previous types based on earlier dataframes.
            dtype: Pandas DataFrame type.

        Returns:
            str: Corresponding Airbyte Type.
        """
        number_types = ("int64", "float64")
        if current_type == "string":
            # Previous column values were of the string type, no need to look further.
            return current_type
        if dtype is object:
            return "string"
        if dtype in number_types and (not current_type or current_type == "number"):
            return "number"
        if dtype == "bool" and (not current_type or current_type == "boolean"):
            return "boolean"
        if issubdtype(dtype, datetime64):
            return "date-time"
        return "string"
Convert Pandas DataFrame types to Airbyte Types.
Arguments:
- current_type (Optional[str]): One of the previous types based on earlier dataframes.
- dtype: Pandas DataFrame type.
Returns:
str: Corresponding Airbyte Type.
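A quick sketch of the widening behavior (illustrative only; numpy dtypes are passed directly since this is a staticmethod):

import numpy as np

from airbyte_cdk.sources.file_based.file_types import ExcelParser

# int64/float64 map to "number"; a previously seen "string" type always wins;
# datetime64 columns map to "date-time".
print(ExcelParser.dtype_to_json_type(None, np.dtype("int64")))              # number
print(ExcelParser.dtype_to_json_type("string", np.dtype("float64")))        # string
print(ExcelParser.dtype_to_json_type(None, np.dtype("datetime64[ns]")))     # date-time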
    @staticmethod
    def validate_format(excel_format: BaseModel, logger: logging.Logger) -> None:
        """
        Validates if the given format is of type ExcelFormat.

        Args:
            excel_format (Any): The format to be validated.

        Raises:
            ConfigValidationError: If the format is not ExcelFormat.
        """
        if not isinstance(excel_format, ExcelFormat):
            logger.info(f"Expected ExcelFormat, got {excel_format}")
            raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR)
Validates if the given format is of type ExcelFormat.
Arguments:
- excel_format (Any): The format to be validated.
Raises:
- ConfigValidationError: If the format is not ExcelFormat.
    @staticmethod
    def open_and_parse_file(fp: Union[IOBase, str, Path]) -> pd.DataFrame:
        """
        Opens and parses the Excel file.

        Args:
            fp: File pointer to the Excel file.

        Returns:
            pd.DataFrame: Parsed data from the Excel file.
        """
        return pd.ExcelFile(fp, engine="calamine").parse()  # type: ignore [arg-type, call-overload, no-any-return]
Opens and parses the Excel file.
Arguments:
- fp: File pointer to the Excel file.
Returns:
pd.DataFrame: Parsed data from the Excel file.
class JsonlParser(FileTypeParser):
    MAX_BYTES_PER_FILE_FOR_SCHEMA_INFERENCE = 1_000_000
    ENCODING = "utf8"

    def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
        """
        JsonlParser does not require config checks, implicit pydantic validation is enough.
        """
        return True, None

    async def infer_schema(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
    ) -> SchemaType:
        """
        Infers the schema for the file by inferring the schema for each line, and merging
        it with the previously-inferred schema.
        """
        inferred_schema: Mapping[str, Any] = {}

        for entry in self._parse_jsonl_entries(file, stream_reader, logger, read_limit=True):
            line_schema = self._infer_schema_for_record(entry)
            inferred_schema = merge_schemas(inferred_schema, line_schema)

        return inferred_schema

    def parse_records(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
        discovered_schema: Optional[Mapping[str, SchemaType]],
    ) -> Iterable[Dict[str, Any]]:
        """
        This code supports parsing json objects over multiple lines even though this does not align with the JSONL format. This is for
        backward compatibility reasons i.e. the previous source-s3 parser did support this. The drawback is:
        * performance as the way we support json over multiple lines is very brute forced
        * given that we don't have `newlines_in_values` config to scope the possible inputs, we might parse the whole file before knowing if
          the input is improperly formatted or if the json is over multiple lines

        The goal is to run the V4 of source-s3 in production, track the warning log emitted when there are multiline json objects and
        deprecate this feature if it's not a valid use case.
        """
        yield from self._parse_jsonl_entries(file, stream_reader, logger)

    @classmethod
    def _infer_schema_for_record(cls, record: Dict[str, Any]) -> Dict[str, Any]:
        record_schema = {}
        for key, value in record.items():
            if value is None:
                record_schema[key] = {"type": "null"}
            else:
                record_schema[key] = {"type": PYTHON_TYPE_MAPPING[type(value)]}

        return record_schema

    @property
    def file_read_mode(self) -> FileReadMode:
        return FileReadMode.READ

    def _parse_jsonl_entries(
        self,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
        read_limit: bool = False,
    ) -> Iterable[Dict[str, Any]]:
        with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
            read_bytes = 0

            had_json_parsing_error = False
            has_warned_for_multiline_json_object = False
            yielded_at_least_once = False

            accumulator = None
            for line in fp:
                if not accumulator:
                    accumulator = self._instantiate_accumulator(line)
                read_bytes += len(line)
                accumulator += line  # type: ignore [operator]  # In reality, it's either bytes or string and we add the same type
                try:
                    record = orjson.loads(accumulator)
                    if had_json_parsing_error and not has_warned_for_multiline_json_object:
                        logger.warning(
                            f"File at {file.uri} is using multiline JSON. Performance could be greatly reduced"
                        )
                        has_warned_for_multiline_json_object = True

                    yield record
                    yielded_at_least_once = True
                    accumulator = self._instantiate_accumulator(line)
                except orjson.JSONDecodeError:
                    had_json_parsing_error = True

                if (
                    read_limit
                    and yielded_at_least_once
                    and read_bytes >= self.MAX_BYTES_PER_FILE_FOR_SCHEMA_INFERENCE
                ):
                    logger.warning(
                        f"Exceeded the maximum number of bytes per file for schema inference ({self.MAX_BYTES_PER_FILE_FOR_SCHEMA_INFERENCE}). "
                        f"Inferring schema from an incomplete set of records."
                    )
                    break

            if had_json_parsing_error and not yielded_at_least_once:
                raise RecordParseError(
                    FileBasedSourceError.ERROR_PARSING_RECORD, filename=file.uri, lineno=line
                )

    @staticmethod
    def _instantiate_accumulator(line: Union[bytes, str]) -> Union[bytes, str]:
        if isinstance(line, bytes):
            return bytes("", json.detect_encoding(line))
        elif isinstance(line, str):
            return ""
An abstract class containing methods that must be implemented for each supported file type.
    def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
        """
        JsonlParser does not require config checks, implicit pydantic validation is enough.
        """
        return True, None
JsonlParser does not require config checks, implicit pydantic validation is enough.
    async def infer_schema(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
    ) -> SchemaType:
        """
        Infers the schema for the file by inferring the schema for each line, and merging
        it with the previously-inferred schema.
        """
        inferred_schema: Mapping[str, Any] = {}

        for entry in self._parse_jsonl_entries(file, stream_reader, logger, read_limit=True):
            line_schema = self._infer_schema_for_record(entry)
            inferred_schema = merge_schemas(inferred_schema, line_schema)

        return inferred_schema
Infers the schema for the file by inferring the schema for each line, and merging it with the previously-inferred schema.
    def parse_records(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
        discovered_schema: Optional[Mapping[str, SchemaType]],
    ) -> Iterable[Dict[str, Any]]:
        """
        This code supports parsing json objects over multiple lines even though this does not align with the JSONL format. This is for
        backward compatibility reasons i.e. the previous source-s3 parser did support this. The drawback is:
        * performance as the way we support json over multiple lines is very brute forced
        * given that we don't have `newlines_in_values` config to scope the possible inputs, we might parse the whole file before knowing if
          the input is improperly formatted or if the json is over multiple lines

        The goal is to run the V4 of source-s3 in production, track the warning log emitted when there are multiline json objects and
        deprecate this feature if it's not a valid use case.
        """
        yield from self._parse_jsonl_entries(file, stream_reader, logger)
This code supports parsing json objects over multiple lines even though this does not align with the JSONL format. This is for backward compatibility reasons i.e. the previous source-s3 parser did support this. The drawback is:
- performance as the way we support json over multiple lines is very brute forced
- given that we don't have `newlines_in_values` config to scope the possible inputs, we might parse the whole file before knowing if the input is improperly formatted or if the json is over multiple lines
The goal is to run the V4 of source-s3 in production, track the warning log emitted when there are multiline json objects and deprecate this feature if it's not a valid use case.
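A simplified sketch of the accumulation strategy described above (illustrative only; the real implementation also tracks byte counts, warnings, and per-file encodings):

import orjson

lines = [b'{"id": 1}', b'{"id": 2,', b' "name": "multi-line"}']

accumulator = b""
for line in lines:
    accumulator += line
    try:
        print(orjson.loads(accumulator))  # emit the record once the buffer parses as JSON
        accumulator = b""
    except orjson.JSONDecodeError:
        continue  # keep accumulating until the buffered lines form a complete object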
The mode in which the file should be opened for reading.
class ParquetParser(FileTypeParser):
    ENCODING = None

    def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
        """
        ParquetParser does not require config checks, implicit pydantic validation is enough.
        """
        return True, None

    async def infer_schema(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
    ) -> SchemaType:
        parquet_format = config.format
        if not isinstance(parquet_format, ParquetFormat):
            raise ValueError(f"Expected ParquetFormat, got {parquet_format}")

        with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
            parquet_file = pq.ParquetFile(fp)
            parquet_schema = parquet_file.schema_arrow

        # Inferred non-partition schema
        schema = {
            field.name: ParquetParser.parquet_type_to_schema_type(field.type, parquet_format)
            for field in parquet_schema
        }
        # Inferred partition schema
        partition_columns = {
            partition.split("=")[0]: {"type": "string"}
            for partition in self._extract_partitions(file.uri)
        }

        schema.update(partition_columns)
        return schema

    def parse_records(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
        discovered_schema: Optional[Mapping[str, SchemaType]],
    ) -> Iterable[Dict[str, Any]]:
        parquet_format = config.format
        if not isinstance(parquet_format, ParquetFormat):
            logger.info(f"Expected ParquetFormat, got {parquet_format}")
            raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR)

        line_no = 0
        try:
            with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
                reader = pq.ParquetFile(fp)
                partition_columns = {
                    x.split("=")[0]: x.split("=")[1] for x in self._extract_partitions(file.uri)
                }
                for row_group in range(reader.num_row_groups):
                    batch = reader.read_row_group(row_group)
                    for row in range(batch.num_rows):
                        line_no += 1
                        yield {
                            **{
                                column: ParquetParser._to_output_value(
                                    batch.column(column)[row], parquet_format
                                )
                                for column in batch.column_names
                            },
                            **partition_columns,
                        }
        except Exception as exc:
            raise RecordParseError(
                FileBasedSourceError.ERROR_PARSING_RECORD,
                filename=file.uri,
                lineno=f"{row_group=}, {line_no=}",
            ) from exc

    @staticmethod
    def _extract_partitions(filepath: str) -> List[str]:
        return [unquote(partition) for partition in filepath.split(os.sep) if "=" in partition]

    @property
    def file_read_mode(self) -> FileReadMode:
        return FileReadMode.READ_BINARY

    @staticmethod
    def _to_output_value(
        parquet_value: Union[Scalar, DictionaryArray], parquet_format: ParquetFormat
    ) -> Any:
        """
        Convert an entry in a pyarrow table to a value that can be output by the source.
        """
        if isinstance(parquet_value, DictionaryArray):
            return ParquetParser._dictionary_array_to_python_value(parquet_value)
        else:
            return ParquetParser._scalar_to_python_value(parquet_value, parquet_format)

    @staticmethod
    def _scalar_to_python_value(parquet_value: Scalar, parquet_format: ParquetFormat) -> Any:
        """
        Convert a pyarrow scalar to a value that can be output by the source.
        """
        if parquet_value.as_py() is None:
            return None

        # Convert date and datetime objects to isoformat strings
        if (
            pa.types.is_time(parquet_value.type)
            or pa.types.is_timestamp(parquet_value.type)
            or pa.types.is_date(parquet_value.type)
        ):
            return parquet_value.as_py().isoformat()

        # Convert month_day_nano_interval to array
        if parquet_value.type == pa.month_day_nano_interval():
            return json.loads(json.dumps(parquet_value.as_py()))

        # Decode binary strings to utf-8
        if ParquetParser._is_binary(parquet_value.type):
            return parquet_value.as_py().decode("utf-8")

        if pa.types.is_decimal(parquet_value.type):
            if parquet_format.decimal_as_float:
                return float(parquet_value.as_py())
            else:
                return str(parquet_value.as_py())

        if pa.types.is_map(parquet_value.type):
            return {k: v for k, v in parquet_value.as_py()}

        if pa.types.is_null(parquet_value.type):
            return None

        # Convert duration to seconds, then convert to the appropriate unit
        if pa.types.is_duration(parquet_value.type):
            duration = parquet_value.as_py()
            duration_seconds = duration.total_seconds()
            if parquet_value.type.unit == "s":
                return duration_seconds
            elif parquet_value.type.unit == "ms":
                return duration_seconds * 1000
            elif parquet_value.type.unit == "us":
                return duration_seconds * 1_000_000
            elif parquet_value.type.unit == "ns":
                return duration_seconds * 1_000_000_000 + duration.nanoseconds
            else:
                raise ValueError(f"Unknown duration unit: {parquet_value.type.unit}")
        else:
            return parquet_value.as_py()

    @staticmethod
    def _dictionary_array_to_python_value(parquet_value: DictionaryArray) -> Dict[str, Any]:
        """
        Convert a pyarrow dictionary array to a value that can be output by the source.

        Dictionaries are stored as two columns: indices and values
        The indices column is an array of integers that maps to the values column
        """

        return {
            "indices": parquet_value.indices.tolist(),
            "values": parquet_value.dictionary.tolist(),
        }

    @staticmethod
    def parquet_type_to_schema_type(
        parquet_type: pa.DataType, parquet_format: ParquetFormat
    ) -> Mapping[str, str]:
        """
        Convert a pyarrow data type to an Airbyte schema type.
        Parquet data types are defined at https://arrow.apache.org/docs/python/api/datatypes.html
        """

        if pa.types.is_timestamp(parquet_type):
            return {"type": "string", "format": "date-time"}
        elif pa.types.is_date(parquet_type):
            return {"type": "string", "format": "date"}
        elif ParquetParser._is_string(parquet_type, parquet_format):
            return {"type": "string"}
        elif pa.types.is_boolean(parquet_type):
            return {"type": "boolean"}
        elif ParquetParser._is_integer(parquet_type):
            return {"type": "integer"}
        elif ParquetParser._is_float(parquet_type, parquet_format):
            return {"type": "number"}
        elif ParquetParser._is_object(parquet_type):
            return {"type": "object"}
        elif ParquetParser._is_list(parquet_type):
            return {"type": "array"}
        elif pa.types.is_null(parquet_type):
            return {"type": "null"}
        else:
            raise ValueError(f"Unsupported parquet type: {parquet_type}")

    @staticmethod
    def _is_binary(parquet_type: pa.DataType) -> bool:
        return bool(
            pa.types.is_binary(parquet_type)
            or pa.types.is_large_binary(parquet_type)
            or pa.types.is_fixed_size_binary(parquet_type)
        )

    @staticmethod
    def _is_integer(parquet_type: pa.DataType) -> bool:
        return bool(pa.types.is_integer(parquet_type) or pa.types.is_duration(parquet_type))

    @staticmethod
    def _is_float(parquet_type: pa.DataType, parquet_format: ParquetFormat) -> bool:
        if pa.types.is_decimal(parquet_type):
            return parquet_format.decimal_as_float
        else:
            return bool(pa.types.is_floating(parquet_type))

    @staticmethod
    def _is_string(parquet_type: pa.DataType, parquet_format: ParquetFormat) -> bool:
        if pa.types.is_decimal(parquet_type):
            return not parquet_format.decimal_as_float
        else:
            return bool(
                pa.types.is_time(parquet_type)
                or pa.types.is_string(parquet_type)
                or pa.types.is_large_string(parquet_type)
                or ParquetParser._is_binary(
                    parquet_type
                )  # Best we can do is return as a string since we do not support binary
            )

    @staticmethod
    def _is_object(parquet_type: pa.DataType) -> bool:
        return bool(
            pa.types.is_dictionary(parquet_type)
            or pa.types.is_struct(parquet_type)
            or pa.types.is_map(parquet_type)
        )

    @staticmethod
    def _is_list(parquet_type: pa.DataType) -> bool:
        return bool(
            pa.types.is_list(parquet_type)
            or pa.types.is_large_list(parquet_type)
            or parquet_type == pa.month_day_nano_interval()
        )
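The Hive-style partition handling above can be sketched in isolation (a minimal example calling the private helper; _extract_partitions splits on the local os.sep, so the path below assumes a POSIX "/" separator):

from airbyte_cdk.sources.file_based.file_types import ParquetParser

uri = "bucket/year=2024/month=05/part-000.parquet"
partitions = ParquetParser._extract_partitions(uri)
print(partitions)  # ['year=2024', 'month=05']
# parse_records merges these into every record as string-typed columns:
print({p.split("=")[0]: p.split("=")[1] for p in partitions})  # {'year': '2024', 'month': '05'}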
An abstract class containing methods that must be implemented for each supported file type.
    def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
        """
        ParquetParser does not require config checks, implicit pydantic validation is enough.
        """
        return True, None
ParquetParser does not require config checks, implicit pydantic validation is enough.
    async def infer_schema(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
    ) -> SchemaType:
        parquet_format = config.format
        if not isinstance(parquet_format, ParquetFormat):
            raise ValueError(f"Expected ParquetFormat, got {parquet_format}")

        with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
            parquet_file = pq.ParquetFile(fp)
            parquet_schema = parquet_file.schema_arrow

        # Inferred non-partition schema
        schema = {
            field.name: ParquetParser.parquet_type_to_schema_type(field.type, parquet_format)
            for field in parquet_schema
        }
        # Inferred partition schema
        partition_columns = {
            partition.split("=")[0]: {"type": "string"}
            for partition in self._extract_partitions(file.uri)
        }

        schema.update(partition_columns)
        return schema
Infer the JSON Schema for this file.
    def parse_records(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
        discovered_schema: Optional[Mapping[str, SchemaType]],
    ) -> Iterable[Dict[str, Any]]:
        parquet_format = config.format
        if not isinstance(parquet_format, ParquetFormat):
            logger.info(f"Expected ParquetFormat, got {parquet_format}")
            raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR)

        line_no = 0
        try:
            with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
                reader = pq.ParquetFile(fp)
                partition_columns = {
                    x.split("=")[0]: x.split("=")[1] for x in self._extract_partitions(file.uri)
                }
                for row_group in range(reader.num_row_groups):
                    batch = reader.read_row_group(row_group)
                    for row in range(batch.num_rows):
                        line_no += 1
                        yield {
                            **{
                                column: ParquetParser._to_output_value(
                                    batch.column(column)[row], parquet_format
                                )
                                for column in batch.column_names
                            },
                            **partition_columns,
                        }
        except Exception as exc:
            raise RecordParseError(
                FileBasedSourceError.ERROR_PARSING_RECORD,
                filename=file.uri,
                lineno=f"{row_group=}, {line_no=}",
            ) from exc
Parse and emit each record.
The mode in which the file should be opened for reading.
    @staticmethod
    def parquet_type_to_schema_type(
        parquet_type: pa.DataType, parquet_format: ParquetFormat
    ) -> Mapping[str, str]:
        """
        Convert a pyarrow data type to an Airbyte schema type.
        Parquet data types are defined at https://arrow.apache.org/docs/python/api/datatypes.html
        """

        if pa.types.is_timestamp(parquet_type):
            return {"type": "string", "format": "date-time"}
        elif pa.types.is_date(parquet_type):
            return {"type": "string", "format": "date"}
        elif ParquetParser._is_string(parquet_type, parquet_format):
            return {"type": "string"}
        elif pa.types.is_boolean(parquet_type):
            return {"type": "boolean"}
        elif ParquetParser._is_integer(parquet_type):
            return {"type": "integer"}
        elif ParquetParser._is_float(parquet_type, parquet_format):
            return {"type": "number"}
        elif ParquetParser._is_object(parquet_type):
            return {"type": "object"}
        elif ParquetParser._is_list(parquet_type):
            return {"type": "array"}
        elif pa.types.is_null(parquet_type):
            return {"type": "null"}
        else:
            raise ValueError(f"Unsupported parquet type: {parquet_type}")
Convert a pyarrow data type to an Airbyte schema type. Parquet data types are defined at https://arrow.apache.org/docs/python/api/datatypes.html
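A small sketch of the mapping (assuming pyarrow is installed and ParquetFormat's defaults, e.g. decimal_as_float=False; the printed values are indicative):

import pyarrow as pa

from airbyte_cdk.sources.file_based.config.parquet_format import ParquetFormat
from airbyte_cdk.sources.file_based.file_types import ParquetParser

parquet_format = ParquetFormat()
print(ParquetParser.parquet_type_to_schema_type(pa.int64(), parquet_format))            # {'type': 'integer'}
print(ParquetParser.parquet_type_to_schema_type(pa.timestamp("ms"), parquet_format))    # {'type': 'string', 'format': 'date-time'}
print(ParquetParser.parquet_type_to_schema_type(pa.decimal128(10, 2), parquet_format))  # {'type': 'string'} when decimal_as_float is False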
class UnstructuredParser(FileTypeParser):
    @property
    def parser_max_n_files_for_schema_inference(self) -> Optional[int]:
        """
        Just check one file as the schema is static
        """
        return 1

    @property
    def parser_max_n_files_for_parsability(self) -> Optional[int]:
        """
        Do not check any files for parsability because it might be an expensive operation and doesn't give much confidence whether the sync will succeed.
        """
        return 0

    def get_parser_defined_primary_key(self, config: FileBasedStreamConfig) -> Optional[str]:
        """
        Return the document_key field as the primary key.

        This will pre-select the document key column as the primary key when setting up a connection, making it easier for the user to configure normalization in the destination.
        """
        return "document_key"

    async def infer_schema(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
    ) -> SchemaType:
        format = _extract_format(config)
        with stream_reader.open_file(file, self.file_read_mode, None, logger) as file_handle:
            filetype = self._get_filetype(file_handle, file)
            if filetype not in self._supported_file_types() and not format.skip_unprocessable_files:
                raise self._create_parse_error(
                    file,
                    self._get_file_type_error_message(filetype),
                )

            return {
                "content": {
                    "type": "string",
                    "description": "Content of the file as markdown. Might be null if the file could not be parsed",
                },
                "document_key": {
                    "type": "string",
                    "description": "Unique identifier of the document, e.g. the file path",
                },
                "_ab_source_file_parse_error": {
                    "type": "string",
                    "description": "Error message if the file could not be parsed even though the file is supported",
                },
            }

    def parse_records(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
        discovered_schema: Optional[Mapping[str, SchemaType]],
    ) -> Iterable[Dict[str, Any]]:
        format = _extract_format(config)
        with stream_reader.open_file(file, self.file_read_mode, None, logger) as file_handle:
            try:
                markdown = self._read_file(file_handle, file, format, logger)
                yield {
                    "content": markdown,
                    "document_key": file.uri,
                    "_ab_source_file_parse_error": None,
                }
            except RecordParseError as e:
                # RecordParseError is raised when the file can't be parsed because of a problem with the file content (either the file is not supported or the file is corrupted)
                # if the skip_unprocessable_files flag is set, we log a warning and pass the error as part of the document
                # otherwise, we raise the error to fail the sync
                if format.skip_unprocessable_files:
                    exception_str = str(e)
                    logger.warn(f"File {file.uri} caused an error during parsing: {exception_str}.")
                    yield {
                        "content": None,
                        "document_key": file.uri,
                        "_ab_source_file_parse_error": exception_str,
                    }
                    logger.warn(f"File {file.uri} cannot be parsed. Skipping it.")
                else:
                    raise e
            except Exception as e:
                exception_str = str(e)
                logger.error(f"File {file.uri} caused an error during parsing: {exception_str}.")
                raise e

    def _read_file(
        self,
        file_handle: IOBase,
        remote_file: RemoteFile,
        format: UnstructuredFormat,
        logger: logging.Logger,
    ) -> str:
        _import_unstructured()
        if (
            (not unstructured_partition_pdf)
            or (not unstructured_partition_docx)
            or (not unstructured_partition_pptx)
        ):
            # check whether unstructured library is actually available for better error message and to ensure proper typing (can't be None after this point)
            raise Exception("unstructured library is not available")

        filetype: FileType | None = self._get_filetype(file_handle, remote_file)

        if filetype is None or filetype not in self._supported_file_types():
            raise self._create_parse_error(
                remote_file,
                self._get_file_type_error_message(filetype),
            )
        if filetype in {FileType.MD, FileType.TXT}:
            file_content: bytes = file_handle.read()
            decoded_content: str = optional_decode(file_content)
            return decoded_content
        if format.processing.mode == "local":
            return self._read_file_locally(
                file_handle,
                filetype,
                format.strategy,
                remote_file,
            )
        elif format.processing.mode == "api":
            try:
                result: str = self._read_file_remotely_with_retries(
                    file_handle,
                    format.processing,
                    filetype,
                    format.strategy,
                    remote_file,
                )
            except Exception as e:
                # If a parser error happens during remotely processing the file, this means the file is corrupted. This case is handled by the parse_records method, so just rethrow.
                #
                # For other exceptions, re-throw as config error so the sync is stopped as problems with the external API need to be resolved by the user and are not considered part of the SLA.
                # Once this parser leaves experimental stage, we should consider making this a system error instead for issues that might be transient.
                if isinstance(e, RecordParseError):
                    raise e
                raise AirbyteTracedException.from_exception(
                    e, failure_type=FailureType.config_error
                )

            return result

    def _params_to_dict(
        self, params: Optional[List[APIParameterConfigModel]], strategy: str
    ) -> Dict[str, Union[str, List[str]]]:
        result_dict: Dict[str, Union[str, List[str]]] = {"strategy": strategy}
        if params is None:
            return result_dict
        for item in params:
            key = item.name
            value = item.value
            if key in result_dict:
                existing_value = result_dict[key]
                # If the key already exists, append the new value to its list
                if isinstance(existing_value, list):
                    existing_value.append(value)
                else:
                    result_dict[key] = [existing_value, value]
            else:
                # If the key doesn't exist, add it to the dictionary
                result_dict[key] = value

        return result_dict

    def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
        """
        Perform a connection check for the parser config:
        - Verify that encryption is enabled if the API is hosted on a cloud instance.
        - Verify that the API can extract text from a file.

        For local processing, we don't need to perform any additional checks, implicit pydantic validation is enough.
        """
        format_config = _extract_format(config)
        if isinstance(format_config.processing, LocalProcessingConfigModel):
            if format_config.strategy == "hi_res":
                return False, "Hi-res strategy is not supported for local processing"
            return True, None

        if is_cloud_environment() and not format_config.processing.api_url.startswith("https://"):
            return False, "Base URL must start with https://"

        try:
            self._read_file_remotely(
                BytesIO(b"# Airbyte source connection test"),
                format_config.processing,
                FileType.MD,
                "auto",
                RemoteFile(uri="test", last_modified=datetime.now()),
            )
        except Exception:
            return False, "".join(traceback.format_exc())

        return True, None

    @backoff.on_exception(
        backoff.expo, requests.exceptions.RequestException, max_tries=5, giveup=user_error
    )
    def _read_file_remotely_with_retries(
        self,
        file_handle: IOBase,
        format: APIProcessingConfigModel,
        filetype: FileType,
        strategy: str,
        remote_file: RemoteFile,
    ) -> str:
        """
        Read a file remotely, retrying up to 5 times if the error is not caused by user error. This is useful for transient network errors or the API server being overloaded temporarily.
        """
        return self._read_file_remotely(file_handle, format, filetype, strategy, remote_file)

    def _read_file_remotely(
        self,
        file_handle: IOBase,
        format: APIProcessingConfigModel,
        filetype: FileType,
        strategy: str,
        remote_file: RemoteFile,
    ) -> str:
        headers = {"accept": "application/json", "unstructured-api-key": format.api_key}

        data = self._params_to_dict(format.parameters, strategy)

        file_data = {"files": ("filename", file_handle, FILETYPE_TO_MIMETYPE[filetype])}

        response = requests.post(
            f"{format.api_url}/general/v0/general", headers=headers, data=data, files=file_data
        )

        if response.status_code == 422:
            # 422 means the file couldn't be processed, but the API is working. Treat this as a parsing error (passing an error record to the destination).
347 raise self._create_parse_error(remote_file, response.json()) 348 else: 349 # Other error statuses are raised as requests exceptions (retry everything except user errors) 350 response.raise_for_status() 351 352 json_response = response.json() 353 354 return self._render_markdown(json_response) 355 356 def _read_file_locally( 357 self, file_handle: IOBase, filetype: FileType, strategy: str, remote_file: RemoteFile 358 ) -> str: 359 _import_unstructured() 360 if ( 361 (not unstructured_partition_pdf) 362 or (not unstructured_partition_docx) 363 or (not unstructured_partition_pptx) 364 ): 365 # check whether unstructured library is actually available for better error message and to ensure proper typing (can't be None after this point) 366 raise Exception("unstructured library is not available") 367 368 file: Any = file_handle 369 370 # before the parsing logic is entered, the file is read completely to make sure it is in local memory 371 file_handle.seek(0) 372 file_handle.read() 373 file_handle.seek(0) 374 375 try: 376 if filetype == FileType.PDF: 377 # for PDF, read the file into a BytesIO object because some code paths in pdf parsing are doing an instance check on the file object and don't work with file-like objects 378 file_handle.seek(0) 379 with BytesIO(file_handle.read()) as file: 380 file_handle.seek(0) 381 elements = unstructured_partition_pdf(file=file, strategy=strategy) 382 elif filetype == FileType.DOCX: 383 elements = unstructured_partition_docx(file=file) 384 elif filetype == FileType.PPTX: 385 elements = unstructured_partition_pptx(file=file) 386 except Exception as e: 387 raise self._create_parse_error(remote_file, str(e)) 388 389 return self._render_markdown([element.to_dict() for element in elements]) 390 391 def _create_parse_error( 392 self, 393 remote_file: RemoteFile, 394 message: str, 395 ) -> RecordParseError: 396 return RecordParseError( 397 FileBasedSourceError.ERROR_PARSING_RECORD, filename=remote_file.uri, message=message 398 ) 399 400 def _get_filetype(self, file: IOBase, remote_file: RemoteFile) -> Optional[FileType]: 401 """ 402 Detect the file type based on the file name and the file content. 403 404 There are three strategies to determine the file type: 405 1. Use the mime type if available (only some sources support it) 406 2. Use the file name if available 407 3. Use the file content 408 """ 409 if remote_file.mime_type and remote_file.mime_type in STR_TO_FILETYPE: 410 return STR_TO_FILETYPE[remote_file.mime_type] 411 412 # set name to none, otherwise unstructured will try to get the modified date from the local file system 413 if hasattr(file, "name"): 414 file.name = None 415 416 # detect_filetype is either using the file name or file content 417 # if possible, try to leverage the file name to detect the file type 418 # if the file name is not available, use the file content 419 file_type: FileType | None = None 420 try: 421 file_type = detect_filetype( 422 filename=remote_file.uri, 423 ) 424 except Exception: 425 # Path doesn't exist locally. Try something else... 426 pass 427 428 if file_type and file_type != FileType.UNK: 429 return file_type 430 431 type_based_on_content = detect_filetype(file=file) 432 file.seek(0) # detect_filetype is reading to read the file content, so we need to reset 433 434 if type_based_on_content and type_based_on_content != FileType.UNK: 435 return type_based_on_content 436 437 extension = "." 
+ remote_file.uri.split(".")[-1].lower() 438 if extension in EXT_TO_FILETYPE: 439 return EXT_TO_FILETYPE[extension] 440 441 return None 442 443 def _supported_file_types(self) -> List[Any]: 444 return [FileType.MD, FileType.PDF, FileType.DOCX, FileType.PPTX, FileType.TXT] 445 446 def _get_file_type_error_message( 447 self, 448 file_type: FileType | None, 449 ) -> str: 450 supported_file_types = ", ".join([str(type) for type in self._supported_file_types()]) 451 return f"File type {file_type or 'None'!s} is not supported. Supported file types are {supported_file_types}" 452 453 def _render_markdown(self, elements: List[Any]) -> str: 454 return "\n\n".join((self._convert_to_markdown(el) for el in elements)) 455 456 def _convert_to_markdown(self, el: Dict[str, Any]) -> str: 457 if dpath.get(el, "type") == "Title": 458 category_depth = dpath.get(el, "metadata/category_depth", default=1) or 1 459 if not isinstance(category_depth, int): 460 category_depth = ( 461 int(category_depth) if isinstance(category_depth, (str, float)) else 1 462 ) 463 heading_str = "#" * category_depth 464 return f"{heading_str} {dpath.get(el, 'text')}" 465 elif dpath.get(el, "type") == "ListItem": 466 return f"- {dpath.get(el, 'text')}" 467 elif dpath.get(el, "type") == "Formula": 468 return f"```\n{dpath.get(el, 'text')}\n```" 469 else: 470 return str(dpath.get(el, "text", default="")) 471 472 @property 473 def file_read_mode(self) -> FileReadMode: 474 return FileReadMode.READ_BINARY
An abstract class containing methods that must be implemented for each supported file type.
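To make the abstract contract concrete, here is a minimal, hypothetical parser sketch. PlainTextParser, its one-record-per-line behavior, and the absolute import paths are illustrative assumptions for recent CDK versions, not part of the library; depending on your CDK version you may also want to override optional hooks such as parser_max_n_files_for_schema_inference or get_parser_defined_primary_key, shown above for UnstructuredParser.

import logging
from typing import Any, Dict, Iterable, Mapping, Optional, Tuple

from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.file_based_stream_reader import (
    AbstractFileBasedStreamReader,
    FileReadMode,
)
from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.file_based.schema_helpers import SchemaType


class PlainTextParser(FileTypeParser):
    """Hypothetical parser that emits one record per line of a text file."""

    def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
        # Nothing beyond implicit pydantic validation to verify for this sketch.
        return True, None

    async def infer_schema(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
    ) -> SchemaType:
        # Static schema: no file inspection needed.
        return {"line": {"type": "string"}}

    def parse_records(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
        discovered_schema: Optional[Mapping[str, SchemaType]],
    ) -> Iterable[Dict[str, Any]]:
        # Open the file in text mode and yield one record per line.
        with stream_reader.open_file(file, self.file_read_mode, "utf-8", logger) as fp:
            for line in fp:
                yield {"line": line.rstrip("\n")}

    @property
    def file_read_mode(self) -> FileReadMode:
        return FileReadMode.READ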
113 @property 114 def parser_max_n_files_for_schema_inference(self) -> Optional[int]: 115 """ 116 Just check one file as the schema is static 117 """ 118 return 1
Just check one file as the schema is static
120 @property 121 def parser_max_n_files_for_parsability(self) -> Optional[int]: 122 """ 123 Do not check any files for parsability because it might be an expensive operation and doesn't give much confidence whether the sync will succeed. 124 """ 125 return 0
Do not check any files for parsability because it might be an expensive operation and doesn't give much confidence about whether the sync will succeed.
127 def get_parser_defined_primary_key(self, config: FileBasedStreamConfig) -> Optional[str]: 128 """ 129 Return the document_key field as the primary key. 130 131 This will pre-select the document key column as the primary key when setting up a connection, making it easier for the user to configure normalization in the destination. 132 """ 133 return "document_key"
Return the document_key field as the primary key.
This will pre-select the document key column as the primary key when setting up a connection, making it easier for the user to configure normalization in the destination.
135 async def infer_schema( 136 self, 137 config: FileBasedStreamConfig, 138 file: RemoteFile, 139 stream_reader: AbstractFileBasedStreamReader, 140 logger: logging.Logger, 141 ) -> SchemaType: 142 format = _extract_format(config) 143 with stream_reader.open_file(file, self.file_read_mode, None, logger) as file_handle: 144 filetype = self._get_filetype(file_handle, file) 145 if filetype not in self._supported_file_types() and not format.skip_unprocessable_files: 146 raise self._create_parse_error( 147 file, 148 self._get_file_type_error_message(filetype), 149 ) 150 151 return { 152 "content": { 153 "type": "string", 154 "description": "Content of the file as markdown. Might be null if the file could not be parsed", 155 }, 156 "document_key": { 157 "type": "string", 158 "description": "Unique identifier of the document, e.g. the file path", 159 }, 160 "_ab_source_file_parse_error": { 161 "type": "string", 162 "description": "Error message if the file could not be parsed even though the file is supported", 163 }, 164 }
Infer the JSON Schema for this file.
166 def parse_records( 167 self, 168 config: FileBasedStreamConfig, 169 file: RemoteFile, 170 stream_reader: AbstractFileBasedStreamReader, 171 logger: logging.Logger, 172 discovered_schema: Optional[Mapping[str, SchemaType]], 173 ) -> Iterable[Dict[str, Any]]: 174 format = _extract_format(config) 175 with stream_reader.open_file(file, self.file_read_mode, None, logger) as file_handle: 176 try: 177 markdown = self._read_file(file_handle, file, format, logger) 178 yield { 179 "content": markdown, 180 "document_key": file.uri, 181 "_ab_source_file_parse_error": None, 182 } 183 except RecordParseError as e: 184 # RecordParseError is raised when the file can't be parsed because of a problem with the file content (either the file is not supported or the file is corrupted) 185 # if the skip_unprocessable_files flag is set, we log a warning and pass the error as part of the document 186 # otherwise, we raise the error to fail the sync 187 if format.skip_unprocessable_files: 188 exception_str = str(e) 189 logger.warn(f"File {file.uri} caused an error during parsing: {exception_str}.") 190 yield { 191 "content": None, 192 "document_key": file.uri, 193 "_ab_source_file_parse_error": exception_str, 194 } 195 logger.warn(f"File {file.uri} cannot be parsed. Skipping it.") 196 else: 197 raise e 198 except Exception as e: 199 exception_str = str(e) 200 logger.error(f"File {file.uri} caused an error during parsing: {exception_str}.") 201 raise e
Parse and emit each record.
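For orientation, records emitted by parse_records always carry the three fields from the schema above. The two sketches below use made-up values and follow the success branch and the skip_unprocessable_files branch of the source, respectively.

# Illustrative record shapes only; field names follow the code above, values are invented.
successful_record = {
    "content": "# Quarterly report\n\n- Revenue grew\n\nBody text extracted from the file.",
    "document_key": "reports/q3.pdf",  # the file URI
    "_ab_source_file_parse_error": None,
}

# Emitted instead of failing the sync when skip_unprocessable_files is enabled.
skipped_record = {
    "content": None,
    "document_key": "reports/broken.pdf",
    "_ab_source_file_parse_error": "Error parsing record ...",  # str() of the RecordParseError
}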
281 def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]: 282 """ 283 Perform a connection check for the parser config: 284 - Verify that encryption is enabled if the API is hosted on a cloud instance. 285 - Verify that the API can extract text from a file. 286 287 For local processing, we don't need to perform any additional checks, implicit pydantic validation is enough. 288 """ 289 format_config = _extract_format(config) 290 if isinstance(format_config.processing, LocalProcessingConfigModel): 291 if format_config.strategy == "hi_res": 292 return False, "Hi-res strategy is not supported for local processing" 293 return True, None 294 295 if is_cloud_environment() and not format_config.processing.api_url.startswith("https://"): 296 return False, "Base URL must start with https://" 297 298 try: 299 self._read_file_remotely( 300 BytesIO(b"# Airbyte source connection test"), 301 format_config.processing, 302 FileType.MD, 303 "auto", 304 RemoteFile(uri="test", last_modified=datetime.now()), 305 ) 306 except Exception: 307 return False, "".join(traceback.format_exc()) 308 309 return True, None
Perform a connection check for the parser config:
- Verify that encryption is enabled if the API is hosted on a cloud instance.
- Verify that the API can extract text from a file.
For local processing, we don't need to perform any additional checks; implicit pydantic validation is enough.
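A minimal usage sketch for interpreting the check's (bool, Optional[str]) result. ensure_parseable is a hypothetical helper, and building the FileBasedStreamConfig passed to it is connector-specific and omitted here.

from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.file_types import UnstructuredParser


def ensure_parseable(stream_config: FileBasedStreamConfig) -> None:
    # stream_config is assumed to describe a stream whose format is an UnstructuredFormat.
    parser = UnstructuredParser()
    parsable, error_message = parser.check_config(stream_config)
    if not parsable:
        # e.g. "Base URL must start with https://" on cloud deployments,
        # or the traceback of the failed test request against the API.
        raise ValueError(f"Unstructured parser config check failed: {error_message}")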
The mode in which the file should be opened for reading.
17class FileTransfer: 18 def __init__(self) -> None: 19 self._local_directory = ( 20 AIRBYTE_STAGING_DIRECTORY 21 if os.path.exists(AIRBYTE_STAGING_DIRECTORY) 22 else DEFAULT_LOCAL_DIRECTORY 23 ) 24 25 def get_file( 26 self, 27 config: FileBasedStreamConfig, 28 file: RemoteFile, 29 stream_reader: AbstractFileBasedStreamReader, 30 logger: logging.Logger, 31 ) -> Iterable[Dict[str, Any]]: 32 try: 33 yield stream_reader.get_file( 34 file=file, local_directory=self._local_directory, logger=logger 35 ) 36 except Exception as ex: 37 logger.error("An error has occurred while getting file: %s", str(ex)) 38 raise ex
25 def get_file( 26 self, 27 config: FileBasedStreamConfig, 28 file: RemoteFile, 29 stream_reader: AbstractFileBasedStreamReader, 30 logger: logging.Logger, 31 ) -> Iterable[Dict[str, Any]]: 32 try: 33 yield stream_reader.get_file( 34 file=file, local_directory=self._local_directory, logger=logger 35 ) 36 except Exception as ex: 37 logger.error("An error has occurred while getting file: %s", str(ex)) 38 raise ex
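A minimal usage sketch. download_files is a hypothetical helper, and the config, file, stream_reader and logger values are assumed to be supplied by the surrounding file-based source; each yielded value is whatever the concrete stream reader's get_file implementation returns (typically file-transfer metadata such as the local path).

import logging
from typing import Any, Dict, List

from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.file_types import FileTransfer
from airbyte_cdk.sources.file_based.remote_file import RemoteFile


def download_files(
    config: FileBasedStreamConfig,
    file: RemoteFile,
    stream_reader: AbstractFileBasedStreamReader,
    logger: logging.Logger,
) -> List[Dict[str, Any]]:
    # FileTransfer stages the file in the Airbyte staging directory (or a local fallback)
    # and re-raises any error after logging it.
    file_transfer = FileTransfer()
    return list(file_transfer.get_file(config, file, stream_reader, logger))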