airbyte_cdk.sources.file_based.file_types

 1from typing import Any, Mapping, Type
 2
 3from airbyte_cdk.sources.file_based.config.avro_format import AvroFormat
 4from airbyte_cdk.sources.file_based.config.csv_format import CsvFormat
 5from airbyte_cdk.sources.file_based.config.excel_format import ExcelFormat
 6from airbyte_cdk.sources.file_based.config.jsonl_format import JsonlFormat
 7from airbyte_cdk.sources.file_based.config.parquet_format import ParquetFormat
 8from airbyte_cdk.sources.file_based.config.unstructured_format import UnstructuredFormat
 9
10from .avro_parser import AvroParser
11from .csv_parser import CsvParser
12from .excel_parser import ExcelParser
13from .file_transfer import FileTransfer
14from .file_type_parser import FileTypeParser
15from .jsonl_parser import JsonlParser
16from .parquet_parser import ParquetParser
17from .unstructured_parser import UnstructuredParser
18
# Default registry: maps each format config class to a parser instance for that format.
# The parser objects are created once, when this module is imported, so every consumer
# of this mapping shares the same instances.
default_parsers: Mapping[Type[Any], FileTypeParser] = {
    AvroFormat: AvroParser(),
    CsvFormat: CsvParser(),
    ExcelFormat: ExcelParser(),
    JsonlFormat: JsonlParser(),
    ParquetFormat: ParquetParser(),
    UnstructuredFormat: UnstructuredParser(),
}

# Names exported by a star-import of this package.
# Note: FileTypeParser is imported above but is not listed here.
__all__ = [
    "AvroParser",
    "CsvParser",
    "ExcelParser",
    "JsonlParser",
    "ParquetParser",
    "UnstructuredParser",
    "FileTransfer",
    "default_parsers",
]
 47class AvroParser(FileTypeParser):
 48    ENCODING = None
 49
 50    def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
 51        """
 52        AvroParser does not require config checks, implicit pydantic validation is enough.
 53        """
 54        return True, None
 55
 56    async def infer_schema(
 57        self,
 58        config: FileBasedStreamConfig,
 59        file: RemoteFile,
 60        stream_reader: AbstractFileBasedStreamReader,
 61        logger: logging.Logger,
 62    ) -> SchemaType:
 63        avro_format = config.format
 64        if not isinstance(avro_format, AvroFormat):
 65            raise ValueError(f"Expected ParquetFormat, got {avro_format}")
 66
 67        with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
 68            avro_reader = fastavro.reader(fp)  # type: ignore [arg-type]
 69            avro_schema = avro_reader.writer_schema
 70        if not avro_schema["type"] == "record":  # type: ignore [index, call-overload]
 71            unsupported_type = avro_schema["type"]  # type: ignore [index, call-overload]
 72            raise ValueError(
 73                f"Only record based avro files are supported. Found {unsupported_type}"
 74            )
 75        json_schema = {
 76            field["name"]: AvroParser._convert_avro_type_to_json(  # type: ignore [index]
 77                avro_format,
 78                field["name"],  # type: ignore [index]
 79                field["type"],  # type: ignore [index]
 80            )
 81            for field in avro_schema["fields"]  # type: ignore [index, call-overload]
 82        }
 83        return json_schema
 84
 85    @classmethod
 86    def _convert_avro_type_to_json(
 87        cls, avro_format: AvroFormat, field_name: str, avro_field: str
 88    ) -> Mapping[str, Any]:
 89        if isinstance(avro_field, str) and avro_field in AVRO_TYPE_TO_JSON_TYPE:
 90            # Legacy behavior to retain backwards compatibility. Long term we should always represent doubles as strings
 91            if avro_field == "double" and not avro_format.double_as_string:
 92                return {"type": "number"}
 93            return {"type": AVRO_TYPE_TO_JSON_TYPE[avro_field]}
 94        if isinstance(avro_field, Mapping):
 95            if avro_field["type"] == "record":
 96                return {
 97                    "type": "object",
 98                    "properties": {
 99                        object_field["name"]: AvroParser._convert_avro_type_to_json(
100                            avro_format, object_field["name"], object_field["type"]
101                        )
102                        for object_field in avro_field["fields"]
103                    },
104                }
105            elif avro_field["type"] == "array":
106                if "items" not in avro_field:
107                    raise ValueError(
108                        f"{field_name} array type does not have a required field items"
109                    )
110                return {
111                    "type": "array",
112                    "items": AvroParser._convert_avro_type_to_json(
113                        avro_format, "", avro_field["items"]
114                    ),
115                }
116            elif avro_field["type"] == "enum":
117                if "symbols" not in avro_field:
118                    raise ValueError(
119                        f"{field_name} enum type does not have a required field symbols"
120                    )
121                if "name" not in avro_field:
122                    raise ValueError(f"{field_name} enum type does not have a required field name")
123                return {"type": "string", "enum": avro_field["symbols"]}
124            elif avro_field["type"] == "map":
125                if "values" not in avro_field:
126                    raise ValueError(f"{field_name} map type does not have a required field values")
127                return {
128                    "type": "object",
129                    "additionalProperties": AvroParser._convert_avro_type_to_json(
130                        avro_format, "", avro_field["values"]
131                    ),
132                }
133            elif avro_field["type"] == "fixed" and avro_field.get("logicalType") != "duration":
134                if "size" not in avro_field:
135                    raise ValueError(f"{field_name} fixed type does not have a required field size")
136                if not isinstance(avro_field["size"], int):
137                    raise ValueError(f"{field_name} fixed type size value is not an integer")
138                return {
139                    "type": "string",
140                    "pattern": f"^[0-9A-Fa-f]{{{avro_field['size'] * 2}}}$",
141                }
142            elif avro_field.get("logicalType") == "decimal":
143                if "precision" not in avro_field:
144                    raise ValueError(
145                        f"{field_name} decimal type does not have a required field precision"
146                    )
147                if "scale" not in avro_field:
148                    raise ValueError(
149                        f"{field_name} decimal type does not have a required field scale"
150                    )
151                max_whole_number_range = avro_field["precision"] - avro_field["scale"]
152                decimal_range = avro_field["scale"]
153
154                # This regex looks like a mess, but it is validation for at least one whole number and optional fractional numbers
155                # For example: ^-?\d{1,5}(?:\.\d{1,3})?$ would accept 12345.123 and 123456.12345 would  be rejected
156                return {
157                    "type": "string",
158                    "pattern": f"^-?\\d{{{1,max_whole_number_range}}}(?:\\.\\d{1,decimal_range})?$",
159                }
160            elif "logicalType" in avro_field:
161                if avro_field["logicalType"] not in AVRO_LOGICAL_TYPE_TO_JSON:
162                    raise ValueError(
163                        f"{avro_field['logicalType']} is not a valid Avro logical type"
164                    )
165                return AVRO_LOGICAL_TYPE_TO_JSON[avro_field["logicalType"]]
166            else:
167                raise ValueError(f"Unsupported avro type: {avro_field}")
168        else:
169            raise ValueError(f"Unsupported avro type: {avro_field}")
170
171    def parse_records(
172        self,
173        config: FileBasedStreamConfig,
174        file: RemoteFile,
175        stream_reader: AbstractFileBasedStreamReader,
176        logger: logging.Logger,
177        discovered_schema: Optional[Mapping[str, SchemaType]],
178    ) -> Iterable[Dict[str, Any]]:
179        avro_format = config.format or AvroFormat(filetype="avro")
180        if not isinstance(avro_format, AvroFormat):
181            raise ValueError(f"Expected ParquetFormat, got {avro_format}")
182
183        line_no = 0
184        try:
185            with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
186                avro_reader = fastavro.reader(fp)  # type: ignore [arg-type]
187                schema = avro_reader.writer_schema
188                schema_field_name_to_type = {
189                    field["name"]: cast(dict[str, Any], field["type"])  # type: ignore [index]
190                    for field in schema["fields"]  # type: ignore [index, call-overload]  # If schema is not dict, it is not subscriptable by strings
191                }
192                for record in avro_reader:
193                    line_no += 1
194                    yield {
195                        record_field: self._to_output_value(
196                            avro_format,
197                            schema_field_name_to_type[record_field],  # type: ignore [index] # Any not subscriptable
198                            record[record_field],  # type: ignore [index] # Any not subscriptable
199                        )
200                        for record_field, record_value in schema_field_name_to_type.items()
201                    }
202        except Exception as exc:
203            raise RecordParseError(
204                FileBasedSourceError.ERROR_PARSING_RECORD, filename=file.uri, lineno=line_no
205            ) from exc
206
207    @property
208    def file_read_mode(self) -> FileReadMode:
209        return FileReadMode.READ_BINARY
210
211    @staticmethod
212    def _to_output_value(
213        avro_format: AvroFormat, record_type: Mapping[str, Any], record_value: Any
214    ) -> Any:
215        if isinstance(record_value, bytes):
216            return record_value.decode()
217        elif not isinstance(record_type, Mapping):
218            if record_type == "double" and avro_format.double_as_string:
219                return str(record_value)
220            return record_value
221        if record_type.get("logicalType") in ("decimal", "uuid"):
222            return str(record_value)
223        elif record_type.get("logicalType") == "date":
224            return record_value.isoformat()
225        elif record_type.get("logicalType") == "timestamp-millis":
226            return record_value.isoformat(sep="T", timespec="milliseconds")
227        elif record_type.get("logicalType") == "timestamp-micros":
228            return record_value.isoformat(sep="T", timespec="microseconds")
229        elif record_type.get("logicalType") == "local-timestamp-millis":
230            return record_value.isoformat(sep="T", timespec="milliseconds")
231        elif record_type.get("logicalType") == "local-timestamp-micros":
232            return record_value.isoformat(sep="T", timespec="microseconds")
233        else:
234            return record_value

An abstract class containing methods that must be implemented for each supported file type.

ENCODING = None
def check_config( self, config: airbyte_cdk.sources.file_based.FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
50    def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
51        """
52        AvroParser does not require config checks, implicit pydantic validation is enough.
53        """
54        return True, None

AvroParser does not require config checks, implicit pydantic validation is enough.

async def infer_schema( self, config: airbyte_cdk.sources.file_based.FileBasedStreamConfig, file: airbyte_cdk.sources.file_based.RemoteFile, stream_reader: airbyte_cdk.sources.file_based.AbstractFileBasedStreamReader, logger: logging.Logger) -> Mapping[str, Mapping[str, Union[List[str], Literal['string'], str]]]:
56    async def infer_schema(
57        self,
58        config: FileBasedStreamConfig,
59        file: RemoteFile,
60        stream_reader: AbstractFileBasedStreamReader,
61        logger: logging.Logger,
62    ) -> SchemaType:
63        avro_format = config.format
64        if not isinstance(avro_format, AvroFormat):
65            raise ValueError(f"Expected ParquetFormat, got {avro_format}")
66
67        with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
68            avro_reader = fastavro.reader(fp)  # type: ignore [arg-type]
69            avro_schema = avro_reader.writer_schema
70        if not avro_schema["type"] == "record":  # type: ignore [index, call-overload]
71            unsupported_type = avro_schema["type"]  # type: ignore [index, call-overload]
72            raise ValueError(
73                f"Only record based avro files are supported. Found {unsupported_type}"
74            )
75        json_schema = {
76            field["name"]: AvroParser._convert_avro_type_to_json(  # type: ignore [index]
77                avro_format,
78                field["name"],  # type: ignore [index]
79                field["type"],  # type: ignore [index]
80            )
81            for field in avro_schema["fields"]  # type: ignore [index, call-overload]
82        }
83        return json_schema

Infer the JSON Schema for this file.

def parse_records( self, config: airbyte_cdk.sources.file_based.FileBasedStreamConfig, file: airbyte_cdk.sources.file_based.RemoteFile, stream_reader: airbyte_cdk.sources.file_based.AbstractFileBasedStreamReader, logger: logging.Logger, discovered_schema: Optional[Mapping[str, Mapping[str, Mapping[str, Union[List[str], Literal['string'], str]]]]]) -> Iterable[Dict[str, Any]]:
171    def parse_records(
172        self,
173        config: FileBasedStreamConfig,
174        file: RemoteFile,
175        stream_reader: AbstractFileBasedStreamReader,
176        logger: logging.Logger,
177        discovered_schema: Optional[Mapping[str, SchemaType]],
178    ) -> Iterable[Dict[str, Any]]:
179        avro_format = config.format or AvroFormat(filetype="avro")
180        if not isinstance(avro_format, AvroFormat):
181            raise ValueError(f"Expected ParquetFormat, got {avro_format}")
182
183        line_no = 0
184        try:
185            with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
186                avro_reader = fastavro.reader(fp)  # type: ignore [arg-type]
187                schema = avro_reader.writer_schema
188                schema_field_name_to_type = {
189                    field["name"]: cast(dict[str, Any], field["type"])  # type: ignore [index]
190                    for field in schema["fields"]  # type: ignore [index, call-overload]  # If schema is not dict, it is not subscriptable by strings
191                }
192                for record in avro_reader:
193                    line_no += 1
194                    yield {
195                        record_field: self._to_output_value(
196                            avro_format,
197                            schema_field_name_to_type[record_field],  # type: ignore [index] # Any not subscriptable
198                            record[record_field],  # type: ignore [index] # Any not subscriptable
199                        )
200                        for record_field, record_value in schema_field_name_to_type.items()
201                    }
202        except Exception as exc:
203            raise RecordParseError(
204                FileBasedSourceError.ERROR_PARSING_RECORD, filename=file.uri, lineno=line_no
205            ) from exc

Parse and emit each record.

file_read_mode: airbyte_cdk.sources.file_based.FileReadMode
207    @property
208    def file_read_mode(self) -> FileReadMode:
209        return FileReadMode.READ_BINARY

The mode in which the file should be opened for reading.

class CsvParser(FileTypeParser):
    """
    File-type parser for CSV files.

    Rows are obtained from a ``_CsvReader``; schema inference feeds the raw values to
    per-column type inferrers, and record parsing optionally casts values to the types of
    the discovered schema before converting configured null markers to ``None``.
    """

    # Schema inference stops reading a file once roughly this many bytes have been seen.
    _MAX_BYTES_PER_FILE_FOR_SCHEMA_INFERENCE = 1_000_000

    def __init__(self, csv_reader: Optional[_CsvReader] = None, csv_field_max_bytes: int = 2**31):
        """
        :param csv_reader: reader used to produce rows; a new ``_CsvReader`` is created when
            none (or a falsy one) is given.
        :param csv_field_max_bytes: value passed to ``csv.field_size_limit``. This sets the
            limit of the ``csv`` module itself, not just of this instance.
        """
        # Increase the maximum length of data that can be parsed in a single CSV field. The default is 128k, which is typically sufficient
        # but given the use of Airbyte in loading a large variety of data it is best to allow for a larger maximum field size to avoid
        # skipping data on load. https://stackoverflow.com/questions/15063936/csv-error-field-larger-than-field-limit-131072
        csv.field_size_limit(csv_field_max_bytes)
        self._csv_reader = csv_reader if csv_reader else _CsvReader()

    def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
        """
        CsvParser does not require config checks, implicit pydantic validation is enough.
        """
        return True, None

    async def infer_schema(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
    ) -> SchemaType:
        """
        Infer the JSON schema of ``file``.

        When the config supplies an input schema it is returned as is. Otherwise rows are
        read until the file ends or about ``_MAX_BYTES_PER_FILE_FOR_SCHEMA_INFERENCE`` bytes
        have been seen, and one type is inferred per column.

        :return: mapping of stripped header name to ``{"type": <inferred type>}``.
        :raises AirbyteTracedException: (config error) if the file yields no rows.
        """
        input_schema = config.get_input_schema()
        if input_schema:
            return input_schema

        # todo: the existing InMemoryFilesSource.open_file() test source doesn't currently require an encoding, but actual
        #  sources will likely require one. Rather than modify the interface now we can wait until the real use case
        config_format = _extract_format(config)
        type_inferrer_by_field: Dict[str, _TypeInferrer] = defaultdict(
            lambda: _JsonTypeInferrer(
                config_format.true_values, config_format.false_values, config_format.null_values
            )
            if config_format.inference_type != InferenceType.NONE
            else _DisabledTypeInferrer()
        )
        data_generator = self._csv_reader.read_data(
            config, file, stream_reader, logger, self.file_read_mode
        )
        read_bytes = 0
        try:
            for row in data_generator:
                for header, value in row.items():
                    type_inferrer_by_field[header].add_value(value)
                    # This is not accurate as a representation of how many bytes were read because csv does some processing on the actual value
                    # before returning. Given we would like to be more accurate, we could wrap the IO file using a decorator
                    read_bytes += len(value)
                read_bytes += len(row) - 1  # for separators
                if read_bytes >= self._MAX_BYTES_PER_FILE_FOR_SCHEMA_INFERENCE:
                    break
        finally:
            # Close the generator on every path (early break, read error, empty file) so that
            # whatever it holds open is released even when inference fails.
            data_generator.close()

        if not type_inferrer_by_field:
            raise AirbyteTracedException(
                message=f"Could not infer schema as there are no rows in {file.uri}. If having an empty CSV file is expected, ignore this. "
                f"Else, please contact Airbyte.",
                failure_type=FailureType.config_error,
            )
        schema = {
            header.strip(): {"type": type_inferred.infer()}
            for header, type_inferred in type_inferrer_by_field.items()
        }
        return schema

    def parse_records(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
        discovered_schema: Optional[Mapping[str, SchemaType]],
    ) -> Iterable[Dict[str, Any]]:
        """
        Lazily yield every row of ``file`` as a dict.

        When a discovered schema is given (and the stream is not schemaless) values are cast
        to the schema types; in all cases configured null markers are turned into ``None``.

        :raises RecordParseError: re-raised with the file URI and the number of rows read
            so far when the reader reports a ``RecordParseError``.
        """
        line_no = 0
        # Bound before the ``try`` so the ``finally`` clause is safe even when something
        # fails before the generator is created.
        data_generator = None
        try:
            config_format = _extract_format(config)
            if discovered_schema:
                property_types = {
                    col: prop["type"] for col, prop in discovered_schema["properties"].items()
                }
                deduped_property_types = CsvParser._pre_propcess_property_types(property_types)
            else:
                deduped_property_types = {}
            cast_fn = CsvParser._get_cast_function(
                deduped_property_types, config_format, logger, config.schemaless
            )
            data_generator = self._csv_reader.read_data(
                config, file, stream_reader, logger, self.file_read_mode
            )
            for row in data_generator:
                line_no += 1
                yield CsvParser._to_nullable(
                    cast_fn(row),
                    deduped_property_types,
                    config_format.null_values,
                    config_format.strings_can_be_null,
                )
        except RecordParseError as parse_err:
            raise RecordParseError(
                FileBasedSourceError.ERROR_PARSING_RECORD, filename=file.uri, lineno=line_no
            ) from parse_err
        finally:
            if data_generator is not None:
                data_generator.close()

    @property
    def file_read_mode(self) -> FileReadMode:
        """Mode used to open files for this parser: ``FileReadMode.READ``."""
        return FileReadMode.READ

    @staticmethod
    def _get_cast_function(
        deduped_property_types: Mapping[str, str],
        config_format: CsvFormat,
        logger: logging.Logger,
        schemaless: bool,
    ) -> Callable[[Mapping[str, str]], Mapping[str, str]]:
        """
        Pick the row transformation: schema-driven casting, or a pass-through.

        :return: ``_cast_types`` bound to the given types/format/logger when property types
            are available and the stream is not schemaless; otherwise ``_no_cast``.
        """
        # Only cast values if the schema is provided
        if deduped_property_types and not schemaless:
            return partial(
                CsvParser._cast_types,
                deduped_property_types=deduped_property_types,
                config_format=config_format,
                logger=logger,
            )
        else:
            # If no schema is provided, yield the rows as they are
            return _no_cast

    @staticmethod
    def _to_nullable(
        row: Mapping[str, str],
        deduped_property_types: Mapping[str, str],
        null_values: Set[str],
        strings_can_be_null: bool,
    ) -> Dict[str, Optional[str]]:
        """Return a copy of ``row`` in which every value judged null by ``_value_is_none`` is ``None``."""
        nullable = {
            k: None
            if CsvParser._value_is_none(
                v, deduped_property_types.get(k), null_values, strings_can_be_null
            )
            else v
            for k, v in row.items()
        }
        return nullable

    @staticmethod
    def _value_is_none(
        value: Any,
        deduped_property_type: Optional[str],
        null_values: Set[str],
        strings_can_be_null: bool,
    ) -> bool:
        """
        Tell whether ``value`` should be emitted as ``None``.

        A value is null when it is one of ``null_values``, unless the column is typed
        ``"string"`` and ``strings_can_be_null`` is false.
        """
        return value in null_values and (strings_can_be_null or deduped_property_type != "string")

    @staticmethod
    def _pre_propcess_property_types(property_types: Dict[str, Any]) -> Mapping[str, str]:
        """
        Transform the property types to be non-nullable and remove duplicate types if any.
        Sample input:
        {
        "col1": ["string", "null"],
        "col2": ["string", "string", "null"],
        "col3": "integer"
        }

        Sample output:
        {
        "col1": "string",
        "col2": "string",
        "col3": "integer",
        }

        :raises ValueError: if a list of types does not reduce to exactly one non-null type.
        """
        output = {}
        for prop, prop_type in property_types.items():
            if isinstance(prop_type, list):
                prop_type_distinct = set(prop_type)
                # ``discard`` (not ``remove``): a list without "null" is acceptable and must
                # not fail with a bare KeyError.
                prop_type_distinct.discard("null")
                if len(prop_type_distinct) != 1:
                    raise ValueError(f"Could not get non nullable type from {prop_type}")
                output[prop] = next(iter(prop_type_distinct))
            else:
                output[prop] = prop_type
        return output

    @staticmethod
    def _cast_types(
        row: Dict[str, str],
        deduped_property_types: Mapping[str, str],
        config_format: CsvFormat,
        logger: logging.Logger,
    ) -> Dict[str, Any]:
        """
        Casts the values in the input 'row' dictionary according to the types defined in the JSON schema.

        Array and object types are only handled if they can be deserialized as JSON.

        If any errors are encountered, the value will be emitted as a string.

        Only keys whose type is found in ``TYPE_PYTHON_MAPPING`` are copied to the result.
        All casting failures of the row are reported in a single warning log entry.
        """
        warnings = []
        result = {}

        for key, value in row.items():
            prop_type = deduped_property_types.get(key)
            cast_value: Any = value

            if prop_type in TYPE_PYTHON_MAPPING and prop_type is not None:
                _, python_type = TYPE_PYTHON_MAPPING[prop_type]

                if python_type is None:
                    if value == "":
                        cast_value = None
                    else:
                        warnings.append(_format_warning(key, value, prop_type))

                elif python_type is bool:
                    try:
                        cast_value = _value_to_bool(
                            value, config_format.true_values, config_format.false_values
                        )
                    except ValueError:
                        warnings.append(_format_warning(key, value, prop_type))

                elif python_type is dict:
                    try:
                        # we don't re-use _value_to_object here because we type the column as object as long as there is only one object
                        cast_value = orjson.loads(value)
                    except orjson.JSONDecodeError:
                        warnings.append(_format_warning(key, value, prop_type))

                elif python_type is list:
                    try:
                        cast_value = _value_to_list(value)
                    except (ValueError, json.JSONDecodeError):
                        warnings.append(_format_warning(key, value, prop_type))

                elif python_type:
                    try:
                        cast_value = _value_to_python_type(value, python_type)
                    except ValueError:
                        warnings.append(_format_warning(key, value, prop_type))

                result[key] = cast_value

        if warnings:
            logger.warning(
                f"{FileBasedSourceError.ERROR_CASTING_VALUE.value}: {','.join(warnings)}",
            )
        return result

An abstract class containing methods that must be implemented for each supported file type.

CsvParser( csv_reader: Optional[airbyte_cdk.sources.file_based.file_types.csv_parser._CsvReader] = None, csv_field_max_bytes: int = 2147483648)
158    def __init__(self, csv_reader: Optional[_CsvReader] = None, csv_field_max_bytes: int = 2**31):
159        # Increase the maximum length of data that can be parsed in a single CSV field. The default is 128k, which is typically sufficient
160        # but given the use of Airbyte in loading a large variety of data it is best to allow for a larger maximum field size to avoid
161        # skipping data on load. https://stackoverflow.com/questions/15063936/csv-error-field-larger-than-field-limit-131072
162        csv.field_size_limit(csv_field_max_bytes)
163        self._csv_reader = csv_reader if csv_reader else _CsvReader()
def check_config( self, config: airbyte_cdk.sources.file_based.FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
165    def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
166        """
167        CsvParser does not require config checks, implicit pydantic validation is enough.
168        """
169        return True, None

CsvParser does not require config checks, implicit pydantic validation is enough.

async def infer_schema( self, config: airbyte_cdk.sources.file_based.FileBasedStreamConfig, file: airbyte_cdk.sources.file_based.RemoteFile, stream_reader: airbyte_cdk.sources.file_based.AbstractFileBasedStreamReader, logger: logging.Logger) -> Mapping[str, Mapping[str, Union[List[str], Literal['string'], str]]]:
171    async def infer_schema(
172        self,
173        config: FileBasedStreamConfig,
174        file: RemoteFile,
175        stream_reader: AbstractFileBasedStreamReader,
176        logger: logging.Logger,
177    ) -> SchemaType:
178        input_schema = config.get_input_schema()
179        if input_schema:
180            return input_schema
181
182        # todo: the existing InMemoryFilesSource.open_file() test source doesn't currently require an encoding, but actual
183        #  sources will likely require one. Rather than modify the interface now we can wait until the real use case
184        config_format = _extract_format(config)
185        type_inferrer_by_field: Dict[str, _TypeInferrer] = defaultdict(
186            lambda: _JsonTypeInferrer(
187                config_format.true_values, config_format.false_values, config_format.null_values
188            )
189            if config_format.inference_type != InferenceType.NONE
190            else _DisabledTypeInferrer()
191        )
192        data_generator = self._csv_reader.read_data(
193            config, file, stream_reader, logger, self.file_read_mode
194        )
195        read_bytes = 0
196        for row in data_generator:
197            for header, value in row.items():
198                type_inferrer_by_field[header].add_value(value)
199                # This is not accurate as a representation of how many bytes were read because csv does some processing on the actual value
200                # before returning. Given we would like to be more accurate, we could wrap the IO file using a decorator
201                read_bytes += len(value)
202            read_bytes += len(row) - 1  # for separators
203            if read_bytes >= self._MAX_BYTES_PER_FILE_FOR_SCHEMA_INFERENCE:
204                break
205
206        if not type_inferrer_by_field:
207            raise AirbyteTracedException(
208                message=f"Could not infer schema as there are no rows in {file.uri}. If having an empty CSV file is expected, ignore this. "
209                f"Else, please contact Airbyte.",
210                failure_type=FailureType.config_error,
211            )
212        schema = {
213            header.strip(): {"type": type_inferred.infer()}
214            for header, type_inferred in type_inferrer_by_field.items()
215        }
216        data_generator.close()
217        return schema

Infer the JSON Schema for this file.

def parse_records( self, config: airbyte_cdk.sources.file_based.FileBasedStreamConfig, file: airbyte_cdk.sources.file_based.RemoteFile, stream_reader: airbyte_cdk.sources.file_based.AbstractFileBasedStreamReader, logger: logging.Logger, discovered_schema: Optional[Mapping[str, Mapping[str, Mapping[str, Union[List[str], Literal['string'], str]]]]]) -> Iterable[Dict[str, Any]]:
219    def parse_records(
220        self,
221        config: FileBasedStreamConfig,
222        file: RemoteFile,
223        stream_reader: AbstractFileBasedStreamReader,
224        logger: logging.Logger,
225        discovered_schema: Optional[Mapping[str, SchemaType]],
226    ) -> Iterable[Dict[str, Any]]:
227        line_no = 0
228        try:
229            config_format = _extract_format(config)
230            if discovered_schema:
231                property_types = {
232                    col: prop["type"] for col, prop in discovered_schema["properties"].items()
233                }
234                deduped_property_types = CsvParser._pre_propcess_property_types(property_types)
235            else:
236                deduped_property_types = {}
237            cast_fn = CsvParser._get_cast_function(
238                deduped_property_types, config_format, logger, config.schemaless
239            )
240            data_generator = self._csv_reader.read_data(
241                config, file, stream_reader, logger, self.file_read_mode
242            )
243            for row in data_generator:
244                line_no += 1
245                yield CsvParser._to_nullable(
246                    cast_fn(row),
247                    deduped_property_types,
248                    config_format.null_values,
249                    config_format.strings_can_be_null,
250                )
251        except RecordParseError as parse_err:
252            raise RecordParseError(
253                FileBasedSourceError.ERROR_PARSING_RECORD, filename=file.uri, lineno=line_no
254            ) from parse_err
255        finally:
256            data_generator.close()

Parse and emit each record.

file_read_mode: airbyte_cdk.sources.file_based.FileReadMode
258    @property
259    def file_read_mode(self) -> FileReadMode:
260        return FileReadMode.READ

The mode in which the file should be opened for reading.

 35class ExcelParser(FileTypeParser):
 36    ENCODING = None
 37
 38    def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
 39        """
 40        ExcelParser does not require config checks, implicit pydantic validation is enough.
 41        """
 42        return True, None
 43
 44    async def infer_schema(
 45        self,
 46        config: FileBasedStreamConfig,
 47        file: RemoteFile,
 48        stream_reader: AbstractFileBasedStreamReader,
 49        logger: logging.Logger,
 50    ) -> SchemaType:
 51        """
 52        Infers the schema of the Excel file by examining its contents.
 53
 54        Args:
 55            config (FileBasedStreamConfig): Configuration for the file-based stream.
 56            file (RemoteFile): The remote file to be read.
 57            stream_reader (AbstractFileBasedStreamReader): Reader to read the file.
 58            logger (logging.Logger): Logger for logging information and errors.
 59
 60        Returns:
 61            SchemaType: Inferred schema of the Excel file.
 62        """
 63
 64        # Validate the format of the config
 65        self.validate_format(config.format, logger)
 66
 67        fields: Dict[str, str] = {}
 68
 69        with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
 70            df = self.open_and_parse_file(fp)
 71            for column, df_type in df.dtypes.items():
 72                # Choose the broadest data type if the column's data type differs in dataframes
 73                prev_frame_column_type = fields.get(column)  # type: ignore [call-overload]
 74                fields[column] = self.dtype_to_json_type(  # type: ignore [index]
 75                    prev_frame_column_type,
 76                    df_type,
 77                )
 78
 79        schema = {
 80            field: (
 81                {"type": "string", "format": "date-time"}
 82                if fields[field] == "date-time"
 83                else {"type": fields[field]}
 84            )
 85            for field in fields
 86        }
 87        return schema
 88
 89    def parse_records(
 90        self,
 91        config: FileBasedStreamConfig,
 92        file: RemoteFile,
 93        stream_reader: AbstractFileBasedStreamReader,
 94        logger: logging.Logger,
 95        discovered_schema: Optional[Mapping[str, SchemaType]] = None,
 96    ) -> Iterable[Dict[str, Any]]:
 97        """
 98        Parses records from an Excel file based on the provided configuration.
 99
100        Args:
101            config (FileBasedStreamConfig): Configuration for the file-based stream.
102            file (RemoteFile): The remote file to be read.
103            stream_reader (AbstractFileBasedStreamReader): Reader to read the file.
104            logger (logging.Logger): Logger for logging information and errors.
105            discovered_schema (Optional[Mapping[str, SchemaType]]): Discovered schema for validation.
106
107        Yields:
108            Iterable[Dict[str, Any]]: Parsed records from the Excel file.
109        """
110
111        # Validate the format of the config
112        self.validate_format(config.format, logger)
113
114        try:
115            # Open and parse the file using the stream reader
116            with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
117                df = self.open_and_parse_file(fp)
118                # Yield records as dictionaries
119                # DataFrame.to_dict() method returns datetime values in pandas.Timestamp values, which are not serializable by orjson
120                # DataFrame.to_json() returns string with datetime values serialized to iso8601 with microseconds to align with pydantic behavior
121                # see PR description: https://github.com/airbytehq/airbyte/pull/44444/
122                yield from orjson.loads(
123                    df.to_json(orient="records", date_format="iso", date_unit="us")
124                )
125
126        except Exception as exc:
127            # Raise a RecordParseError if any exception occurs during parsing
128            raise RecordParseError(
129                FileBasedSourceError.ERROR_PARSING_RECORD, filename=file.uri
130            ) from exc
131
132    @property
133    def file_read_mode(self) -> FileReadMode:
134        """
135        Returns the file read mode for the Excel file.
136
137        Returns:
138            FileReadMode: The file read mode (binary).
139        """
140        return FileReadMode.READ_BINARY
141
142    @staticmethod
143    def dtype_to_json_type(
144        current_type: Optional[str],
145        dtype: dtype_,  # type: ignore [type-arg]
146    ) -> str:
147        """
148        Convert Pandas DataFrame types to Airbyte Types.
149
150        Args:
151            current_type (Optional[str]): One of the previous types based on earlier dataframes.
152            dtype: Pandas DataFrame type.
153
154        Returns:
155            str: Corresponding Airbyte Type.
156        """
157        number_types = ("int64", "float64")
158        if current_type == "string":
159            # Previous column values were of the string type, no need to look further.
160            return current_type
161        if dtype is object:
162            return "string"
163        if dtype in number_types and (not current_type or current_type == "number"):
164            return "number"
165        if dtype == "bool" and (not current_type or current_type == "boolean"):
166            return "boolean"
167        if issubdtype(dtype, datetime64):
168            return "date-time"
169        return "string"
170
171    @staticmethod
172    def validate_format(excel_format: BaseModel, logger: logging.Logger) -> None:
173        """
174        Validates if the given format is of type ExcelFormat.
175
176        Args:
177            excel_format (Any): The format to be validated.
178
179        Raises:
180            ConfigValidationError: If the format is not ExcelFormat.
181        """
182        if not isinstance(excel_format, ExcelFormat):
183            logger.info(f"Expected ExcelFormat, got {excel_format}")
184            raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR)
185
186    @staticmethod
187    def open_and_parse_file(fp: Union[IOBase, str, Path]) -> pd.DataFrame:
188        """
189        Opens and parses the Excel file.
190
191        Args:
192            fp: File pointer to the Excel file.
193
194        Returns:
195            pd.DataFrame: Parsed data from the Excel file.
196        """
197        return pd.ExcelFile(fp, engine="calamine").parse()  # type: ignore [arg-type, call-overload, no-any-return]

An abstract class containing methods that must be implemented for each supported file type.

ENCODING = None
def check_config( self, config: airbyte_cdk.sources.file_based.FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
38    def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
39        """
40        ExcelParser does not require config checks, implicit pydantic validation is enough.
41        """
42        return True, None

ExcelParser does not require config checks, implicit pydantic validation is enough.

async def infer_schema( self, config: airbyte_cdk.sources.file_based.FileBasedStreamConfig, file: airbyte_cdk.sources.file_based.RemoteFile, stream_reader: airbyte_cdk.sources.file_based.AbstractFileBasedStreamReader, logger: logging.Logger) -> Mapping[str, Mapping[str, Union[List[str], Literal['string'], str]]]:
44    async def infer_schema(
45        self,
46        config: FileBasedStreamConfig,
47        file: RemoteFile,
48        stream_reader: AbstractFileBasedStreamReader,
49        logger: logging.Logger,
50    ) -> SchemaType:
51        """
52        Infers the schema of the Excel file by examining its contents.
53
54        Args:
55            config (FileBasedStreamConfig): Configuration for the file-based stream.
56            file (RemoteFile): The remote file to be read.
57            stream_reader (AbstractFileBasedStreamReader): Reader to read the file.
58            logger (logging.Logger): Logger for logging information and errors.
59
60        Returns:
61            SchemaType: Inferred schema of the Excel file.
62        """
63
64        # Validate the format of the config
65        self.validate_format(config.format, logger)
66
67        fields: Dict[str, str] = {}
68
69        with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
70            df = self.open_and_parse_file(fp)
71            for column, df_type in df.dtypes.items():
72                # Choose the broadest data type if the column's data type differs in dataframes
73                prev_frame_column_type = fields.get(column)  # type: ignore [call-overload]
74                fields[column] = self.dtype_to_json_type(  # type: ignore [index]
75                    prev_frame_column_type,
76                    df_type,
77                )
78
79        schema = {
80            field: (
81                {"type": "string", "format": "date-time"}
82                if fields[field] == "date-time"
83                else {"type": fields[field]}
84            )
85            for field in fields
86        }
87        return schema

Infers the schema of the Excel file by examining its contents.

Arguments:
  • config (FileBasedStreamConfig): Configuration for the file-based stream.
  • file (RemoteFile): The remote file to be read.
  • stream_reader (AbstractFileBasedStreamReader): Reader to read the file.
  • logger (logging.Logger): Logger for logging information and errors.
Returns:

SchemaType: Inferred schema of the Excel file.

def parse_records( self, config: airbyte_cdk.sources.file_based.FileBasedStreamConfig, file: airbyte_cdk.sources.file_based.RemoteFile, stream_reader: airbyte_cdk.sources.file_based.AbstractFileBasedStreamReader, logger: logging.Logger, discovered_schema: Optional[Mapping[str, Mapping[str, Mapping[str, Union[List[str], Literal['string'], str]]]]] = None) -> Iterable[Dict[str, Any]]:
 89    def parse_records(
 90        self,
 91        config: FileBasedStreamConfig,
 92        file: RemoteFile,
 93        stream_reader: AbstractFileBasedStreamReader,
 94        logger: logging.Logger,
 95        discovered_schema: Optional[Mapping[str, SchemaType]] = None,
 96    ) -> Iterable[Dict[str, Any]]:
 97        """
 98        Parses records from an Excel file based on the provided configuration.
 99
100        Args:
101            config (FileBasedStreamConfig): Configuration for the file-based stream.
102            file (RemoteFile): The remote file to be read.
103            stream_reader (AbstractFileBasedStreamReader): Reader to read the file.
104            logger (logging.Logger): Logger for logging information and errors.
105            discovered_schema (Optional[Mapping[str, SchemaType]]): Discovered schema for validation.
106
107        Yields:
108            Iterable[Dict[str, Any]]: Parsed records from the Excel file.
109        """
110
111        # Validate the format of the config
112        self.validate_format(config.format, logger)
113
114        try:
115            # Open and parse the file using the stream reader
116            with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
117                df = self.open_and_parse_file(fp)
118                # Yield records as dictionaries
119                # DataFrame.to_dict() method returns datetime values in pandas.Timestamp values, which are not serializable by orjson
120                # DataFrame.to_json() returns string with datetime values serialized to iso8601 with microseconds to align with pydantic behavior
121                # see PR description: https://github.com/airbytehq/airbyte/pull/44444/
122                yield from orjson.loads(
123                    df.to_json(orient="records", date_format="iso", date_unit="us")
124                )
125
126        except Exception as exc:
127            # Raise a RecordParseError if any exception occurs during parsing
128            raise RecordParseError(
129                FileBasedSourceError.ERROR_PARSING_RECORD, filename=file.uri
130            ) from exc

Parses records from an Excel file based on the provided configuration.

Arguments:
  • config (FileBasedStreamConfig): Configuration for the file-based stream.
  • file (RemoteFile): The remote file to be read.
  • stream_reader (AbstractFileBasedStreamReader): Reader to read the file.
  • logger (logging.Logger): Logger for logging information and errors.
  • discovered_schema (Optional[Mapping[str, SchemaType]]): Discovered schema for validation.
Yields:

Iterable[Dict[str, Any]]: Parsed records from the Excel file.

file_read_mode: airbyte_cdk.sources.file_based.FileReadMode
132    @property
133    def file_read_mode(self) -> FileReadMode:
134        """
135        Returns the file read mode for the Excel file.
136
137        Returns:
138            FileReadMode: The file read mode (binary).
139        """
140        return FileReadMode.READ_BINARY

Returns the file read mode for the Excel file.

Returns:

FileReadMode: The file read mode (binary).

@staticmethod
def dtype_to_json_type(current_type: Optional[str], dtype: numpy.dtype) -> str:
142    @staticmethod
143    def dtype_to_json_type(
144        current_type: Optional[str],
145        dtype: dtype_,  # type: ignore [type-arg]
146    ) -> str:
147        """
148        Convert Pandas DataFrame types to Airbyte Types.
149
150        Args:
151            current_type (Optional[str]): One of the previous types based on earlier dataframes.
152            dtype: Pandas DataFrame type.
153
154        Returns:
155            str: Corresponding Airbyte Type.
156        """
157        number_types = ("int64", "float64")
158        if current_type == "string":
159            # Previous column values were of the string type, no need to look further.
160            return current_type
161        if dtype is object:
162            return "string"
163        if dtype in number_types and (not current_type or current_type == "number"):
164            return "number"
165        if dtype == "bool" and (not current_type or current_type == "boolean"):
166            return "boolean"
167        if issubdtype(dtype, datetime64):
168            return "date-time"
169        return "string"

Convert Pandas DataFrame types to Airbyte Types.

Arguments:
  • current_type (Optional[str]): One of the previous types based on earlier dataframes.
  • dtype: Pandas DataFrame type.
Returns:

str: Corresponding Airbyte Type.

@staticmethod
def validate_format(excel_format: pydantic.v1.main.BaseModel, logger: logging.Logger) -> None:
171    @staticmethod
172    def validate_format(excel_format: BaseModel, logger: logging.Logger) -> None:
173        """
174        Validates if the given format is of type ExcelFormat.
175
176        Args:
177            excel_format (Any): The format to be validated.
178
179        Raises:
180            ConfigValidationError: If the format is not ExcelFormat.
181        """
182        if not isinstance(excel_format, ExcelFormat):
183            logger.info(f"Expected ExcelFormat, got {excel_format}")
184            raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR)

Validates if the given format is of type ExcelFormat.

Arguments:
  • excel_format (BaseModel): The format to be validated.
Raises:
  • ConfigValidationError: If the format is not ExcelFormat.
@staticmethod
def open_and_parse_file(fp: Union[io.IOBase, str, pathlib.Path]) -> pandas.core.frame.DataFrame:
186    @staticmethod
187    def open_and_parse_file(fp: Union[IOBase, str, Path]) -> pd.DataFrame:
188        """
189        Opens and parses the Excel file.
190
191        Args:
192            fp: File pointer to the Excel file.
193
194        Returns:
195            pd.DataFrame: Parsed data from the Excel file.
196        """
197        return pd.ExcelFile(fp, engine="calamine").parse()  # type: ignore [arg-type, call-overload, no-any-return]

Opens and parses the Excel file.

Arguments:
  • fp: File pointer to the Excel file.
Returns:

pd.DataFrame: Parsed data from the Excel file.

 27class JsonlParser(FileTypeParser):
 28    MAX_BYTES_PER_FILE_FOR_SCHEMA_INFERENCE = 1_000_000
 29    ENCODING = "utf8"
 30
 31    def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
 32        """
 33        JsonlParser does not require config checks, implicit pydantic validation is enough.
 34        """
 35        return True, None
 36
 37    async def infer_schema(
 38        self,
 39        config: FileBasedStreamConfig,
 40        file: RemoteFile,
 41        stream_reader: AbstractFileBasedStreamReader,
 42        logger: logging.Logger,
 43    ) -> SchemaType:
 44        """
 45        Infers the schema for the file by inferring the schema for each line, and merging
 46        it with the previously-inferred schema.
 47        """
 48        inferred_schema: Mapping[str, Any] = {}
 49
 50        for entry in self._parse_jsonl_entries(file, stream_reader, logger, read_limit=True):
 51            line_schema = self._infer_schema_for_record(entry)
 52            inferred_schema = merge_schemas(inferred_schema, line_schema)
 53
 54        return inferred_schema
 55
 56    def parse_records(
 57        self,
 58        config: FileBasedStreamConfig,
 59        file: RemoteFile,
 60        stream_reader: AbstractFileBasedStreamReader,
 61        logger: logging.Logger,
 62        discovered_schema: Optional[Mapping[str, SchemaType]],
 63    ) -> Iterable[Dict[str, Any]]:
 64        """
 65        This code supports parsing json objects over multiple lines even though this does not align with the JSONL format. This is for
 66        backward compatibility reasons i.e. the previous source-s3 parser did support this. The drawback is:
 67        * performance as the way we support json over multiple lines is very brute forced
 68        * given that we don't have `newlines_in_values` config to scope the possible inputs, we might parse the whole file before knowing if
 69          the input is improperly formatted or if the json is over multiple lines
 70
 71        The goal is to run the V4 of source-s3 in production, track the warning log emitted when there are multiline json objects and
 72        deprecate this feature if it's not a valid use case.
 73        """
 74        yield from self._parse_jsonl_entries(file, stream_reader, logger)
 75
 76    @classmethod
 77    def _infer_schema_for_record(cls, record: Dict[str, Any]) -> Dict[str, Any]:
 78        record_schema = {}
 79        for key, value in record.items():
 80            if value is None:
 81                record_schema[key] = {"type": "null"}
 82            else:
 83                record_schema[key] = {"type": PYTHON_TYPE_MAPPING[type(value)]}
 84
 85        return record_schema
 86
 87    @property
 88    def file_read_mode(self) -> FileReadMode:
 89        return FileReadMode.READ
 90
 91    def _parse_jsonl_entries(
 92        self,
 93        file: RemoteFile,
 94        stream_reader: AbstractFileBasedStreamReader,
 95        logger: logging.Logger,
 96        read_limit: bool = False,
 97    ) -> Iterable[Dict[str, Any]]:
 98        with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
 99            read_bytes = 0
100
101            had_json_parsing_error = False
102            has_warned_for_multiline_json_object = False
103            yielded_at_least_once = False
104
105            accumulator = None
106            for line in fp:
107                if not accumulator:
108                    accumulator = self._instantiate_accumulator(line)
109                read_bytes += len(line)
110                accumulator += line  # type: ignore [operator]  # In reality, it's either bytes or string and we add the same type
111                try:
112                    record = orjson.loads(accumulator)
113                    if had_json_parsing_error and not has_warned_for_multiline_json_object:
114                        logger.warning(
115                            f"File at {file.uri} is using multiline JSON. Performance could be greatly reduced"
116                        )
117                        has_warned_for_multiline_json_object = True
118
119                    yield record
120                    yielded_at_least_once = True
121                    accumulator = self._instantiate_accumulator(line)
122                except orjson.JSONDecodeError:
123                    had_json_parsing_error = True
124
125                if (
126                    read_limit
127                    and yielded_at_least_once
128                    and read_bytes >= self.MAX_BYTES_PER_FILE_FOR_SCHEMA_INFERENCE
129                ):
130                    logger.warning(
131                        f"Exceeded the maximum number of bytes per file for schema inference ({self.MAX_BYTES_PER_FILE_FOR_SCHEMA_INFERENCE}). "
132                        f"Inferring schema from an incomplete set of records."
133                    )
134                    break
135
136            if had_json_parsing_error and not yielded_at_least_once:
137                raise RecordParseError(
138                    FileBasedSourceError.ERROR_PARSING_RECORD, filename=file.uri, lineno=line
139                )
140
141    @staticmethod
142    def _instantiate_accumulator(line: Union[bytes, str]) -> Union[bytes, str]:
143        if isinstance(line, bytes):
144            return bytes("", json.detect_encoding(line))
145        elif isinstance(line, str):
146            return ""

An abstract class containing methods that must be implemented for each supported file type.

MAX_BYTES_PER_FILE_FOR_SCHEMA_INFERENCE = 1000000
ENCODING = 'utf8'
def check_config( self, config: airbyte_cdk.sources.file_based.FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
31    def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
32        """
33        JsonlParser does not require config checks, implicit pydantic validation is enough.
34        """
35        return True, None

JsonlParser does not require config checks, implicit pydantic validation is enough.

async def infer_schema( self, config: airbyte_cdk.sources.file_based.FileBasedStreamConfig, file: airbyte_cdk.sources.file_based.RemoteFile, stream_reader: airbyte_cdk.sources.file_based.AbstractFileBasedStreamReader, logger: logging.Logger) -> Mapping[str, Mapping[str, Union[List[str], Literal['string'], str]]]:
37    async def infer_schema(
38        self,
39        config: FileBasedStreamConfig,
40        file: RemoteFile,
41        stream_reader: AbstractFileBasedStreamReader,
42        logger: logging.Logger,
43    ) -> SchemaType:
44        """
45        Infers the schema for the file by inferring the schema for each line, and merging
46        it with the previously-inferred schema.
47        """
48        inferred_schema: Mapping[str, Any] = {}
49
50        for entry in self._parse_jsonl_entries(file, stream_reader, logger, read_limit=True):
51            line_schema = self._infer_schema_for_record(entry)
52            inferred_schema = merge_schemas(inferred_schema, line_schema)
53
54        return inferred_schema

Infers the schema for the file by inferring the schema for each line, and merging it with the previously-inferred schema.

def parse_records( self, config: airbyte_cdk.sources.file_based.FileBasedStreamConfig, file: airbyte_cdk.sources.file_based.RemoteFile, stream_reader: airbyte_cdk.sources.file_based.AbstractFileBasedStreamReader, logger: logging.Logger, discovered_schema: Optional[Mapping[str, Mapping[str, Mapping[str, Union[List[str], Literal['string'], str]]]]]) -> Iterable[Dict[str, Any]]:
56    def parse_records(
57        self,
58        config: FileBasedStreamConfig,
59        file: RemoteFile,
60        stream_reader: AbstractFileBasedStreamReader,
61        logger: logging.Logger,
62        discovered_schema: Optional[Mapping[str, SchemaType]],
63    ) -> Iterable[Dict[str, Any]]:
64        """
65        This code supports parsing json objects over multiple lines even though this does not align with the JSONL format. This is for
66        backward compatibility reasons i.e. the previous source-s3 parser did support this. The drawback is:
67        * performance as the way we support json over multiple lines is very brute forced
68        * given that we don't have `newlines_in_values` config to scope the possible inputs, we might parse the whole file before knowing if
69          the input is improperly formatted or if the json is over multiple lines
70
71        The goal is to run the V4 of source-s3 in production, track the warning log emitted when there are multiline json objects and
72        deprecate this feature if it's not a valid use case.
73        """
74        yield from self._parse_jsonl_entries(file, stream_reader, logger)

This code supports parsing JSON objects that span multiple lines, even though this does not align with the JSONL format. This is kept for backward compatibility, i.e. the previous source-s3 parser supported it. The drawbacks are:

  • performance, as the way we support JSON over multiple lines is brute force
  • given that we don't have a newlines_in_values config to scope the possible inputs, we might parse the whole file before knowing if the input is improperly formatted or if the JSON spans multiple lines

The goal is to run the V4 of source-s3 in production, track the warning log emitted when there are multiline json objects and deprecate this feature if it's not a valid use case.

file_read_mode: airbyte_cdk.sources.file_based.FileReadMode
87    @property
88    def file_read_mode(self) -> FileReadMode:
89        return FileReadMode.READ

The mode in which the file should be opened for reading.

 34class ParquetParser(FileTypeParser):
 35    ENCODING = None
 36
 37    def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
 38        """
 39        ParquetParser does not require config checks, implicit pydantic validation is enough.
 40        """
 41        return True, None
 42
 43    async def infer_schema(
 44        self,
 45        config: FileBasedStreamConfig,
 46        file: RemoteFile,
 47        stream_reader: AbstractFileBasedStreamReader,
 48        logger: logging.Logger,
 49    ) -> SchemaType:
 50        parquet_format = config.format
 51        if not isinstance(parquet_format, ParquetFormat):
 52            raise ValueError(f"Expected ParquetFormat, got {parquet_format}")
 53
 54        with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
 55            parquet_file = pq.ParquetFile(fp)
 56            parquet_schema = parquet_file.schema_arrow
 57
 58        # Inferred non-partition schema
 59        schema = {
 60            field.name: ParquetParser.parquet_type_to_schema_type(field.type, parquet_format)
 61            for field in parquet_schema
 62        }
 63        # Inferred partition schema
 64        partition_columns = {
 65            partition.split("=")[0]: {"type": "string"}
 66            for partition in self._extract_partitions(file.uri)
 67        }
 68
 69        schema.update(partition_columns)
 70        return schema
 71
 72    def parse_records(
 73        self,
 74        config: FileBasedStreamConfig,
 75        file: RemoteFile,
 76        stream_reader: AbstractFileBasedStreamReader,
 77        logger: logging.Logger,
 78        discovered_schema: Optional[Mapping[str, SchemaType]],
 79    ) -> Iterable[Dict[str, Any]]:
 80        parquet_format = config.format
 81        if not isinstance(parquet_format, ParquetFormat):
 82            logger.info(f"Expected ParquetFormat, got {parquet_format}")
 83            raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR)
 84
 85        line_no = 0
 86        try:
 87            with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
 88                reader = pq.ParquetFile(fp)
 89                partition_columns = {
 90                    x.split("=")[0]: x.split("=")[1] for x in self._extract_partitions(file.uri)
 91                }
 92                for row_group in range(reader.num_row_groups):
 93                    batch = reader.read_row_group(row_group)
 94                    for row in range(batch.num_rows):
 95                        line_no += 1
 96                        yield {
 97                            **{
 98                                column: ParquetParser._to_output_value(
 99                                    batch.column(column)[row], parquet_format
100                                )
101                                for column in batch.column_names
102                            },
103                            **partition_columns,
104                        }
105        except Exception as exc:
106            raise RecordParseError(
107                FileBasedSourceError.ERROR_PARSING_RECORD,
108                filename=file.uri,
109                lineno=f"{row_group=}, {line_no=}",
110            ) from exc
111
112    @staticmethod
113    def _extract_partitions(filepath: str) -> List[str]:
114        return [unquote(partition) for partition in filepath.split(os.sep) if "=" in partition]
115
116    @property
117    def file_read_mode(self) -> FileReadMode:
118        return FileReadMode.READ_BINARY
119
120    @staticmethod
121    def _to_output_value(
122        parquet_value: Union[Scalar, DictionaryArray], parquet_format: ParquetFormat
123    ) -> Any:
124        """
125        Convert an entry in a pyarrow table to a value that can be output by the source.
126        """
127        if isinstance(parquet_value, DictionaryArray):
128            return ParquetParser._dictionary_array_to_python_value(parquet_value)
129        else:
130            return ParquetParser._scalar_to_python_value(parquet_value, parquet_format)
131
132    @staticmethod
133    def _scalar_to_python_value(parquet_value: Scalar, parquet_format: ParquetFormat) -> Any:
134        """
135        Convert a pyarrow scalar to a value that can be output by the source.
136        """
137        if parquet_value.as_py() is None:
138            return None
139
140        # Convert date and datetime objects to isoformat strings
141        if (
142            pa.types.is_time(parquet_value.type)
143            or pa.types.is_timestamp(parquet_value.type)
144            or pa.types.is_date(parquet_value.type)
145        ):
146            return parquet_value.as_py().isoformat()
147
148        # Convert month_day_nano_interval to array
149        if parquet_value.type == pa.month_day_nano_interval():
150            return json.loads(json.dumps(parquet_value.as_py()))
151
152        # Decode binary strings to utf-8
153        if ParquetParser._is_binary(parquet_value.type):
154            return parquet_value.as_py().decode("utf-8")
155
156        if pa.types.is_decimal(parquet_value.type):
157            if parquet_format.decimal_as_float:
158                return float(parquet_value.as_py())
159            else:
160                return str(parquet_value.as_py())
161
162        if pa.types.is_map(parquet_value.type):
163            return {k: v for k, v in parquet_value.as_py()}
164
165        if pa.types.is_null(parquet_value.type):
166            return None
167
168        # Convert duration to seconds, then convert to the appropriate unit
169        if pa.types.is_duration(parquet_value.type):
170            duration = parquet_value.as_py()
171            duration_seconds = duration.total_seconds()
172            if parquet_value.type.unit == "s":
173                return duration_seconds
174            elif parquet_value.type.unit == "ms":
175                return duration_seconds * 1000
176            elif parquet_value.type.unit == "us":
177                return duration_seconds * 1_000_000
178            elif parquet_value.type.unit == "ns":
179                return duration_seconds * 1_000_000_000 + duration.nanoseconds
180            else:
181                raise ValueError(f"Unknown duration unit: {parquet_value.type.unit}")
182        else:
183            return parquet_value.as_py()
184
185    @staticmethod
186    def _dictionary_array_to_python_value(parquet_value: DictionaryArray) -> Dict[str, Any]:
187        """
188        Convert a pyarrow dictionary array to a value that can be output by the source.
189
190        Dictionaries are stored as two columns: indices and values
191        The indices column is an array of integers that maps to the values column
192        """
193
194        return {
195            "indices": parquet_value.indices.tolist(),
196            "values": parquet_value.dictionary.tolist(),
197        }
198
199    @staticmethod
200    def parquet_type_to_schema_type(
201        parquet_type: pa.DataType, parquet_format: ParquetFormat
202    ) -> Mapping[str, str]:
203        """
204        Convert a pyarrow data type to an Airbyte schema type.
205        Parquet data types are defined at https://arrow.apache.org/docs/python/api/datatypes.html
206        """
207
208        if pa.types.is_timestamp(parquet_type):
209            return {"type": "string", "format": "date-time"}
210        elif pa.types.is_date(parquet_type):
211            return {"type": "string", "format": "date"}
212        elif ParquetParser._is_string(parquet_type, parquet_format):
213            return {"type": "string"}
214        elif pa.types.is_boolean(parquet_type):
215            return {"type": "boolean"}
216        elif ParquetParser._is_integer(parquet_type):
217            return {"type": "integer"}
218        elif ParquetParser._is_float(parquet_type, parquet_format):
219            return {"type": "number"}
220        elif ParquetParser._is_object(parquet_type):
221            return {"type": "object"}
222        elif ParquetParser._is_list(parquet_type):
223            return {"type": "array"}
224        elif pa.types.is_null(parquet_type):
225            return {"type": "null"}
226        else:
227            raise ValueError(f"Unsupported parquet type: {parquet_type}")
228
229    @staticmethod
230    def _is_binary(parquet_type: pa.DataType) -> bool:
231        return bool(
232            pa.types.is_binary(parquet_type)
233            or pa.types.is_large_binary(parquet_type)
234            or pa.types.is_fixed_size_binary(parquet_type)
235        )
236
237    @staticmethod
238    def _is_integer(parquet_type: pa.DataType) -> bool:
239        return bool(pa.types.is_integer(parquet_type) or pa.types.is_duration(parquet_type))
240
241    @staticmethod
242    def _is_float(parquet_type: pa.DataType, parquet_format: ParquetFormat) -> bool:
243        if pa.types.is_decimal(parquet_type):
244            return parquet_format.decimal_as_float
245        else:
246            return bool(pa.types.is_floating(parquet_type))
247
248    @staticmethod
249    def _is_string(parquet_type: pa.DataType, parquet_format: ParquetFormat) -> bool:
250        if pa.types.is_decimal(parquet_type):
251            return not parquet_format.decimal_as_float
252        else:
253            return bool(
254                pa.types.is_time(parquet_type)
255                or pa.types.is_string(parquet_type)
256                or pa.types.is_large_string(parquet_type)
257                or ParquetParser._is_binary(
258                    parquet_type
259                )  # Best we can do is return as a string since we do not support binary
260            )
261
262    @staticmethod
263    def _is_object(parquet_type: pa.DataType) -> bool:
264        return bool(
265            pa.types.is_dictionary(parquet_type)
266            or pa.types.is_struct(parquet_type)
267            or pa.types.is_map(parquet_type)
268        )
269
270    @staticmethod
271    def _is_list(parquet_type: pa.DataType) -> bool:
272        return bool(
273            pa.types.is_list(parquet_type)
274            or pa.types.is_large_list(parquet_type)
275            or parquet_type == pa.month_day_nano_interval()
276        )

An abstract class containing methods that must be implemented for each supported file type.

ENCODING = None
def check_config( self, config: airbyte_cdk.sources.file_based.FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
37    def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
38        """
39        ParquetParser does not require config checks, implicit pydantic validation is enough.
40        """
41        return True, None

ParquetParser does not require config checks, implicit pydantic validation is enough.

async def infer_schema( self, config: airbyte_cdk.sources.file_based.FileBasedStreamConfig, file: airbyte_cdk.sources.file_based.RemoteFile, stream_reader: airbyte_cdk.sources.file_based.AbstractFileBasedStreamReader, logger: logging.Logger) -> Mapping[str, Mapping[str, Union[List[str], Literal['string'], str]]]:
43    async def infer_schema(
44        self,
45        config: FileBasedStreamConfig,
46        file: RemoteFile,
47        stream_reader: AbstractFileBasedStreamReader,
48        logger: logging.Logger,
49    ) -> SchemaType:
50        parquet_format = config.format
51        if not isinstance(parquet_format, ParquetFormat):
52            raise ValueError(f"Expected ParquetFormat, got {parquet_format}")
53
54        with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
55            parquet_file = pq.ParquetFile(fp)
56            parquet_schema = parquet_file.schema_arrow
57
58        # Inferred non-partition schema
59        schema = {
60            field.name: ParquetParser.parquet_type_to_schema_type(field.type, parquet_format)
61            for field in parquet_schema
62        }
63        # Inferred partition schema
64        partition_columns = {
65            partition.split("=")[0]: {"type": "string"}
66            for partition in self._extract_partitions(file.uri)
67        }
68
69        schema.update(partition_columns)
70        return schema

Infer the JSON Schema for this file.

def parse_records( self, config: airbyte_cdk.sources.file_based.FileBasedStreamConfig, file: airbyte_cdk.sources.file_based.RemoteFile, stream_reader: airbyte_cdk.sources.file_based.AbstractFileBasedStreamReader, logger: logging.Logger, discovered_schema: Optional[Mapping[str, Mapping[str, Mapping[str, Union[List[str], Literal['string'], str]]]]]) -> Iterable[Dict[str, Any]]:
 72    def parse_records(
 73        self,
 74        config: FileBasedStreamConfig,
 75        file: RemoteFile,
 76        stream_reader: AbstractFileBasedStreamReader,
 77        logger: logging.Logger,
 78        discovered_schema: Optional[Mapping[str, SchemaType]],
 79    ) -> Iterable[Dict[str, Any]]:
 80        parquet_format = config.format
 81        if not isinstance(parquet_format, ParquetFormat):
 82            logger.info(f"Expected ParquetFormat, got {parquet_format}")
 83            raise ConfigValidationError(FileBasedSourceError.CONFIG_VALIDATION_ERROR)
 84
 85        line_no = 0
 86        try:
 87            with stream_reader.open_file(file, self.file_read_mode, self.ENCODING, logger) as fp:
 88                reader = pq.ParquetFile(fp)
 89                partition_columns = {
 90                    x.split("=")[0]: x.split("=")[1] for x in self._extract_partitions(file.uri)
 91                }
 92                for row_group in range(reader.num_row_groups):
 93                    batch = reader.read_row_group(row_group)
 94                    for row in range(batch.num_rows):
 95                        line_no += 1
 96                        yield {
 97                            **{
 98                                column: ParquetParser._to_output_value(
 99                                    batch.column(column)[row], parquet_format
100                                )
101                                for column in batch.column_names
102                            },
103                            **partition_columns,
104                        }
105        except Exception as exc:
106            raise RecordParseError(
107                FileBasedSourceError.ERROR_PARSING_RECORD,
108                filename=file.uri,
109                lineno=f"{row_group=}, {line_no=}",
110            ) from exc

Parse and emit each record.

file_read_mode: airbyte_cdk.sources.file_based.FileReadMode
116    @property
117    def file_read_mode(self) -> FileReadMode:
118        return FileReadMode.READ_BINARY

The mode in which the file should be opened for reading.

@staticmethod
def parquet_type_to_schema_type( parquet_type: pyarrow.lib.DataType, parquet_format: airbyte_cdk.sources.file_based.config.parquet_format.ParquetFormat) -> Mapping[str, str]:
199    @staticmethod
200    def parquet_type_to_schema_type(
201        parquet_type: pa.DataType, parquet_format: ParquetFormat
202    ) -> Mapping[str, str]:
203        """
204        Convert a pyarrow data type to an Airbyte schema type.
205        Parquet data types are defined at https://arrow.apache.org/docs/python/api/datatypes.html
206        """
207
208        if pa.types.is_timestamp(parquet_type):
209            return {"type": "string", "format": "date-time"}
210        elif pa.types.is_date(parquet_type):
211            return {"type": "string", "format": "date"}
212        elif ParquetParser._is_string(parquet_type, parquet_format):
213            return {"type": "string"}
214        elif pa.types.is_boolean(parquet_type):
215            return {"type": "boolean"}
216        elif ParquetParser._is_integer(parquet_type):
217            return {"type": "integer"}
218        elif ParquetParser._is_float(parquet_type, parquet_format):
219            return {"type": "number"}
220        elif ParquetParser._is_object(parquet_type):
221            return {"type": "object"}
222        elif ParquetParser._is_list(parquet_type):
223            return {"type": "array"}
224        elif pa.types.is_null(parquet_type):
225            return {"type": "null"}
226        else:
227            raise ValueError(f"Unsupported parquet type: {parquet_type}")

Convert a pyarrow data type to an Airbyte schema type. Parquet data types are defined at https://arrow.apache.org/docs/python/api/datatypes.html

112class UnstructuredParser(FileTypeParser):
113    @property
114    def parser_max_n_files_for_schema_inference(self) -> Optional[int]:
115        """
116        Just check one file as the schema is static
117        """
118        return 1
119
120    @property
121    def parser_max_n_files_for_parsability(self) -> Optional[int]:
122        """
123        Do not check any files for parsability because it might be an expensive operation and doesn't give much confidence whether the sync will succeed.
124        """
125        return 0
126
127    def get_parser_defined_primary_key(self, config: FileBasedStreamConfig) -> Optional[str]:
128        """
129        Return the document_key field as the primary key.
130
131        his will pre-select the document key column as the primary key when setting up a connection, making it easier for the user to configure normalization in the destination.
132        """
133        return "document_key"
134
135    async def infer_schema(
136        self,
137        config: FileBasedStreamConfig,
138        file: RemoteFile,
139        stream_reader: AbstractFileBasedStreamReader,
140        logger: logging.Logger,
141    ) -> SchemaType:
142        format = _extract_format(config)
143        with stream_reader.open_file(file, self.file_read_mode, None, logger) as file_handle:
144            filetype = self._get_filetype(file_handle, file)
145            if filetype not in self._supported_file_types() and not format.skip_unprocessable_files:
146                raise self._create_parse_error(
147                    file,
148                    self._get_file_type_error_message(filetype),
149                )
150
151            return {
152                "content": {
153                    "type": "string",
154                    "description": "Content of the file as markdown. Might be null if the file could not be parsed",
155                },
156                "document_key": {
157                    "type": "string",
158                    "description": "Unique identifier of the document, e.g. the file path",
159                },
160                "_ab_source_file_parse_error": {
161                    "type": "string",
162                    "description": "Error message if the file could not be parsed even though the file is supported",
163                },
164            }
165
166    def parse_records(
167        self,
168        config: FileBasedStreamConfig,
169        file: RemoteFile,
170        stream_reader: AbstractFileBasedStreamReader,
171        logger: logging.Logger,
172        discovered_schema: Optional[Mapping[str, SchemaType]],
173    ) -> Iterable[Dict[str, Any]]:
174        format = _extract_format(config)
175        with stream_reader.open_file(file, self.file_read_mode, None, logger) as file_handle:
176            try:
177                markdown = self._read_file(file_handle, file, format, logger)
178                yield {
179                    "content": markdown,
180                    "document_key": file.uri,
181                    "_ab_source_file_parse_error": None,
182                }
183            except RecordParseError as e:
184                # RecordParseError is raised when the file can't be parsed because of a problem with the file content (either the file is not supported or the file is corrupted)
185                # if the skip_unprocessable_files flag is set, we log a warning and pass the error as part of the document
186                # otherwise, we raise the error to fail the sync
187                if format.skip_unprocessable_files:
188                    exception_str = str(e)
189                    logger.warn(f"File {file.uri} caused an error during parsing: {exception_str}.")
190                    yield {
191                        "content": None,
192                        "document_key": file.uri,
193                        "_ab_source_file_parse_error": exception_str,
194                    }
195                    logger.warn(f"File {file.uri} cannot be parsed. Skipping it.")
196                else:
197                    raise e
198            except Exception as e:
199                exception_str = str(e)
200                logger.error(f"File {file.uri} caused an error during parsing: {exception_str}.")
201                raise e
202
203    def _read_file(
204        self,
205        file_handle: IOBase,
206        remote_file: RemoteFile,
207        format: UnstructuredFormat,
208        logger: logging.Logger,
209    ) -> str:
210        _import_unstructured()
211        if (
212            (not unstructured_partition_pdf)
213            or (not unstructured_partition_docx)
214            or (not unstructured_partition_pptx)
215        ):
216            # check whether unstructured library is actually available for better error message and to ensure proper typing (can't be None after this point)
217            raise Exception("unstructured library is not available")
218
219        filetype: FileType | None = self._get_filetype(file_handle, remote_file)
220
221        if filetype is None or filetype not in self._supported_file_types():
222            raise self._create_parse_error(
223                remote_file,
224                self._get_file_type_error_message(filetype),
225            )
226        if filetype in {FileType.MD, FileType.TXT}:
227            file_content: bytes = file_handle.read()
228            decoded_content: str = optional_decode(file_content)
229            return decoded_content
230        if format.processing.mode == "local":
231            return self._read_file_locally(
232                file_handle,
233                filetype,
234                format.strategy,
235                remote_file,
236            )
237        elif format.processing.mode == "api":
238            try:
239                result: str = self._read_file_remotely_with_retries(
240                    file_handle,
241                    format.processing,
242                    filetype,
243                    format.strategy,
244                    remote_file,
245                )
246            except Exception as e:
247                # If a parser error happens during remotely processing the file, this means the file is corrupted. This case is handled by the parse_records method, so just rethrow.
248                #
249                # For other exceptions, re-throw as config error so the sync is stopped as problems with the external API need to be resolved by the user and are not considered part of the SLA.
250                # Once this parser leaves experimental stage, we should consider making this a system error instead for issues that might be transient.
251                if isinstance(e, RecordParseError):
252                    raise e
253                raise AirbyteTracedException.from_exception(
254                    e, failure_type=FailureType.config_error
255                )
256
257            return result
258
259    def _params_to_dict(
260        self, params: Optional[List[APIParameterConfigModel]], strategy: str
261    ) -> Dict[str, Union[str, List[str]]]:
262        result_dict: Dict[str, Union[str, List[str]]] = {"strategy": strategy}
263        if params is None:
264            return result_dict
265        for item in params:
266            key = item.name
267            value = item.value
268            if key in result_dict:
269                existing_value = result_dict[key]
270                # If the key already exists, append the new value to its list
271                if isinstance(existing_value, list):
272                    existing_value.append(value)
273                else:
274                    result_dict[key] = [existing_value, value]
275            else:
276                # If the key doesn't exist, add it to the dictionary
277                result_dict[key] = value
278
279        return result_dict
280
281    def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
282        """
283        Perform a connection check for the parser config:
284        - Verify that encryption is enabled if the API is hosted on a cloud instance.
285        - Verify that the API can extract text from a file.
286
287        For local processing, we don't need to perform any additional checks, implicit pydantic validation is enough.
288        """
289        format_config = _extract_format(config)
290        if isinstance(format_config.processing, LocalProcessingConfigModel):
291            if format_config.strategy == "hi_res":
292                return False, "Hi-res strategy is not supported for local processing"
293            return True, None
294
295        if is_cloud_environment() and not format_config.processing.api_url.startswith("https://"):
296            return False, "Base URL must start with https://"
297
298        try:
299            self._read_file_remotely(
300                BytesIO(b"# Airbyte source connection test"),
301                format_config.processing,
302                FileType.MD,
303                "auto",
304                RemoteFile(uri="test", last_modified=datetime.now()),
305            )
306        except Exception:
307            return False, "".join(traceback.format_exc())
308
309        return True, None
310
311    @backoff.on_exception(
312        backoff.expo, requests.exceptions.RequestException, max_tries=5, giveup=user_error
313    )
314    def _read_file_remotely_with_retries(
315        self,
316        file_handle: IOBase,
317        format: APIProcessingConfigModel,
318        filetype: FileType,
319        strategy: str,
320        remote_file: RemoteFile,
321    ) -> str:
322        """
323        Read a file remotely, retrying up to 5 times if the error is not caused by user error. This is useful for transient network errors or the API server being overloaded temporarily.
324        """
325        return self._read_file_remotely(file_handle, format, filetype, strategy, remote_file)
326
327    def _read_file_remotely(
328        self,
329        file_handle: IOBase,
330        format: APIProcessingConfigModel,
331        filetype: FileType,
332        strategy: str,
333        remote_file: RemoteFile,
334    ) -> str:
335        headers = {"accept": "application/json", "unstructured-api-key": format.api_key}
336
337        data = self._params_to_dict(format.parameters, strategy)
338
339        file_data = {"files": ("filename", file_handle, FILETYPE_TO_MIMETYPE[filetype])}
340
341        response = requests.post(
342            f"{format.api_url}/general/v0/general", headers=headers, data=data, files=file_data
343        )
344
345        if response.status_code == 422:
346            # 422 means the file couldn't be processed, but the API is working. Treat this as a parsing error (passing an error record to the destination).
347            raise self._create_parse_error(remote_file, response.json())
348        else:
349            # Other error statuses are raised as requests exceptions (retry everything except user errors)
350            response.raise_for_status()
351
352        json_response = response.json()
353
354        return self._render_markdown(json_response)
355
    def _read_file_locally(
        self, file_handle: IOBase, filetype: FileType, strategy: str, remote_file: RemoteFile
    ) -> str:
        """
        Partition a PDF, DOCX or PPTX file with the local unstructured library and return the
        resulting elements rendered as markdown.

        `strategy` is only passed to the PDF partitioner.

        Raises:
            Exception: if the unstructured library is not available.
            RecordParseError: if partitioning fails; the original error message is used as the
                parse error message.
        """
        _import_unstructured()
        if (
            (not unstructured_partition_pdf)
            or (not unstructured_partition_docx)
            or (not unstructured_partition_pptx)
        ):
            # check whether unstructured library is actually available for better error message and to ensure proper typing (can't be None after this point)
            raise Exception("unstructured library is not available")

        file: Any = file_handle

        # before the parsing logic is entered, the file is read completely to make sure it is in local memory
        file_handle.seek(0)
        file_handle.read()
        file_handle.seek(0)

        try:
            if filetype == FileType.PDF:
                # for PDF, read the file into a BytesIO object because some code paths in pdf parsing are doing an instance check on the file object and don't work with file-like objects
                file_handle.seek(0)
                with BytesIO(file_handle.read()) as file:
                    # rewind the original handle again after copying its content
                    file_handle.seek(0)
                    elements = unstructured_partition_pdf(file=file, strategy=strategy)
            elif filetype == FileType.DOCX:
                elements = unstructured_partition_docx(file=file)
            elif filetype == FileType.PPTX:
                elements = unstructured_partition_pptx(file=file)
            # NOTE(review): no branch assigns `elements` for any other filetype, so the return
            # below would fail for it; presumably callers only pass PDF/DOCX/PPTX here - confirm.
        except Exception as e:
            raise self._create_parse_error(remote_file, str(e))

        return self._render_markdown([element.to_dict() for element in elements])
390
391    def _create_parse_error(
392        self,
393        remote_file: RemoteFile,
394        message: str,
395    ) -> RecordParseError:
396        return RecordParseError(
397            FileBasedSourceError.ERROR_PARSING_RECORD, filename=remote_file.uri, message=message
398        )
399
400    def _get_filetype(self, file: IOBase, remote_file: RemoteFile) -> Optional[FileType]:
401        """
402        Detect the file type based on the file name and the file content.
403
404        There are three strategies to determine the file type:
405        1. Use the mime type if available (only some sources support it)
406        2. Use the file name if available
407        3. Use the file content
408        """
409        if remote_file.mime_type and remote_file.mime_type in STR_TO_FILETYPE:
410            return STR_TO_FILETYPE[remote_file.mime_type]
411
412        # set name to none, otherwise unstructured will try to get the modified date from the local file system
413        if hasattr(file, "name"):
414            file.name = None
415
416        # detect_filetype is either using the file name or file content
417        # if possible, try to leverage the file name to detect the file type
418        # if the file name is not available, use the file content
419        file_type: FileType | None = None
420        try:
421            file_type = detect_filetype(
422                filename=remote_file.uri,
423            )
424        except Exception:
425            # Path doesn't exist locally. Try something else...
426            pass
427
428        if file_type and file_type != FileType.UNK:
429            return file_type
430
431        type_based_on_content = detect_filetype(file=file)
432        file.seek(0)  # detect_filetype is reading to read the file content, so we need to reset
433
434        if type_based_on_content and type_based_on_content != FileType.UNK:
435            return type_based_on_content
436
437        extension = "." + remote_file.uri.split(".")[-1].lower()
438        if extension in EXT_TO_FILETYPE:
439            return EXT_TO_FILETYPE[extension]
440
441        return None
442
    def _supported_file_types(self) -> List[Any]:
        """
        Return the file types this parser accepts: markdown, PDF, DOCX, PPTX and plain text.
        """
        return [FileType.MD, FileType.PDF, FileType.DOCX, FileType.PPTX, FileType.TXT]
445
446    def _get_file_type_error_message(
447        self,
448        file_type: FileType | None,
449    ) -> str:
450        supported_file_types = ", ".join([str(type) for type in self._supported_file_types()])
451        return f"File type {file_type or 'None'!s} is not supported. Supported file types are {supported_file_types}"
452
453    def _render_markdown(self, elements: List[Any]) -> str:
454        return "\n\n".join((self._convert_to_markdown(el) for el in elements))
455
456    def _convert_to_markdown(self, el: Dict[str, Any]) -> str:
457        if dpath.get(el, "type") == "Title":
458            category_depth = dpath.get(el, "metadata/category_depth", default=1) or 1
459            if not isinstance(category_depth, int):
460                category_depth = (
461                    int(category_depth) if isinstance(category_depth, (str, float)) else 1
462                )
463            heading_str = "#" * category_depth
464            return f"{heading_str} {dpath.get(el, 'text')}"
465        elif dpath.get(el, "type") == "ListItem":
466            return f"- {dpath.get(el, 'text')}"
467        elif dpath.get(el, "type") == "Formula":
468            return f"```\n{dpath.get(el, 'text')}\n```"
469        else:
470            return str(dpath.get(el, "text", default=""))
471
    @property
    def file_read_mode(self) -> FileReadMode:
        """
        Files are opened in binary mode by this parser.
        """
        return FileReadMode.READ_BINARY

An abstract class containing methods that must be implemented for each supported file type.

parser_max_n_files_for_schema_inference: Optional[int]
    @property
    def parser_max_n_files_for_schema_inference(self) -> Optional[int]:
        """
        Just check one file as the schema is static.

        Always returns 1.
        """
        return 1

Just check one file as the schema is static

parser_max_n_files_for_parsability: Optional[int]
    @property
    def parser_max_n_files_for_parsability(self) -> Optional[int]:
        """
        Do not check any files for parsability because it might be an expensive operation and doesn't give much confidence whether the sync will succeed.

        Always returns 0.
        """
        return 0

Do not check any files for parsability because it might be an expensive operation and doesn't give much confidence whether the sync will succeed.

def get_parser_defined_primary_key( self, config: airbyte_cdk.sources.file_based.FileBasedStreamConfig) -> Optional[str]:
    def get_parser_defined_primary_key(self, config: FileBasedStreamConfig) -> Optional[str]:
        """
        Return the document_key field as the primary key.

        This will pre-select the document key column as the primary key when setting up a connection, making it easier for the user to configure normalization in the destination.

        The config argument is not consulted; the key is always "document_key".
        """
        return "document_key"

Return the document_key field as the primary key.

This will pre-select the document key column as the primary key when setting up a connection, making it easier for the user to configure normalization in the destination.

async def infer_schema( self, config: airbyte_cdk.sources.file_based.FileBasedStreamConfig, file: airbyte_cdk.sources.file_based.RemoteFile, stream_reader: airbyte_cdk.sources.file_based.AbstractFileBasedStreamReader, logger: logging.Logger) -> Mapping[str, Mapping[str, Union[List[str], Literal['string'], str]]]:
135    async def infer_schema(
136        self,
137        config: FileBasedStreamConfig,
138        file: RemoteFile,
139        stream_reader: AbstractFileBasedStreamReader,
140        logger: logging.Logger,
141    ) -> SchemaType:
142        format = _extract_format(config)
143        with stream_reader.open_file(file, self.file_read_mode, None, logger) as file_handle:
144            filetype = self._get_filetype(file_handle, file)
145            if filetype not in self._supported_file_types() and not format.skip_unprocessable_files:
146                raise self._create_parse_error(
147                    file,
148                    self._get_file_type_error_message(filetype),
149                )
150
151            return {
152                "content": {
153                    "type": "string",
154                    "description": "Content of the file as markdown. Might be null if the file could not be parsed",
155                },
156                "document_key": {
157                    "type": "string",
158                    "description": "Unique identifier of the document, e.g. the file path",
159                },
160                "_ab_source_file_parse_error": {
161                    "type": "string",
162                    "description": "Error message if the file could not be parsed even though the file is supported",
163                },
164            }

Infer the JSON Schema for this file.

def parse_records( self, config: airbyte_cdk.sources.file_based.FileBasedStreamConfig, file: airbyte_cdk.sources.file_based.RemoteFile, stream_reader: airbyte_cdk.sources.file_based.AbstractFileBasedStreamReader, logger: logging.Logger, discovered_schema: Optional[Mapping[str, Mapping[str, Mapping[str, Union[List[str], Literal['string'], str]]]]]) -> Iterable[Dict[str, Any]]:
166    def parse_records(
167        self,
168        config: FileBasedStreamConfig,
169        file: RemoteFile,
170        stream_reader: AbstractFileBasedStreamReader,
171        logger: logging.Logger,
172        discovered_schema: Optional[Mapping[str, SchemaType]],
173    ) -> Iterable[Dict[str, Any]]:
174        format = _extract_format(config)
175        with stream_reader.open_file(file, self.file_read_mode, None, logger) as file_handle:
176            try:
177                markdown = self._read_file(file_handle, file, format, logger)
178                yield {
179                    "content": markdown,
180                    "document_key": file.uri,
181                    "_ab_source_file_parse_error": None,
182                }
183            except RecordParseError as e:
184                # RecordParseError is raised when the file can't be parsed because of a problem with the file content (either the file is not supported or the file is corrupted)
185                # if the skip_unprocessable_files flag is set, we log a warning and pass the error as part of the document
186                # otherwise, we raise the error to fail the sync
187                if format.skip_unprocessable_files:
188                    exception_str = str(e)
189                    logger.warn(f"File {file.uri} caused an error during parsing: {exception_str}.")
190                    yield {
191                        "content": None,
192                        "document_key": file.uri,
193                        "_ab_source_file_parse_error": exception_str,
194                    }
195                    logger.warn(f"File {file.uri} cannot be parsed. Skipping it.")
196                else:
197                    raise e
198            except Exception as e:
199                exception_str = str(e)
200                logger.error(f"File {file.uri} caused an error during parsing: {exception_str}.")
201                raise e

Parse and emit each record.

def check_config( self, config: airbyte_cdk.sources.file_based.FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
281    def check_config(self, config: FileBasedStreamConfig) -> Tuple[bool, Optional[str]]:
282        """
283        Perform a connection check for the parser config:
284        - Verify that encryption is enabled if the API is hosted on a cloud instance.
285        - Verify that the API can extract text from a file.
286
287        For local processing, we don't need to perform any additional checks, implicit pydantic validation is enough.
288        """
289        format_config = _extract_format(config)
290        if isinstance(format_config.processing, LocalProcessingConfigModel):
291            if format_config.strategy == "hi_res":
292                return False, "Hi-res strategy is not supported for local processing"
293            return True, None
294
295        if is_cloud_environment() and not format_config.processing.api_url.startswith("https://"):
296            return False, "Base URL must start with https://"
297
298        try:
299            self._read_file_remotely(
300                BytesIO(b"# Airbyte source connection test"),
301                format_config.processing,
302                FileType.MD,
303                "auto",
304                RemoteFile(uri="test", last_modified=datetime.now()),
305            )
306        except Exception:
307            return False, "".join(traceback.format_exc())
308
309        return True, None

Perform a connection check for the parser config:

  • Verify that encryption is enabled if the API is hosted on a cloud instance.
  • Verify that the API can extract text from a file.

For local processing, we don't need to perform any additional checks, implicit pydantic validation is enough.

file_read_mode: airbyte_cdk.sources.file_based.FileReadMode
    @property
    def file_read_mode(self) -> FileReadMode:
        """
        Files are opened in binary mode by this parser.
        """
        return FileReadMode.READ_BINARY

The mode in which the file should be opened for reading.

class FileTransfer:
class FileTransfer:
    """
    Fetches remote files into a local directory through a stream reader.

    The target directory is AIRBYTE_STAGING_DIRECTORY if that path exists when the
    instance is created, otherwise DEFAULT_LOCAL_DIRECTORY.
    """

    def __init__(self) -> None:
        if os.path.exists(AIRBYTE_STAGING_DIRECTORY):
            self._local_directory = AIRBYTE_STAGING_DIRECTORY
        else:
            self._local_directory = DEFAULT_LOCAL_DIRECTORY

    def get_file(
        self,
        config: FileBasedStreamConfig,
        file: RemoteFile,
        stream_reader: AbstractFileBasedStreamReader,
        logger: logging.Logger,
    ) -> Iterable[Dict[str, Any]]:
        """
        Yield whatever the stream reader returns for fetching the file into the local directory.

        Any error is logged and raised again. The config argument is not used here.
        """
        try:
            transfer_result = stream_reader.get_file(
                file=file,
                local_directory=self._local_directory,
                logger=logger,
            )
            yield transfer_result
        except Exception as ex:
            logger.error("An error has occurred while getting file: %s", str(ex))
            raise ex
def get_file( self, config: airbyte_cdk.sources.file_based.FileBasedStreamConfig, file: airbyte_cdk.sources.file_based.RemoteFile, stream_reader: airbyte_cdk.sources.file_based.AbstractFileBasedStreamReader, logger: logging.Logger) -> Iterable[Dict[str, Any]]:
25    def get_file(
26        self,
27        config: FileBasedStreamConfig,
28        file: RemoteFile,
29        stream_reader: AbstractFileBasedStreamReader,
30        logger: logging.Logger,
31    ) -> Iterable[Dict[str, Any]]:
32        try:
33            yield stream_reader.get_file(
34                file=file, local_directory=self._local_directory, logger=logger
35            )
36        except Exception as ex:
37            logger.error("An error has occurred while getting file: %s", str(ex))
38            raise ex