airbyte_cdk.sources.file_based.exceptions
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from enum import Enum
from typing import Any, List, Union

from airbyte_cdk.models import AirbyteMessage, FailureType
from airbyte_cdk.utils import AirbyteTracedException


class FileBasedSourceError(Enum):
    """User-facing error messages shared by the file-based source exceptions."""

    EMPTY_STREAM = "No files were identified in the stream. This may be because there are no files in the specified container, or because your glob patterns did not match any files. Please verify that your source contains files last modified after the start_date and that your glob patterns are not overly strict."
    GLOB_PARSE_ERROR = "Error parsing glob pattern. Please refer to the glob pattern rules at https://facelessuser.github.io/wcmatch/glob/#split."
    ENCODING_ERROR = "File encoding error. The configured encoding must match file encoding."
    ERROR_CASTING_VALUE = "Could not cast the value to the expected type."
    ERROR_CASTING_VALUE_UNRECOGNIZED_TYPE = "Could not cast the value to the expected type because the type is not recognized. Valid types are null, array, boolean, integer, number, object, and string."
    ERROR_DECODING_VALUE = "Expected a JSON-decodeable value but could not decode record."
    ERROR_LISTING_FILES = "Error listing files. Please check the credentials provided in the config and verify that they provide permission to list files."
    ERROR_READING_FILE = "Error opening file. Please check the credentials provided in the config and verify that they provide permission to read files."
    ERROR_PARSING_RECORD = "Error parsing record. This could be due to a mismatch between the config's file type and the actual file type, or because the file or record is not parseable."
    ERROR_PARSING_USER_PROVIDED_SCHEMA = (
        "The provided schema could not be transformed into valid JSON Schema."
    )
    ERROR_VALIDATING_RECORD = "One or more records do not pass the schema validation policy. Please modify your input schema, or select a more lenient validation policy."
    ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS = "Only one of options 'Schemaless', 'Input Schema', 'Files To Read For Schema Discover' or 'Use First Found File For Schema Discover' can be provided at the same time."
    ERROR_PARSING_RECORD_MISMATCHED_COLUMNS = "A header field has resolved to `None`. This indicates that the CSV has more rows than the number of header fields. If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows."
    ERROR_PARSING_RECORD_MISMATCHED_ROWS = "A row's value has resolved to `None`. This indicates that the CSV has more columns in the header field than the number of columns in the row(s). If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows."
    STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY = "Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema."
    NULL_VALUE_IN_SCHEMA = "Error during schema inference: no type was detected for key."
    UNRECOGNIZED_TYPE = "Error during schema inference: unrecognized type."
    SCHEMA_INFERENCE_ERROR = "Error inferring schema from files. Are the files valid?"
    INVALID_SCHEMA_ERROR = "No fields were identified for this schema. This may happen if the stream is empty. Please check your configuration to verify that there are files that match the stream's glob patterns."
    CONFIG_VALIDATION_ERROR = "Error creating stream config object."
    MISSING_SCHEMA = "Expected `json_schema` in the configured catalog but it is missing."
    UNDEFINED_PARSER = "No parser is defined for this file type."
    UNDEFINED_VALIDATION_POLICY = (
        "The validation policy defined in the config does not exist for the source."
    )


class FileBasedErrorsCollector:
    """
    Accumulate logged error messages so they can be emitted together.

    Messages are stored with `collect` and flushed with
    `yield_and_raise_collected`, which yields every stored message and then
    raises a single exception.
    """

    # Declared for type checkers only. The list itself is created per instance in
    # `__init__`, so separate collectors never share (or leak) collected errors.
    errors: List[AirbyteMessage]

    def __init__(self) -> None:
        self.errors = []

    def yield_and_raise_collected(self) -> Any:
        """
        Yield every collected message, then raise one traced exception.

        This is a generator, so nothing happens until it is iterated. When no
        errors have been collected it yields nothing and does not raise.

        Raises:
            AirbyteTracedException: with `FailureType.config_error`, once all
                collected messages have been yielded and the collector cleared.
        """
        if self.errors:
            # emit collected logged messages
            yield from self.errors
            # clean the collector
            self.errors.clear()
            # raising the single exception
            raise AirbyteTracedException(
                internal_message="Please check the logged errors for more information.",
                message="Some errors occured while reading from the source.",
                failure_type=FailureType.config_error,
            )

    def collect(self, logged_error: AirbyteMessage) -> None:
        """Append `logged_error` to the collected errors."""
        self.errors.append(logged_error)


class BaseFileBasedSourceError(Exception):
    """Base exception whose message is built from an error and keyword context."""

    def __init__(self, error: Union[FileBasedSourceError, str], **kwargs):  # type: ignore # noqa
        """
        Build the exception message.

        Args:
            error: A `FileBasedSourceError` member (its value is used) or a
                ready-made message string.
            **kwargs: Extra context; rendered as space-separated `key=value`
                pairs on the line after the message.
        """
        if isinstance(error, FileBasedSourceError):
            error = FileBasedSourceError(error).value
        super().__init__(
            f"{error} Contact Support if you need assistance.\n{' '.join([f'{k}={v}' for k, v in kwargs.items()])}"
        )


class ConfigValidationError(BaseFileBasedSourceError):
    pass


class InvalidSchemaError(BaseFileBasedSourceError):
    pass


class MissingSchemaError(BaseFileBasedSourceError):
    pass


class NoFilesMatchingError(BaseFileBasedSourceError):
    pass


class RecordParseError(BaseFileBasedSourceError):
    pass


class SchemaInferenceError(BaseFileBasedSourceError):
    pass


class CheckAvailabilityError(BaseFileBasedSourceError):
    pass


class UndefinedParserError(BaseFileBasedSourceError):
    pass


class StopSyncPerValidationPolicy(BaseFileBasedSourceError):
    pass


class ErrorListingFiles(BaseFileBasedSourceError):
    pass


class DuplicatedFilesError(BaseFileBasedSourceError):
    """Error raised with a message listing duplicated file names for a stream."""

    def __init__(self, duplicated_files_names: List[dict[str, List[str]]], **kwargs: Any):
        """
        Args:
            duplicated_files_names: Dictionaries mapping a duplicated file name
                to the list of paths where it was found.
            **kwargs: Must contain `stream` (the stream name); all keyword
                arguments are also forwarded to the base class as context.

        Raises:
            KeyError: if `stream` is not supplied in `kwargs`.
        """
        self._duplicated_files_names = duplicated_files_names
        self._stream_name: str = kwargs["stream"]
        super().__init__(self._format_duplicate_files_error_message(), **kwargs)

    def _format_duplicate_files_error_message(self) -> str:
        """Return the full error text, with one bullet line per duplicated path."""
        duplicated_files_messages = []
        for duplicated_file in self._duplicated_files_names:
            for duplicated_file_name, file_paths in duplicated_file.items():
                file_duplicated_message = (
                    f"{len(file_paths)} duplicates found for file name {duplicated_file_name}:\n\n"
                    # One bullet per path; joining a single f-string of the whole
                    # list would only print the list's repr on one line.
                    + "".join(f"\n - {file_path}" for file_path in file_paths)
                )
                duplicated_files_messages.append(file_duplicated_message)

        error_message = (
            f"ERROR: Duplicate filenames found for stream {self._stream_name}. "
            "Duplicate file names are not allowed if the Preserve Sub-Directories in File Paths option is disabled. "
            "Please remove or rename the duplicate files before attempting to re-run the sync.\n\n"
            + "\n".join(duplicated_files_messages)
        )

        return error_message

    def __repr__(self) -> str:
        """Return the class name followed by its non-underscore instance attributes."""
        class_name = self.__class__.__name__
        properties_str = ", ".join(
            f"{k}={v!r}" for k, v in self.__dict__.items() if not k.startswith("_")
        )
        return f"{class_name}({properties_str})"


class CustomFileBasedException(AirbyteTracedException):
    """
    A specialized exception for file-based connectors.

    This exception is designed to bypass the default error handling in the file-based CDK, allowing the use of custom error messages.
    """

    pass


class FileSizeLimitError(CustomFileBasedException):
    pass


class EmptyFileSchemaInferenceError(AirbyteTracedException):
    pass
class FileBasedSourceError(Enum):
    """Catalogue of user-facing error messages for file-based sources.

    Each member's value is the message text. Long messages are written as
    adjacent string literals, which Python concatenates into a single string.
    """

    EMPTY_STREAM = (
        "No files were identified in the stream. "
        "This may be because there are no files in the specified container, "
        "or because your glob patterns did not match any files. "
        "Please verify that your source contains files last modified after the start_date "
        "and that your glob patterns are not overly strict."
    )
    GLOB_PARSE_ERROR = (
        "Error parsing glob pattern. "
        "Please refer to the glob pattern rules at https://facelessuser.github.io/wcmatch/glob/#split."
    )
    ENCODING_ERROR = "File encoding error. The configured encoding must match file encoding."
    ERROR_CASTING_VALUE = "Could not cast the value to the expected type."
    ERROR_CASTING_VALUE_UNRECOGNIZED_TYPE = (
        "Could not cast the value to the expected type because the type is not recognized. "
        "Valid types are null, array, boolean, integer, number, object, and string."
    )
    ERROR_DECODING_VALUE = "Expected a JSON-decodeable value but could not decode record."
    ERROR_LISTING_FILES = (
        "Error listing files. "
        "Please check the credentials provided in the config and verify that they provide permission to list files."
    )
    ERROR_READING_FILE = (
        "Error opening file. "
        "Please check the credentials provided in the config and verify that they provide permission to read files."
    )
    ERROR_PARSING_RECORD = (
        "Error parsing record. "
        "This could be due to a mismatch between the config's file type and the actual file type, "
        "or because the file or record is not parseable."
    )
    ERROR_PARSING_USER_PROVIDED_SCHEMA = "The provided schema could not be transformed into valid JSON Schema."
    ERROR_VALIDATING_RECORD = (
        "One or more records do not pass the schema validation policy. "
        "Please modify your input schema, or select a more lenient validation policy."
    )
    ERROR_VALIDATION_STREAM_DISCOVERY_OPTIONS = (
        "Only one of options 'Schemaless', 'Input Schema', 'Files To Read For Schema Discover' "
        "or 'Use First Found File For Schema Discover' can be provided at the same time."
    )
    ERROR_PARSING_RECORD_MISMATCHED_COLUMNS = (
        "A header field has resolved to `None`. "
        "This indicates that the CSV has more rows than the number of header fields. "
        "If you input your schema or headers, please verify that the number of columns "
        "corresponds to the number of columns in your CSV's rows."
    )
    ERROR_PARSING_RECORD_MISMATCHED_ROWS = (
        "A row's value has resolved to `None`. "
        "This indicates that the CSV has more columns in the header field than the number of columns in the row(s). "
        "If you input your schema or headers, please verify that the number of columns "
        "corresponds to the number of columns in your CSV's rows."
    )
    STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY = (
        "Stopping sync in accordance with the configured validation policy. "
        "Records in file did not conform to the schema."
    )
    NULL_VALUE_IN_SCHEMA = "Error during schema inference: no type was detected for key."
    UNRECOGNIZED_TYPE = "Error during schema inference: unrecognized type."
    SCHEMA_INFERENCE_ERROR = "Error inferring schema from files. Are the files valid?"
    INVALID_SCHEMA_ERROR = (
        "No fields were identified for this schema. "
        "This may happen if the stream is empty. "
        "Please check your configuration to verify that there are files that match the stream's glob patterns."
    )
    CONFIG_VALIDATION_ERROR = "Error creating stream config object."
    MISSING_SCHEMA = "Expected `json_schema` in the configured catalog but it is missing."
    UNDEFINED_PARSER = "No parser is defined for this file type."
    UNDEFINED_VALIDATION_POLICY = "The validation policy defined in the config does not exist for the source."
class FileBasedErrorsCollector:
    """
    Accumulate logged error messages so they can be emitted together.

    Messages are stored with `collect` and flushed with
    `yield_and_raise_collected`, which yields every stored message and then
    raises a single exception.
    """

    # Declared for type checkers only. The list itself is created per instance in
    # `__init__`, so separate collectors never share (or leak) collected errors.
    errors: List[AirbyteMessage]

    def __init__(self) -> None:
        self.errors = []

    def yield_and_raise_collected(self) -> Any:
        """
        Yield every collected message, then raise one traced exception.

        This is a generator, so nothing happens until it is iterated. When no
        errors have been collected it yields nothing and does not raise.

        Raises:
            AirbyteTracedException: with `FailureType.config_error`, once all
                collected messages have been yielded and the collector cleared.
        """
        if self.errors:
            # emit collected logged messages
            yield from self.errors
            # clean the collector
            self.errors.clear()
            # raising the single exception
            raise AirbyteTracedException(
                internal_message="Please check the logged errors for more information.",
                message="Some errors occured while reading from the source.",
                failure_type=FailureType.config_error,
            )

    def collect(self, logged_error: AirbyteMessage) -> None:
        """Append `logged_error` to the collected errors."""
        self.errors.append(logged_error)
Collects logged error messages so they can be emitted together before a single exception is raised.
def yield_and_raise_collected(self) -> Any:
    """
    Flush the collector: yield each stored message, then raise once.

    Being a generator, the body only runs when iterated. With nothing
    collected it finishes immediately without yielding or raising.
    """
    if not self.errors:
        return

    # Hand every logged message to the consumer first...
    yield from self.errors
    # ...then empty the collector...
    self.errors.clear()
    # ...and finally surface a single summarising exception.
    summary_error = AirbyteTracedException(
        internal_message="Please check the logged errors for more information.",
        message="Some errors occured while reading from the source.",
        failure_type=FailureType.config_error,
    )
    raise summary_error
class BaseFileBasedSourceError(Exception):
    """Base exception whose message combines an error text with keyword context."""

    def __init__(self, error: Union[FileBasedSourceError, str], **kwargs):  # type: ignore # noqa
        """
        Build the exception message.

        Args:
            error: A `FileBasedSourceError` member (its value is used) or a
                ready-made message string.
            **kwargs: Extra context; rendered as space-separated `key=value`
                pairs on the line after the message.
        """
        if isinstance(error, FileBasedSourceError):
            # Enum members carry the message text as their value.
            error = error.value
        context = " ".join([f"{key}={value}" for key, value in kwargs.items()])
        super().__init__(f"{error} Contact Support if you need assistance.\n{context}")
Common base class for all non-exit exceptions.
def __init__(self, error: Union[FileBasedSourceError, str], **kwargs):  # type: ignore # noqa
    """
    Compose the exception message from `error` and any keyword context.

    A `FileBasedSourceError` member is replaced by its value; a string is used
    as given. Keyword arguments are appended on a new line as space-separated
    `key=value` pairs.
    """
    message = error.value if isinstance(error, FileBasedSourceError) else error
    details = " ".join(f"{name}={val}" for name, val in kwargs.items())
    super().__init__(f"{message} Contact Support if you need assistance.\n{details}")
Common base class for all non-exit exceptions.
Inherited Members
Common base class for all non-exit exceptions.
Inherited Members
Common base class for all non-exit exceptions.
Inherited Members
Common base class for all non-exit exceptions.
Inherited Members
Common base class for all non-exit exceptions.
Inherited Members
Common base class for all non-exit exceptions.
Inherited Members
Common base class for all non-exit exceptions.
Inherited Members
Common base class for all non-exit exceptions.
Inherited Members
Common base class for all non-exit exceptions.
Inherited Members
Common base class for all non-exit exceptions.
Inherited Members
class DuplicatedFilesError(BaseFileBasedSourceError):
    """Error raised with a message listing duplicated file names for a stream."""

    def __init__(self, duplicated_files_names: List[dict[str, List[str]]], **kwargs: Any):
        """
        Args:
            duplicated_files_names: Dictionaries mapping a duplicated file name
                to the list of paths where it was found.
            **kwargs: Must contain `stream` (the stream name); all keyword
                arguments are also forwarded to the base class as context.

        Raises:
            KeyError: if `stream` is not supplied in `kwargs`.
        """
        self._duplicated_files_names = duplicated_files_names
        self._stream_name: str = kwargs["stream"]
        super().__init__(self._format_duplicate_files_error_message(), **kwargs)

    def _format_duplicate_files_error_message(self) -> str:
        """Return the full error text, with one bullet line per duplicated path."""
        duplicated_files_messages = []
        for duplicated_file in self._duplicated_files_names:
            for duplicated_file_name, file_paths in duplicated_file.items():
                file_duplicated_message = (
                    f"{len(file_paths)} duplicates found for file name {duplicated_file_name}:\n\n"
                    # One bullet per path; joining a single f-string of the whole
                    # list would only print the list's repr on one line.
                    + "".join(f"\n - {file_path}" for file_path in file_paths)
                )
                duplicated_files_messages.append(file_duplicated_message)

        error_message = (
            f"ERROR: Duplicate filenames found for stream {self._stream_name}. "
            "Duplicate file names are not allowed if the Preserve Sub-Directories in File Paths option is disabled. "
            "Please remove or rename the duplicate files before attempting to re-run the sync.\n\n"
            + "\n".join(duplicated_files_messages)
        )

        return error_message

    def __repr__(self) -> str:
        """Return the class name followed by its non-underscore instance attributes."""
        class_name = self.__class__.__name__
        properties_str = ", ".join(
            f"{k}={v!r}" for k, v in self.__dict__.items() if not k.startswith("_")
        )
        return f"{class_name}({properties_str})"
Common base class for all non-exit exceptions.
class CustomFileBasedException(AirbyteTracedException):
    """
    Exception type for file-based connectors that supply their own error messages.

    It is designed to bypass the default error handling in the file-based CDK,
    so that a custom error message can be used instead.
    """
A specialized exception for file-based connectors.
This exception is designed to bypass the default error handling in the file-based CDK, allowing the use of custom error messages.
A specialized exception for file-based connectors.
This exception is designed to bypass the default error handling in the file-based CDK, allowing the use of custom error messages.
An exception that should be emitted as an AirbyteTraceMessage