airbyte_cdk.sources.file_based.exceptions

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5from enum import Enum
  6from typing import Any, List, Union
  7
  8from airbyte_cdk.models import AirbyteMessage, FailureType
  9from airbyte_cdk.utils import AirbyteTracedException
 10
 11
 12class FileBasedSourceError(Enum):
 13    EMPTY_STREAM = "No files were identified in the stream. This may be because there are no files in the specified container, or because your glob patterns did not match any files. Please verify that your source contains files last modified after the start_date and that your glob patterns are not overly strict."
 14    GLOB_PARSE_ERROR = "Error parsing glob pattern. Please refer to the glob pattern rules at https://facelessuser.github.io/wcmatch/glob/#split."
 15    ENCODING_ERROR = "File encoding error. The configured encoding must match file encoding."
 16    ERROR_CASTING_VALUE = "Could not cast the value to the expected type."
 17    ERROR_CASTING_VALUE_UNRECOGNIZED_TYPE = "Could not cast the value to the expected type because the type is not recognized. Valid types are null, array, boolean, integer, number, object, and string."
 18    ERROR_DECODING_VALUE = "Expected a JSON-decodeable value but could not decode record."
 19    ERROR_LISTING_FILES = "Error listing files. Please check the credentials provided in the config and verify that they provide permission to list files."
 20    ERROR_READING_FILE = "Error opening file. Please check the credentials provided in the config and verify that they provide permission to read files."
 21    ERROR_PARSING_RECORD = "Error parsing record. This could be due to a mismatch between the config's file type and the actual file type, or because the file or record is not parseable."
 22    ERROR_PARSING_USER_PROVIDED_SCHEMA = (
 23        "The provided schema could not be transformed into valid JSON Schema."
 24    )
 25    ERROR_VALIDATING_RECORD = "One or more records do not pass the schema validation policy. Please modify your input schema, or select a more lenient validation policy."
 26    ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS = "Only one of options 'Schemaless', 'Input Schema', 'Files To Read For Schema Discover' or 'Use First Found File For Schema Discover' can be provided at the same time."
 27    ERROR_PARSING_RECORD_MISMATCHED_COLUMNS = (
 28        "CSV data row contains more columns than the header row defines."
 29    )
 30    ERROR_PARSING_RECORD_MISMATCHED_ROWS = (
 31        "CSV data row contains fewer columns than the header row defines."
 32    )
 33    STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY = "Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema."
 34    NULL_VALUE_IN_SCHEMA = "Error during schema inference: no type was detected for key."
 35    UNRECOGNIZED_TYPE = "Error during schema inference: unrecognized type."
 36    SCHEMA_INFERENCE_ERROR = "Error inferring schema from files. Are the files valid?"
 37    INVALID_SCHEMA_ERROR = "No fields were identified for this schema. This may happen if the stream is empty. Please check your configuration to verify that there are files that match the stream's glob patterns."
 38    CONFIG_VALIDATION_ERROR = "Error creating stream config object."
 39    MISSING_SCHEMA = "Expected `json_schema` in the configured catalog but it is missing."
 40    UNDEFINED_PARSER = "No parser is defined for this file type."
 41    UNDEFINED_VALIDATION_POLICY = (
 42        "The validation policy defined in the config does not exist for the source."
 43    )
 44
 45
 46class FileBasedErrorsCollector:
 47    """
 48    The placeholder for all errors collected.
 49    """
 50
 51    errors: List[AirbyteMessage] = []
 52
 53    def yield_and_raise_collected(self) -> Any:
 54        if self.errors:
 55            # emit collected logged messages
 56            yield from self.errors
 57            # clean the collector
 58            self.errors.clear()
 59            # raising the single exception
 60            raise AirbyteTracedException(
 61                internal_message="Please check the logged errors for more information.",
 62                message="Some errors occured while reading from the source.",
 63                failure_type=FailureType.config_error,
 64            )
 65
 66    def collect(self, logged_error: AirbyteMessage) -> None:
 67        self.errors.append(logged_error)
 68
 69
 70class BaseFileBasedSourceError(Exception):
 71    def __init__(self, error: Union[FileBasedSourceError, str], **kwargs):  # type: ignore # noqa
 72        if isinstance(error, FileBasedSourceError):
 73            error = FileBasedSourceError(error).value
 74        super().__init__(
 75            f"{error} Contact Support if you need assistance.\n{' '.join([f'{k}={v}' for k, v in kwargs.items()])}"
 76        )
 77
 78
 79class ConfigValidationError(BaseFileBasedSourceError):
 80    pass
 81
 82
 83class InvalidSchemaError(BaseFileBasedSourceError):
 84    pass
 85
 86
 87class MissingSchemaError(BaseFileBasedSourceError):
 88    pass
 89
 90
 91class NoFilesMatchingError(BaseFileBasedSourceError):
 92    pass
 93
 94
 95class RecordParseError(BaseFileBasedSourceError):
 96    pass
 97
 98
 99class ExcelCalamineParsingError(BaseFileBasedSourceError):
100    """Raised when Calamine engine fails to parse an Excel file."""
101
102    pass
103
104
class SchemaInferenceError(BaseFileBasedSourceError):
    """Error type for schema-inference failures; adds no behavior to its base."""
107
108
class CheckAvailabilityError(BaseFileBasedSourceError):
    """Error type for failed availability checks; adds no behavior to its base."""
111
112
class UndefinedParserError(BaseFileBasedSourceError):
    """Error type for a file type with no defined parser; adds no behavior to its base."""
115
116
class StopSyncPerValidationPolicy(BaseFileBasedSourceError):
    """Error type for stopping a sync per the validation policy; adds no behavior to its base."""
119
120
class ErrorListingFiles(BaseFileBasedSourceError):
    """Error type for failures while listing files; adds no behavior to its base."""
123
124
class DuplicatedFilesError(BaseFileBasedSourceError):
    """
    Raised when several files in a stream share the same file name.

    Args:
        duplicated_files_names: list of mappings from a duplicated file name to
            the paths that share that name.
        **kwargs: forwarded to the base class; must contain ``stream``
            (KeyError otherwise), which is named in the message.
    """

    def __init__(self, duplicated_files_names: List[dict[str, List[str]]], **kwargs: Any):
        self._duplicated_files_names = duplicated_files_names
        self._stream_name: str = kwargs["stream"]
        super().__init__(self._format_duplicate_files_error_message(), **kwargs)

    def _format_duplicate_files_error_message(self) -> str:
        """Build the message listing every duplicated name and each of its paths."""
        duplicated_files_messages = []
        for duplicated_file in self._duplicated_files_names:
            for duplicated_file_name, file_paths in duplicated_file.items():
                # One bullet per path. Joining a single f-string of the whole list
                # only iterated its characters, printing the list repr on one bullet.
                file_duplicated_message = (
                    f"{len(file_paths)} duplicates found for file name {duplicated_file_name}:\n\n"
                    + "".join(f"\n - {file_path}" for file_path in file_paths)
                )
                duplicated_files_messages.append(file_duplicated_message)

        error_message = (
            f"ERROR: Duplicate filenames found for stream {self._stream_name}. "
            "Duplicate file names are not allowed if the Preserve Sub-Directories in File Paths option is disabled. "
            "Please remove or rename the duplicate files before attempting to re-run the sync.\n\n"
            + "\n".join(duplicated_files_messages)
        )

        return error_message

    def __repr__(self) -> str:
        """Return a string representation of the exception (public attributes only)."""
        class_name = self.__class__.__name__
        properties_str = ", ".join(
            f"{k}={v!r}" for k, v in self.__dict__.items() if not k.startswith("_")
        )
        return f"{class_name}({properties_str})"
157
158
class CustomFileBasedException(AirbyteTracedException):
    """
    Specialized traced exception for file-based connectors.

    Raising this type bypasses the file-based CDK's default error handling,
    which allows connectors to surface their own custom error messages.
    """
167
168
class FileSizeLimitError(CustomFileBasedException):
    """Custom file-based exception for file-size-limit errors; adds no behavior to its base."""
171
172
class EmptyFileSchemaInferenceError(AirbyteTracedException):
    """Traced exception for schema inference on an empty file; adds no behavior to its base."""
class FileBasedSourceError(enum.Enum):
class FileBasedSourceError(Enum):
    """
    User-facing error messages for file-based sources.

    Every member's value is the complete message text; members are looked up
    either by name or by value (``FileBasedSourceError(message)``).
    """

    EMPTY_STREAM = (
        "No files were identified in the stream. "
        "This may be because there are no files in the specified container, or because your glob patterns did not match any files. "
        "Please verify that your source contains files last modified after the start_date and that your glob patterns are not overly strict."
    )
    GLOB_PARSE_ERROR = (
        "Error parsing glob pattern. "
        "Please refer to the glob pattern rules at https://facelessuser.github.io/wcmatch/glob/#split."
    )
    ENCODING_ERROR = "File encoding error. The configured encoding must match file encoding."
    ERROR_CASTING_VALUE = "Could not cast the value to the expected type."
    ERROR_CASTING_VALUE_UNRECOGNIZED_TYPE = (
        "Could not cast the value to the expected type because the type is not recognized. "
        "Valid types are null, array, boolean, integer, number, object, and string."
    )
    ERROR_DECODING_VALUE = "Expected a JSON-decodeable value but could not decode record."
    ERROR_LISTING_FILES = (
        "Error listing files. "
        "Please check the credentials provided in the config and verify that they provide permission to list files."
    )
    ERROR_READING_FILE = (
        "Error opening file. "
        "Please check the credentials provided in the config and verify that they provide permission to read files."
    )
    ERROR_PARSING_RECORD = (
        "Error parsing record. "
        "This could be due to a mismatch between the config's file type and the actual file type, or because the file or record is not parseable."
    )
    ERROR_PARSING_USER_PROVIDED_SCHEMA = "The provided schema could not be transformed into valid JSON Schema."
    ERROR_VALIDATING_RECORD = (
        "One or more records do not pass the schema validation policy. "
        "Please modify your input schema, or select a more lenient validation policy."
    )
    ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS = "Only one of options 'Schemaless', 'Input Schema', 'Files To Read For Schema Discover' or 'Use First Found File For Schema Discover' can be provided at the same time."
    ERROR_PARSING_RECORD_MISMATCHED_COLUMNS = "CSV data row contains more columns than the header row defines."
    # NOTE: despite the "ROWS" name, this message is about a row with too few columns.
    ERROR_PARSING_RECORD_MISMATCHED_ROWS = "CSV data row contains fewer columns than the header row defines."
    STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY = (
        "Stopping sync in accordance with the configured validation policy. "
        "Records in file did not conform to the schema."
    )
    NULL_VALUE_IN_SCHEMA = "Error during schema inference: no type was detected for key."
    UNRECOGNIZED_TYPE = "Error during schema inference: unrecognized type."
    SCHEMA_INFERENCE_ERROR = "Error inferring schema from files. Are the files valid?"
    INVALID_SCHEMA_ERROR = (
        "No fields were identified for this schema. "
        "This may happen if the stream is empty. "
        "Please check your configuration to verify that there are files that match the stream's glob patterns."
    )
    CONFIG_VALIDATION_ERROR = "Error creating stream config object."
    MISSING_SCHEMA = "Expected `json_schema` in the configured catalog but it is missing."
    UNDEFINED_PARSER = "No parser is defined for this file type."
    UNDEFINED_VALIDATION_POLICY = "The validation policy defined in the config does not exist for the source."

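Enumeration of the user-facing error messages used by file-based sources; each member's value is the full message text.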

EMPTY_STREAM = <FileBasedSourceError.EMPTY_STREAM: 'No files were identified in the stream. This may be because there are no files in the specified container, or because your glob patterns did not match any files. Please verify that your source contains files last modified after the start_date and that your glob patterns are not overly strict.'>
GLOB_PARSE_ERROR = <FileBasedSourceError.GLOB_PARSE_ERROR: 'Error parsing glob pattern. Please refer to the glob pattern rules at https://facelessuser.github.io/wcmatch/glob/#split.'>
ENCODING_ERROR = <FileBasedSourceError.ENCODING_ERROR: 'File encoding error. The configured encoding must match file encoding.'>
ERROR_CASTING_VALUE = <FileBasedSourceError.ERROR_CASTING_VALUE: 'Could not cast the value to the expected type.'>
ERROR_CASTING_VALUE_UNRECOGNIZED_TYPE = <FileBasedSourceError.ERROR_CASTING_VALUE_UNRECOGNIZED_TYPE: 'Could not cast the value to the expected type because the type is not recognized. Valid types are null, array, boolean, integer, number, object, and string.'>
ERROR_DECODING_VALUE = <FileBasedSourceError.ERROR_DECODING_VALUE: 'Expected a JSON-decodeable value but could not decode record.'>
ERROR_LISTING_FILES = <FileBasedSourceError.ERROR_LISTING_FILES: 'Error listing files. Please check the credentials provided in the config and verify that they provide permission to list files.'>
ERROR_READING_FILE = <FileBasedSourceError.ERROR_READING_FILE: 'Error opening file. Please check the credentials provided in the config and verify that they provide permission to read files.'>
ERROR_PARSING_RECORD = <FileBasedSourceError.ERROR_PARSING_RECORD: "Error parsing record. This could be due to a mismatch between the config's file type and the actual file type, or because the file or record is not parseable.">
ERROR_PARSING_USER_PROVIDED_SCHEMA = <FileBasedSourceError.ERROR_PARSING_USER_PROVIDED_SCHEMA: 'The provided schema could not be transformed into valid JSON Schema.'>
ERROR_VALIDATING_RECORD = <FileBasedSourceError.ERROR_VALIDATING_RECORD: 'One or more records do not pass the schema validation policy. Please modify your input schema, or select a more lenient validation policy.'>
ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS = <FileBasedSourceError.ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS: "Only one of options 'Schemaless', 'Input Schema', 'Files To Read For Schema Discover' or 'Use First Found File For Schema Discover' can be provided at the same time.">
ERROR_PARSING_RECORD_MISMATCHED_COLUMNS = <FileBasedSourceError.ERROR_PARSING_RECORD_MISMATCHED_COLUMNS: 'CSV data row contains more columns than the header row defines.'>
ERROR_PARSING_RECORD_MISMATCHED_ROWS = <FileBasedSourceError.ERROR_PARSING_RECORD_MISMATCHED_ROWS: 'CSV data row contains fewer columns than the header row defines.'>
STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY = <FileBasedSourceError.STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY: 'Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema.'>
NULL_VALUE_IN_SCHEMA = <FileBasedSourceError.NULL_VALUE_IN_SCHEMA: 'Error during schema inference: no type was detected for key.'>
UNRECOGNIZED_TYPE = <FileBasedSourceError.UNRECOGNIZED_TYPE: 'Error during schema inference: unrecognized type.'>
SCHEMA_INFERENCE_ERROR = <FileBasedSourceError.SCHEMA_INFERENCE_ERROR: 'Error inferring schema from files. Are the files valid?'>
INVALID_SCHEMA_ERROR = <FileBasedSourceError.INVALID_SCHEMA_ERROR: "No fields were identified for this schema. This may happen if the stream is empty. Please check your configuration to verify that there are files that match the stream's glob patterns.">
CONFIG_VALIDATION_ERROR = <FileBasedSourceError.CONFIG_VALIDATION_ERROR: 'Error creating stream config object.'>
MISSING_SCHEMA = <FileBasedSourceError.MISSING_SCHEMA: 'Expected `json_schema` in the configured catalog but it is missing.'>
UNDEFINED_PARSER = <FileBasedSourceError.UNDEFINED_PARSER: 'No parser is defined for this file type.'>
UNDEFINED_VALIDATION_POLICY = <FileBasedSourceError.UNDEFINED_VALIDATION_POLICY: 'The validation policy defined in the config does not exist for the source.'>
class FileBasedErrorsCollector:
class FileBasedErrorsCollector:
    """
    The placeholder for all errors collected.

    Messages are accumulated with `collect` and replayed in bulk by
    `yield_and_raise_collected`. After replaying, that method clears the
    collector and raises a single `AirbyteTracedException` with
    `FailureType.config_error`.
    """

    # Declared here for type checkers only; the list itself is created per
    # instance in __init__. A class-level `= []` default would be a single
    # list shared by every collector, leaking errors between instances.
    errors: List[AirbyteMessage]

    def __init__(self) -> None:
        self.errors = []

    def yield_and_raise_collected(self) -> Any:
        """
        Yield every collected message, then fail with one traced exception.

        This is a generator, so nothing happens until it is iterated. When no
        errors were collected it yields nothing and finishes normally.

        Raises:
            AirbyteTracedException: after all collected messages were yielded.
        """
        if self.errors:
            # emit collected logged messages
            yield from self.errors
            # clean the collector
            self.errors.clear()
            # raising the single exception
            raise AirbyteTracedException(
                internal_message="Please check the logged errors for more information.",
                message="Some errors occured while reading from the source.",
                failure_type=FailureType.config_error,
            )

    def collect(self, logged_error: AirbyteMessage) -> None:
        """Append one logged error message to this collector."""
        self.errors.append(logged_error)

Collects logged error messages so they can be emitted together and then raised as a single config-error `AirbyteTracedException`.

errors: List[airbyte_cdk.AirbyteMessage] = []
def yield_and_raise_collected(self) -> Any:
54    def yield_and_raise_collected(self) -> Any:
55        if self.errors:
56            # emit collected logged messages
57            yield from self.errors
58            # clean the collector
59            self.errors.clear()
60            # raising the single exception
61            raise AirbyteTracedException(
62                internal_message="Please check the logged errors for more information.",
63                message="Some errors occured while reading from the source.",
64                failure_type=FailureType.config_error,
65            )
def collect( self, logged_error: airbyte_cdk.AirbyteMessage) -> None:
67    def collect(self, logged_error: AirbyteMessage) -> None:
68        self.errors.append(logged_error)
class BaseFileBasedSourceError(builtins.Exception):
class BaseFileBasedSourceError(Exception):
    """
    Base class for errors raised by file-based sources.

    The exception text is the error message, followed by a support hint and,
    on the next line, any keyword arguments rendered as space-separated
    ``key=value`` pairs.
    """

    def __init__(self, error: Union[FileBasedSourceError, str], **kwargs):  # type: ignore # noqa
        # Enum members carry their human-readable message in `.value`.
        text = error.value if isinstance(error, FileBasedSourceError) else error
        context = " ".join(f"{key}={val}" for key, val in kwargs.items())
        super().__init__(f"{text} Contact Support if you need assistance.\n{context}")

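Base class for file-based source errors. The exception message is the error text (a `FileBasedSourceError` member's value, or a plain string), followed by "Contact Support if you need assistance." and, on the next line, any keyword arguments rendered as `key=value` pairs.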

BaseFileBasedSourceError( error: Union[FileBasedSourceError, str], **kwargs)
72    def __init__(self, error: Union[FileBasedSourceError, str], **kwargs):  # type: ignore # noqa
73        if isinstance(error, FileBasedSourceError):
74            error = FileBasedSourceError(error).value
75        super().__init__(
76            f"{error} Contact Support if you need assistance.\n{' '.join([f'{k}={v}' for k, v in kwargs.items()])}"
77        )
class ConfigValidationError(BaseFileBasedSourceError):
class ConfigValidationError(BaseFileBasedSourceError):
    """Error type for configuration-validation failures; adds no behavior to its base."""

Subclass of `BaseFileBasedSourceError` for configuration-validation failures; it adds no behavior of its own.

class InvalidSchemaError(BaseFileBasedSourceError):
class InvalidSchemaError(BaseFileBasedSourceError):
    """Error type for an invalid schema; adds no behavior to its base."""

Subclass of `BaseFileBasedSourceError` for an invalid schema; it adds no behavior of its own.

class MissingSchemaError(BaseFileBasedSourceError):
class MissingSchemaError(BaseFileBasedSourceError):
    """Error type for a missing schema; adds no behavior to its base."""

Subclass of `BaseFileBasedSourceError` for a missing schema; it adds no behavior of its own.

class NoFilesMatchingError(BaseFileBasedSourceError):
class NoFilesMatchingError(BaseFileBasedSourceError):
    """Error type for when no files match; adds no behavior to its base."""

Subclass of `BaseFileBasedSourceError` for when no files match; it adds no behavior of its own.

class RecordParseError(BaseFileBasedSourceError):
class RecordParseError(BaseFileBasedSourceError):
    """Error type for records that cannot be parsed; adds no behavior to its base."""

Subclass of `BaseFileBasedSourceError` for records that cannot be parsed; it adds no behavior of its own.

class ExcelCalamineParsingError(BaseFileBasedSourceError):
class ExcelCalamineParsingError(BaseFileBasedSourceError):
    """Raised when the Calamine engine fails to parse an Excel file."""

Raised when Calamine engine fails to parse an Excel file.

class SchemaInferenceError(BaseFileBasedSourceError):
class SchemaInferenceError(BaseFileBasedSourceError):
    """Error type for schema-inference failures; adds no behavior to its base."""

Subclass of `BaseFileBasedSourceError` for schema-inference failures; it adds no behavior of its own.

class CheckAvailabilityError(BaseFileBasedSourceError):
class CheckAvailabilityError(BaseFileBasedSourceError):
    """Error type for failed availability checks; adds no behavior to its base."""

Subclass of `BaseFileBasedSourceError` for failed availability checks; it adds no behavior of its own.

class UndefinedParserError(BaseFileBasedSourceError):
class UndefinedParserError(BaseFileBasedSourceError):
    """Error type for a file type with no defined parser; adds no behavior to its base."""

Subclass of `BaseFileBasedSourceError` for a file type with no defined parser; it adds no behavior of its own.

class StopSyncPerValidationPolicy(BaseFileBasedSourceError):
class StopSyncPerValidationPolicy(BaseFileBasedSourceError):
    """Error type for stopping a sync per the validation policy; adds no behavior to its base."""

Subclass of `BaseFileBasedSourceError` used to stop a sync according to the configured validation policy; it adds no behavior of its own.

class ErrorListingFiles(BaseFileBasedSourceError):
class ErrorListingFiles(BaseFileBasedSourceError):
    """Error type for failures while listing files; adds no behavior to its base."""

Subclass of `BaseFileBasedSourceError` for failures while listing files; it adds no behavior of its own.

class DuplicatedFilesError(BaseFileBasedSourceError):
class DuplicatedFilesError(BaseFileBasedSourceError):
    """
    Raised when several files in a stream share the same file name.

    Args:
        duplicated_files_names: list of mappings from a duplicated file name to
            the paths that share that name.
        **kwargs: forwarded to the base class; must contain ``stream``
            (KeyError otherwise), which is named in the message.
    """

    def __init__(self, duplicated_files_names: List[dict[str, List[str]]], **kwargs: Any):
        self._duplicated_files_names = duplicated_files_names
        self._stream_name: str = kwargs["stream"]
        super().__init__(self._format_duplicate_files_error_message(), **kwargs)

    def _format_duplicate_files_error_message(self) -> str:
        """Build the message listing every duplicated name and each of its paths."""
        duplicated_files_messages = []
        for duplicated_file in self._duplicated_files_names:
            for duplicated_file_name, file_paths in duplicated_file.items():
                # One bullet per path. Joining a single f-string of the whole list
                # only iterated its characters, printing the list repr on one bullet.
                file_duplicated_message = (
                    f"{len(file_paths)} duplicates found for file name {duplicated_file_name}:\n\n"
                    + "".join(f"\n - {file_path}" for file_path in file_paths)
                )
                duplicated_files_messages.append(file_duplicated_message)

        error_message = (
            f"ERROR: Duplicate filenames found for stream {self._stream_name}. "
            "Duplicate file names are not allowed if the Preserve Sub-Directories in File Paths option is disabled. "
            "Please remove or rename the duplicate files before attempting to re-run the sync.\n\n"
            + "\n".join(duplicated_files_messages)
        )

        return error_message

    def __repr__(self) -> str:
        """Return a string representation of the exception (public attributes only)."""
        class_name = self.__class__.__name__
        properties_str = ", ".join(
            f"{k}={v!r}" for k, v in self.__dict__.items() if not k.startswith("_")
        )
        return f"{class_name}({properties_str})"

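Raised when several files in a stream share the same file name (disallowed when Preserve Sub-Directories in File Paths is disabled). The message lists each duplicated name with its paths; the `stream` keyword argument is required.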

DuplicatedFilesError(duplicated_files_names: List[dict[str, List[str]]], **kwargs: Any)
127    def __init__(self, duplicated_files_names: List[dict[str, List[str]]], **kwargs: Any):
128        self._duplicated_files_names = duplicated_files_names
129        self._stream_name: str = kwargs["stream"]
130        super().__init__(self._format_duplicate_files_error_message(), **kwargs)
class CustomFileBasedException(airbyte_cdk.utils.traced_exception.AirbyteTracedException):
class CustomFileBasedException(AirbyteTracedException):
    """
    Specialized traced exception for file-based connectors.

    Raising this type bypasses the file-based CDK's default error handling,
    which allows connectors to surface their own custom error messages.
    """

A specialized exception for file-based connectors.

This exception is designed to bypass the default error handling in the file-based CDK, allowing the use of custom error messages.

class FileSizeLimitError(CustomFileBasedException):
class FileSizeLimitError(CustomFileBasedException):
    """Custom file-based exception for file-size-limit errors; adds no behavior to its base."""

A `CustomFileBasedException` subclass for file-size-limit errors that adds no behavior of its own.

Like its base, it is designed to bypass the default error handling in the file-based CDK, allowing the use of custom error messages.

class EmptyFileSchemaInferenceError(airbyte_cdk.utils.traced_exception.AirbyteTracedException):
class EmptyFileSchemaInferenceError(AirbyteTracedException):
    """Traced exception for schema inference on an empty file; adds no behavior to its base."""

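An `AirbyteTracedException` subclass with no behavior of its own, i.e. an exception that should be emitted as an AirbyteTraceMessage. Its name suggests it is used when schema inference encounters an empty file.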