airbyte_cdk.sources.file_based.exceptions
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from enum import Enum
from typing import Any, Iterator, List, Union

from airbyte_cdk.models import AirbyteMessage, FailureType
from airbyte_cdk.utils import AirbyteTracedException


class FileBasedSourceError(Enum):
    """User-facing error messages for file-based sources; each member's value is the message text."""

    EMPTY_STREAM = "No files were identified in the stream. This may be because there are no files in the specified container, or because your glob patterns did not match any files. Please verify that your source contains files last modified after the start_date and that your glob patterns are not overly strict."
    GLOB_PARSE_ERROR = "Error parsing glob pattern. Please refer to the glob pattern rules at https://facelessuser.github.io/wcmatch/glob/#split."
    ENCODING_ERROR = "File encoding error. The configured encoding must match file encoding."
    ERROR_CASTING_VALUE = "Could not cast the value to the expected type."
    ERROR_CASTING_VALUE_UNRECOGNIZED_TYPE = "Could not cast the value to the expected type because the type is not recognized. Valid types are null, array, boolean, integer, number, object, and string."
    ERROR_DECODING_VALUE = "Expected a JSON-decodeable value but could not decode record."
    ERROR_LISTING_FILES = "Error listing files. Please check the credentials provided in the config and verify that they provide permission to list files."
    ERROR_READING_FILE = "Error opening file. Please check the credentials provided in the config and verify that they provide permission to read files."
    ERROR_PARSING_RECORD = "Error parsing record. This could be due to a mismatch between the config's file type and the actual file type, or because the file or record is not parseable."
    ERROR_PARSING_USER_PROVIDED_SCHEMA = (
        "The provided schema could not be transformed into valid JSON Schema."
    )
    ERROR_VALIDATING_RECORD = "One or more records do not pass the schema validation policy. Please modify your input schema, or select a more lenient validation policy."
    ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS = "Only one of options 'Schemaless', 'Input Schema', 'Files To Read For Schema Discover' or 'Use First Found File For Schema Discover' can be provided at the same time."
    ERROR_PARSING_RECORD_MISMATCHED_COLUMNS = (
        "CSV data row contains more columns than the header row defines."
    )
    ERROR_PARSING_RECORD_MISMATCHED_ROWS = (
        "CSV data row contains fewer columns than the header row defines."
    )
    STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY = "Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema."
    NULL_VALUE_IN_SCHEMA = "Error during schema inference: no type was detected for key."
    UNRECOGNIZED_TYPE = "Error during schema inference: unrecognized type."
    SCHEMA_INFERENCE_ERROR = "Error inferring schema from files. Are the files valid?"
    INVALID_SCHEMA_ERROR = "No fields were identified for this schema. This may happen if the stream is empty. Please check your configuration to verify that there are files that match the stream's glob patterns."
    CONFIG_VALIDATION_ERROR = "Error creating stream config object."
    MISSING_SCHEMA = "Expected `json_schema` in the configured catalog but it is missing."
    UNDEFINED_PARSER = "No parser is defined for this file type."
    UNDEFINED_VALIDATION_POLICY = (
        "The validation policy defined in the config does not exist for the source."
    )


class FileBasedErrorsCollector:
    """
    The placeholder for all errors collected.

    Messages are buffered with `collect` and flushed by `yield_and_raise_collected`,
    which re-emits them and then raises one `AirbyteTracedException` (config error).
    """

    errors: List[AirbyteMessage]

    def __init__(self) -> None:
        # Per-instance buffer. A class-level `errors = []` default would be a single
        # list shared by every collector in the process, leaking errors between them.
        self.errors = []

    def yield_and_raise_collected(self) -> Iterator[AirbyteMessage]:
        """
        Yield every collected message, empty the buffer, then raise.

        Does nothing (yields no items, raises nothing) when no messages were collected.

        :raises AirbyteTracedException: with ``FailureType.config_error`` after the
            buffered messages have been yielded.
        """
        if self.errors:
            # emit collected logged messages
            yield from self.errors
            # clean the collector so the same messages are not emitted again
            self.errors.clear()
            # raising the single exception
            raise AirbyteTracedException(
                internal_message="Please check the logged errors for more information.",
                message="Some errors occured while reading from the source.",
                failure_type=FailureType.config_error,
            )

    def collect(self, logged_error: AirbyteMessage) -> None:
        """Buffer ``logged_error`` until the next ``yield_and_raise_collected`` call."""
        self.errors.append(logged_error)


class BaseFileBasedSourceError(Exception):
    """
    Base class for file-based source errors.

    The exception message is the error text followed by a support hint and a
    space-separated ``key=value`` listing of any extra keyword arguments.
    """

    def __init__(self, error: Union[FileBasedSourceError, str], **kwargs: Any) -> None:
        if isinstance(error, FileBasedSourceError):
            # Use the human-readable text stored as the enum member's value.
            error = error.value
        context = " ".join(f"{k}={v}" for k, v in kwargs.items())
        super().__init__(f"{error} Contact Support if you need assistance.\n{context}")


class ConfigValidationError(BaseFileBasedSourceError):
    pass


class InvalidSchemaError(BaseFileBasedSourceError):
    pass


class MissingSchemaError(BaseFileBasedSourceError):
    pass


class NoFilesMatchingError(BaseFileBasedSourceError):
    pass


class RecordParseError(BaseFileBasedSourceError):
    pass


class ExcelCalamineParsingError(BaseFileBasedSourceError):
    """Raised when Calamine engine fails to parse an Excel file."""

    pass


class SchemaInferenceError(BaseFileBasedSourceError):
    pass


class CheckAvailabilityError(BaseFileBasedSourceError):
    pass


class UndefinedParserError(BaseFileBasedSourceError):
    pass


class StopSyncPerValidationPolicy(BaseFileBasedSourceError):
    pass


class ErrorListingFiles(BaseFileBasedSourceError):
    pass


class DuplicatedFilesError(BaseFileBasedSourceError):
    """
    Raised when several files in a stream share the same file name.

    :param duplicated_files_names: list of mappings from a duplicated file name to
        the paths that share it.
    :param kwargs: extra context forwarded to ``BaseFileBasedSourceError``; must
        include ``stream`` (a missing ``stream`` raises ``KeyError``).
    """

    def __init__(self, duplicated_files_names: List[dict[str, List[str]]], **kwargs: Any):
        self._duplicated_files_names = duplicated_files_names
        self._stream_name: str = kwargs["stream"]
        super().__init__(self._format_duplicate_files_error_message(), **kwargs)

    def _format_duplicate_files_error_message(self) -> str:
        """Build the user-facing message listing each duplicated name and its paths."""
        duplicated_files_messages = []
        for duplicated_file in self._duplicated_files_names:
            for duplicated_file_name, file_paths in duplicated_file.items():
                # One "- <path>" bullet per path. Joining a single pre-formatted
                # string would only iterate its characters and print the list repr.
                path_lines = "".join(f"\n - {path}" for path in file_paths)
                duplicated_files_messages.append(
                    f"{len(file_paths)} duplicates found for file name {duplicated_file_name}:\n\n"
                    + path_lines
                )

        error_message = (
            f"ERROR: Duplicate filenames found for stream {self._stream_name}. "
            "Duplicate file names are not allowed if the Preserve Sub-Directories in File Paths option is disabled. "
            "Please remove or rename the duplicate files before attempting to re-run the sync.\n\n"
            + "\n".join(duplicated_files_messages)
        )

        return error_message

    def __repr__(self) -> str:
        """Return a string representation of the exception."""
        class_name = self.__class__.__name__
        properties_str = ", ".join(
            f"{k}={v!r}" for k, v in self.__dict__.items() if not k.startswith("_")
        )
        return f"{class_name}({properties_str})"


class CustomFileBasedException(AirbyteTracedException):
    """
    A specialized exception for file-based connectors.

    This exception is designed to bypass the default error handling in the file-based CDK, allowing the use of custom error messages.
    """

    pass


class FileSizeLimitError(CustomFileBasedException):
    pass


class EmptyFileSchemaInferenceError(AirbyteTracedException):
    pass
class FileBasedSourceError(Enum):
    """
    Catalog of user-facing messages used by file-based source exceptions.

    Each member's value is the exact text shown to the user.
    """

    # --- discovery and file access ---
    EMPTY_STREAM = (
        "No files were identified in the stream. "
        "This may be because there are no files in the specified container, "
        "or because your glob patterns did not match any files. "
        "Please verify that your source contains files last modified after the start_date "
        "and that your glob patterns are not overly strict."
    )
    GLOB_PARSE_ERROR = (
        "Error parsing glob pattern. "
        "Please refer to the glob pattern rules at "
        "https://facelessuser.github.io/wcmatch/glob/#split."
    )
    ENCODING_ERROR = (
        "File encoding error. "
        "The configured encoding must match file encoding."
    )
    # --- value casting / decoding ---
    ERROR_CASTING_VALUE = "Could not cast the value to the expected type."
    ERROR_CASTING_VALUE_UNRECOGNIZED_TYPE = (
        "Could not cast the value to the expected type because the type is not recognized. "
        "Valid types are null, array, boolean, integer, number, object, and string."
    )
    ERROR_DECODING_VALUE = "Expected a JSON-decodeable value but could not decode record."
    ERROR_LISTING_FILES = (
        "Error listing files. "
        "Please check the credentials provided in the config "
        "and verify that they provide permission to list files."
    )
    ERROR_READING_FILE = (
        "Error opening file. "
        "Please check the credentials provided in the config "
        "and verify that they provide permission to read files."
    )
    # --- parsing and validation ---
    ERROR_PARSING_RECORD = (
        "Error parsing record. "
        "This could be due to a mismatch between the config's file type and the actual file type, "
        "or because the file or record is not parseable."
    )
    ERROR_PARSING_USER_PROVIDED_SCHEMA = "The provided schema could not be transformed into valid JSON Schema."
    ERROR_VALIDATING_RECORD = (
        "One or more records do not pass the schema validation policy. "
        "Please modify your input schema, or select a more lenient validation policy."
    )
    ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS = (
        "Only one of options 'Schemaless', 'Input Schema', "
        "'Files To Read For Schema Discover' or 'Use First Found File For Schema Discover' "
        "can be provided at the same time."
    )
    ERROR_PARSING_RECORD_MISMATCHED_COLUMNS = "CSV data row contains more columns than the header row defines."
    ERROR_PARSING_RECORD_MISMATCHED_ROWS = "CSV data row contains fewer columns than the header row defines."
    STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY = (
        "Stopping sync in accordance with the configured validation policy. "
        "Records in file did not conform to the schema."
    )
    # --- schema inference ---
    NULL_VALUE_IN_SCHEMA = "Error during schema inference: no type was detected for key."
    UNRECOGNIZED_TYPE = "Error during schema inference: unrecognized type."
    SCHEMA_INFERENCE_ERROR = "Error inferring schema from files. Are the files valid?"
    INVALID_SCHEMA_ERROR = (
        "No fields were identified for this schema. "
        "This may happen if the stream is empty. "
        "Please check your configuration to verify that there are files "
        "that match the stream's glob patterns."
    )
    # --- configuration ---
    CONFIG_VALIDATION_ERROR = "Error creating stream config object."
    MISSING_SCHEMA = "Expected `json_schema` in the configured catalog but it is missing."
    UNDEFINED_PARSER = "No parser is defined for this file type."
    UNDEFINED_VALIDATION_POLICY = "The validation policy defined in the config does not exist for the source."
An enumeration.
class FileBasedErrorsCollector:
    """
    The placeholder for all errors collected.

    Messages are buffered with `collect` and flushed by `yield_and_raise_collected`,
    which re-emits them and then raises one `AirbyteTracedException` (config error).
    """

    errors: List[AirbyteMessage]

    def __init__(self) -> None:
        # Per-instance buffer. A class-level `errors = []` default would be a single
        # list shared by every collector in the process, leaking errors between them.
        self.errors = []

    def yield_and_raise_collected(self) -> Any:
        """
        Yield every collected message, empty the buffer, then raise.

        Does nothing (yields no items, raises nothing) when no messages were collected.

        :raises AirbyteTracedException: with ``FailureType.config_error`` after the
            buffered messages have been yielded.
        """
        if self.errors:
            # emit collected logged messages
            yield from self.errors
            # clean the collector so the same messages are not emitted again
            self.errors.clear()
            # raising the single exception
            raise AirbyteTracedException(
                internal_message="Please check the logged errors for more information.",
                message="Some errors occured while reading from the source.",
                failure_type=FailureType.config_error,
            )

    def collect(self, logged_error: AirbyteMessage) -> None:
        """Buffer ``logged_error`` until the next ``yield_and_raise_collected`` call."""
        self.errors.append(logged_error)
The placeholder for all errors collected.
def yield_and_raise_collected(self) -> Any:
    """
    Flush the buffered messages, then fail.

    When the buffer is empty this generator yields nothing and returns. Otherwise it
    yields each buffered message in order, empties the buffer, and raises an
    AirbyteTracedException whose failure type is ``FailureType.config_error``.
    """
    if not self.errors:
        return
    # re-emit everything that was logged
    yield from self.errors
    # reset the buffer before failing so messages are not emitted twice
    self.errors.clear()
    raise AirbyteTracedException(
        failure_type=FailureType.config_error,
        message="Some errors occured while reading from the source.",
        internal_message="Please check the logged errors for more information.",
    )
class BaseFileBasedSourceError(Exception):
    """
    Base exception for file-based source failures.

    The message is the given text (or the value of a ``FileBasedSourceError`` member),
    followed by a support hint, a newline, and a space-separated ``key=value`` listing
    of the extra keyword arguments.
    """

    def __init__(self, error: Union[FileBasedSourceError, str], **kwargs):  # type: ignore # noqa
        # Enum members carry their human-readable text in `.value`.
        text = error.value if isinstance(error, FileBasedSourceError) else error
        details = " ".join(f"{key}={val}" for key, val in kwargs.items())
        super().__init__(f"{text} Contact Support if you need assistance.\n{details}")
Common base class for all non-exit exceptions.
def __init__(self, error: Union[FileBasedSourceError, str], **kwargs):  # type: ignore # noqa
    """
    Build the exception message from ``error`` and any extra context.

    ``error`` may be a ``FileBasedSourceError`` member (its ``.value`` text is used)
    or a plain string. Each keyword argument is appended as ``key=value``.
    """
    if isinstance(error, FileBasedSourceError):
        error = error.value
    pairs = [f"{name}={item}" for name, item in kwargs.items()]
    super().__init__(
        f"{error} Contact Support if you need assistance.\n{' '.join(pairs)}"
    )
Common base class for all non-exit exceptions.
Inherited Members
Common base class for all non-exit exceptions.
Inherited Members
Common base class for all non-exit exceptions.
Inherited Members
Common base class for all non-exit exceptions.
Inherited Members
Common base class for all non-exit exceptions.
Inherited Members
class ExcelCalamineParsingError(BaseFileBasedSourceError):
    """Raised when the Calamine engine fails to parse an Excel file."""
Raised when Calamine engine fails to parse an Excel file.
Inherited Members
Common base class for all non-exit exceptions.
Inherited Members
Common base class for all non-exit exceptions.
Inherited Members
Common base class for all non-exit exceptions.
Inherited Members
Common base class for all non-exit exceptions.
Inherited Members
Common base class for all non-exit exceptions.
Inherited Members
class DuplicatedFilesError(BaseFileBasedSourceError):
    """
    Raised when several files in a stream share the same file name.

    :param duplicated_files_names: list of mappings from a duplicated file name to
        the paths that share it.
    :param kwargs: extra context forwarded to ``BaseFileBasedSourceError``; must
        include ``stream`` (a missing ``stream`` raises ``KeyError``).
    """

    def __init__(self, duplicated_files_names: List[dict[str, List[str]]], **kwargs: Any):
        self._duplicated_files_names = duplicated_files_names
        self._stream_name: str = kwargs["stream"]
        super().__init__(self._format_duplicate_files_error_message(), **kwargs)

    def _format_duplicate_files_error_message(self) -> str:
        """Build the user-facing message listing each duplicated name and its paths."""
        duplicated_files_messages = []
        for duplicated_file in self._duplicated_files_names:
            for duplicated_file_name, file_paths in duplicated_file.items():
                # One "- <path>" bullet per path. Joining a single pre-formatted
                # string would only iterate its characters and print the list repr.
                path_lines = "".join(f"\n - {path}" for path in file_paths)
                duplicated_files_messages.append(
                    f"{len(file_paths)} duplicates found for file name {duplicated_file_name}:\n\n"
                    + path_lines
                )

        error_message = (
            f"ERROR: Duplicate filenames found for stream {self._stream_name}. "
            "Duplicate file names are not allowed if the Preserve Sub-Directories in File Paths option is disabled. "
            "Please remove or rename the duplicate files before attempting to re-run the sync.\n\n"
            + "\n".join(duplicated_files_messages)
        )

        return error_message

    def __repr__(self) -> str:
        """Return a string representation of the exception."""
        class_name = self.__class__.__name__
        properties_str = ", ".join(
            f"{k}={v!r}" for k, v in self.__dict__.items() if not k.startswith("_")
        )
        return f"{class_name}({properties_str})"
Common base class for all non-exit exceptions.
class CustomFileBasedException(AirbyteTracedException):
    """
    A specialized exception for file-based connectors.

    This exception is designed to bypass the default error handling in the file-based CDK, allowing the use of custom error messages.
    """
A specialized exception for file-based connectors.
This exception is designed to bypass the default error handling in the file-based CDK, allowing the use of custom error messages.
A specialized exception for file-based connectors.
This exception is designed to bypass the default error handling in the file-based CDK, allowing the use of custom error messages.
An exception that should be emitted as an AirbyteTraceMessage