airbyte_cdk.sources.file_based.exceptions

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4
  5from enum import Enum
  6from typing import Any, List, Union
  7
  8from airbyte_cdk.models import AirbyteMessage, FailureType
  9from airbyte_cdk.utils import AirbyteTracedException
 10
 11
 12class FileBasedSourceError(Enum):
 13    EMPTY_STREAM = "No files were identified in the stream. This may be because there are no files in the specified container, or because your glob patterns did not match any files. Please verify that your source contains files last modified after the start_date and that your glob patterns are not overly strict."
 14    GLOB_PARSE_ERROR = "Error parsing glob pattern. Please refer to the glob pattern rules at https://facelessuser.github.io/wcmatch/glob/#split."
 15    ENCODING_ERROR = "File encoding error. The configured encoding must match file encoding."
 16    ERROR_CASTING_VALUE = "Could not cast the value to the expected type."
 17    ERROR_CASTING_VALUE_UNRECOGNIZED_TYPE = "Could not cast the value to the expected type because the type is not recognized. Valid types are null, array, boolean, integer, number, object, and string."
 18    ERROR_DECODING_VALUE = "Expected a JSON-decodeable value but could not decode record."
 19    ERROR_LISTING_FILES = "Error listing files. Please check the credentials provided in the config and verify that they provide permission to list files."
 20    ERROR_READING_FILE = "Error opening file. Please check the credentials provided in the config and verify that they provide permission to read files."
 21    ERROR_PARSING_RECORD = "Error parsing record. This could be due to a mismatch between the config's file type and the actual file type, or because the file or record is not parseable."
 22    ERROR_PARSING_USER_PROVIDED_SCHEMA = (
 23        "The provided schema could not be transformed into valid JSON Schema."
 24    )
 25    ERROR_VALIDATING_RECORD = "One or more records do not pass the schema validation policy. Please modify your input schema, or select a more lenient validation policy."
 26    ERROR_PARSING_RECORD_MISMATCHED_COLUMNS = "A header field has resolved to `None`. This indicates that the CSV has more rows than the number of header fields. If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows."
 27    ERROR_PARSING_RECORD_MISMATCHED_ROWS = "A row's value has resolved to `None`. This indicates that the CSV has more columns in the header field than the number of columns in the row(s). If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows."
 28    STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY = "Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema."
 29    NULL_VALUE_IN_SCHEMA = "Error during schema inference: no type was detected for key."
 30    UNRECOGNIZED_TYPE = "Error during schema inference: unrecognized type."
 31    SCHEMA_INFERENCE_ERROR = "Error inferring schema from files. Are the files valid?"
 32    INVALID_SCHEMA_ERROR = "No fields were identified for this schema. This may happen if the stream is empty. Please check your configuration to verify that there are files that match the stream's glob patterns."
 33    CONFIG_VALIDATION_ERROR = "Error creating stream config object."
 34    MISSING_SCHEMA = "Expected `json_schema` in the configured catalog but it is missing."
 35    UNDEFINED_PARSER = "No parser is defined for this file type."
 36    UNDEFINED_VALIDATION_POLICY = (
 37        "The validation policy defined in the config does not exist for the source."
 38    )
 39
 40
 41class FileBasedErrorsCollector:
 42    """
 43    The placeholder for all errors collected.
 44    """
 45
 46    errors: List[AirbyteMessage] = []
 47
 48    def yield_and_raise_collected(self) -> Any:
 49        if self.errors:
 50            # emit collected logged messages
 51            yield from self.errors
 52            # clean the collector
 53            self.errors.clear()
 54            # raising the single exception
 55            raise AirbyteTracedException(
 56                internal_message="Please check the logged errors for more information.",
 57                message="Some errors occured while reading from the source.",
 58                failure_type=FailureType.config_error,
 59            )
 60
 61    def collect(self, logged_error: AirbyteMessage) -> None:
 62        self.errors.append(logged_error)
 63
 64
 65class BaseFileBasedSourceError(Exception):
 66    def __init__(self, error: Union[FileBasedSourceError, str], **kwargs):  # type: ignore # noqa
 67        if isinstance(error, FileBasedSourceError):
 68            error = FileBasedSourceError(error).value
 69        super().__init__(
 70            f"{error} Contact Support if you need assistance.\n{' '.join([f'{k}={v}' for k, v in kwargs.items()])}"
 71        )
 72
 73
 74class ConfigValidationError(BaseFileBasedSourceError):
 75    pass
 76
 77
 78class InvalidSchemaError(BaseFileBasedSourceError):
 79    pass
 80
 81
 82class MissingSchemaError(BaseFileBasedSourceError):
 83    pass
 84
 85
 86class NoFilesMatchingError(BaseFileBasedSourceError):
 87    pass
 88
 89
 90class RecordParseError(BaseFileBasedSourceError):
 91    pass
 92
 93
 94class SchemaInferenceError(BaseFileBasedSourceError):
 95    pass
 96
 97
 98class CheckAvailabilityError(BaseFileBasedSourceError):
 99    pass
100
101
class UndefinedParserError(BaseFileBasedSourceError):
    """
    Distinct exception type derived from BaseFileBasedSourceError; adds no behavior of its own.

    NOTE(review): presumably signals a file type without a parser (inferred from the name) - confirm at raise sites.
    """
104
105
class StopSyncPerValidationPolicy(BaseFileBasedSourceError):
    """
    Distinct exception type derived from BaseFileBasedSourceError; adds no behavior of its own.

    NOTE(review): presumably used to halt a sync when the validation policy demands it (inferred from the name) - confirm at raise sites.
    """
108
109
class ErrorListingFiles(BaseFileBasedSourceError):
    """
    Distinct exception type derived from BaseFileBasedSourceError; adds no behavior of its own.

    NOTE(review): presumably signals a failure while listing files (inferred from the name) - confirm at raise sites.
    """
112
113
class DuplicatedFilesError(BaseFileBasedSourceError):
    """
    Error reporting that a stream contains several files sharing the same file name.

    The message lists, for every duplicated name, how many paths share it and then each of
    those paths on its own bullet line.
    """

    def __init__(self, duplicated_files_names: List[dict[str, List[str]]], **kwargs: Any):
        """
        :param duplicated_files_names: list of mappings from a duplicated file name to the paths sharing it.
        :param kwargs: extra context forwarded to the base class, which appends it to the message
            as ``key=value`` pairs; must contain ``stream``.
        :raises KeyError: if ``stream`` is not passed as a keyword argument.
        """
        self._duplicated_files_names = duplicated_files_names
        self._stream_name: str = kwargs["stream"]
        super().__init__(self._format_duplicate_files_error_message(), **kwargs)

    def _format_duplicate_files_error_message(self) -> str:
        """Build the message: a fixed explanation followed by one section per duplicated file name."""
        duplicated_files_messages = []
        for duplicated_file in self._duplicated_files_names:
            for duplicated_file_name, file_paths in duplicated_file.items():
                # One " - <path>" bullet per path. Joining over a single f-string (as before) only
                # re-joined its characters, so the whole list repr ended up on one bullet.
                path_lines = "".join(f"\n - {file_path}" for file_path in file_paths)
                duplicated_files_messages.append(
                    f"{len(file_paths)} duplicates found for file name {duplicated_file_name}:\n\n{path_lines}"
                )

        error_message = (
            f"ERROR: Duplicate filenames found for stream {self._stream_name}. "
            "Duplicate file names are not allowed if the Preserve Sub-Directories in File Paths option is disabled. "
            "Please remove or rename the duplicate files before attempting to re-run the sync.\n\n"
            + "\n".join(duplicated_files_messages)
        )

        return error_message

    def __repr__(self) -> str:
        """Return ``ClassName(k=v, ...)`` built from the instance attributes that are not underscore-prefixed."""
        class_name = self.__class__.__name__
        # NOTE(review): both attributes assigned in __init__ start with "_", so they are filtered
        # out here; confirm whether they were meant to appear in the repr.
        properties_str = ", ".join(
            f"{k}={v!r}" for k, v in self.__dict__.items() if not k.startswith("_")
        )
        return f"{class_name}({properties_str})"
146
147
class CustomFileBasedException(AirbyteTracedException):
    """
    Traced exception for file-based connectors that carry their own error message.

    It is meant to bypass the default error handling of the file-based CDK so that the custom
    message is the one reported. The class adds nothing to AirbyteTracedException itself.
    """
156
157
class FileSizeLimitError(CustomFileBasedException):
    """
    Distinct exception type derived from CustomFileBasedException; adds no behavior of its own.

    NOTE(review): presumably signals a file exceeding a size limit (inferred from the name) - confirm at raise sites.
    """
class FileBasedSourceError(enum.Enum):
13class FileBasedSourceError(Enum):
14    EMPTY_STREAM = "No files were identified in the stream. This may be because there are no files in the specified container, or because your glob patterns did not match any files. Please verify that your source contains files last modified after the start_date and that your glob patterns are not overly strict."
15    GLOB_PARSE_ERROR = "Error parsing glob pattern. Please refer to the glob pattern rules at https://facelessuser.github.io/wcmatch/glob/#split."
16    ENCODING_ERROR = "File encoding error. The configured encoding must match file encoding."
17    ERROR_CASTING_VALUE = "Could not cast the value to the expected type."
18    ERROR_CASTING_VALUE_UNRECOGNIZED_TYPE = "Could not cast the value to the expected type because the type is not recognized. Valid types are null, array, boolean, integer, number, object, and string."
19    ERROR_DECODING_VALUE = "Expected a JSON-decodeable value but could not decode record."
20    ERROR_LISTING_FILES = "Error listing files. Please check the credentials provided in the config and verify that they provide permission to list files."
21    ERROR_READING_FILE = "Error opening file. Please check the credentials provided in the config and verify that they provide permission to read files."
22    ERROR_PARSING_RECORD = "Error parsing record. This could be due to a mismatch between the config's file type and the actual file type, or because the file or record is not parseable."
23    ERROR_PARSING_USER_PROVIDED_SCHEMA = (
24        "The provided schema could not be transformed into valid JSON Schema."
25    )
26    ERROR_VALIDATING_RECORD = "One or more records do not pass the schema validation policy. Please modify your input schema, or select a more lenient validation policy."
27    ERROR_PARSING_RECORD_MISMATCHED_COLUMNS = "A header field has resolved to `None`. This indicates that the CSV has more rows than the number of header fields. If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows."
28    ERROR_PARSING_RECORD_MISMATCHED_ROWS = "A row's value has resolved to `None`. This indicates that the CSV has more columns in the header field than the number of columns in the row(s). If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows."
29    STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY = "Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema."
30    NULL_VALUE_IN_SCHEMA = "Error during schema inference: no type was detected for key."
31    UNRECOGNIZED_TYPE = "Error during schema inference: unrecognized type."
32    SCHEMA_INFERENCE_ERROR = "Error inferring schema from files. Are the files valid?"
33    INVALID_SCHEMA_ERROR = "No fields were identified for this schema. This may happen if the stream is empty. Please check your configuration to verify that there are files that match the stream's glob patterns."
34    CONFIG_VALIDATION_ERROR = "Error creating stream config object."
35    MISSING_SCHEMA = "Expected `json_schema` in the configured catalog but it is missing."
36    UNDEFINED_PARSER = "No parser is defined for this file type."
37    UNDEFINED_VALIDATION_POLICY = (
38        "The validation policy defined in the config does not exist for the source."
39    )

An enumeration.

EMPTY_STREAM = <FileBasedSourceError.EMPTY_STREAM: 'No files were identified in the stream. This may be because there are no files in the specified container, or because your glob patterns did not match any files. Please verify that your source contains files last modified after the start_date and that your glob patterns are not overly strict.'>
GLOB_PARSE_ERROR = <FileBasedSourceError.GLOB_PARSE_ERROR: 'Error parsing glob pattern. Please refer to the glob pattern rules at https://facelessuser.github.io/wcmatch/glob/#split.'>
ENCODING_ERROR = <FileBasedSourceError.ENCODING_ERROR: 'File encoding error. The configured encoding must match file encoding.'>
ERROR_CASTING_VALUE = <FileBasedSourceError.ERROR_CASTING_VALUE: 'Could not cast the value to the expected type.'>
ERROR_CASTING_VALUE_UNRECOGNIZED_TYPE = <FileBasedSourceError.ERROR_CASTING_VALUE_UNRECOGNIZED_TYPE: 'Could not cast the value to the expected type because the type is not recognized. Valid types are null, array, boolean, integer, number, object, and string.'>
ERROR_DECODING_VALUE = <FileBasedSourceError.ERROR_DECODING_VALUE: 'Expected a JSON-decodeable value but could not decode record.'>
ERROR_LISTING_FILES = <FileBasedSourceError.ERROR_LISTING_FILES: 'Error listing files. Please check the credentials provided in the config and verify that they provide permission to list files.'>
ERROR_READING_FILE = <FileBasedSourceError.ERROR_READING_FILE: 'Error opening file. Please check the credentials provided in the config and verify that they provide permission to read files.'>
ERROR_PARSING_RECORD = <FileBasedSourceError.ERROR_PARSING_RECORD: "Error parsing record. This could be due to a mismatch between the config's file type and the actual file type, or because the file or record is not parseable.">
ERROR_PARSING_USER_PROVIDED_SCHEMA = <FileBasedSourceError.ERROR_PARSING_USER_PROVIDED_SCHEMA: 'The provided schema could not be transformed into valid JSON Schema.'>
ERROR_VALIDATING_RECORD = <FileBasedSourceError.ERROR_VALIDATING_RECORD: 'One or more records do not pass the schema validation policy. Please modify your input schema, or select a more lenient validation policy.'>
ERROR_PARSING_RECORD_MISMATCHED_COLUMNS = <FileBasedSourceError.ERROR_PARSING_RECORD_MISMATCHED_COLUMNS: "A header field has resolved to `None`. This indicates that the CSV has more rows than the number of header fields. If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows.">
ERROR_PARSING_RECORD_MISMATCHED_ROWS = <FileBasedSourceError.ERROR_PARSING_RECORD_MISMATCHED_ROWS: "A row's value has resolved to `None`. This indicates that the CSV has more columns in the header field than the number of columns in the row(s). If you input your schema or headers, please verify that the number of columns corresponds to the number of columns in your CSV's rows.">
STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY = <FileBasedSourceError.STOP_SYNC_PER_SCHEMA_VALIDATION_POLICY: 'Stopping sync in accordance with the configured validation policy. Records in file did not conform to the schema.'>
NULL_VALUE_IN_SCHEMA = <FileBasedSourceError.NULL_VALUE_IN_SCHEMA: 'Error during schema inference: no type was detected for key.'>
UNRECOGNIZED_TYPE = <FileBasedSourceError.UNRECOGNIZED_TYPE: 'Error during schema inference: unrecognized type.'>
SCHEMA_INFERENCE_ERROR = <FileBasedSourceError.SCHEMA_INFERENCE_ERROR: 'Error inferring schema from files. Are the files valid?'>
INVALID_SCHEMA_ERROR = <FileBasedSourceError.INVALID_SCHEMA_ERROR: "No fields were identified for this schema. This may happen if the stream is empty. Please check your configuration to verify that there are files that match the stream's glob patterns.">
CONFIG_VALIDATION_ERROR = <FileBasedSourceError.CONFIG_VALIDATION_ERROR: 'Error creating stream config object.'>
MISSING_SCHEMA = <FileBasedSourceError.MISSING_SCHEMA: 'Expected `json_schema` in the configured catalog but it is missing.'>
UNDEFINED_PARSER = <FileBasedSourceError.UNDEFINED_PARSER: 'No parser is defined for this file type.'>
UNDEFINED_VALIDATION_POLICY = <FileBasedSourceError.UNDEFINED_VALIDATION_POLICY: 'The validation policy defined in the config does not exist for the source.'>
class FileBasedErrorsCollector:
42class FileBasedErrorsCollector:
43    """
44    The placeholder for all errors collected.
45    """
46
47    errors: List[AirbyteMessage] = []
48
49    def yield_and_raise_collected(self) -> Any:
50        if self.errors:
51            # emit collected logged messages
52            yield from self.errors
53            # clean the collector
54            self.errors.clear()
55            # raising the single exception
56            raise AirbyteTracedException(
57                internal_message="Please check the logged errors for more information.",
58                message="Some errors occured while reading from the source.",
59                failure_type=FailureType.config_error,
60            )
61
62    def collect(self, logged_error: AirbyteMessage) -> None:
63        self.errors.append(logged_error)

The placeholder for all errors collected.

errors: List[airbyte_cdk.AirbyteMessage] = []
def yield_and_raise_collected(self) -> Any:
49    def yield_and_raise_collected(self) -> Any:
50        if self.errors:
51            # emit collected logged messages
52            yield from self.errors
53            # clean the collector
54            self.errors.clear()
55            # raising the single exception
56            raise AirbyteTracedException(
57                internal_message="Please check the logged errors for more information.",
58                message="Some errors occured while reading from the source.",
59                failure_type=FailureType.config_error,
60            )
def collect( self, logged_error: airbyte_cdk.AirbyteMessage) -> None:
62    def collect(self, logged_error: AirbyteMessage) -> None:
63        self.errors.append(logged_error)
class BaseFileBasedSourceError(builtins.Exception):
66class BaseFileBasedSourceError(Exception):
67    def __init__(self, error: Union[FileBasedSourceError, str], **kwargs):  # type: ignore # noqa
68        if isinstance(error, FileBasedSourceError):
69            error = FileBasedSourceError(error).value
70        super().__init__(
71            f"{error} Contact Support if you need assistance.\n{' '.join([f'{k}={v}' for k, v in kwargs.items()])}"
72        )

Common base class for all non-exit exceptions.

BaseFileBasedSourceError( error: Union[FileBasedSourceError, str], **kwargs)
67    def __init__(self, error: Union[FileBasedSourceError, str], **kwargs):  # type: ignore # noqa
68        if isinstance(error, FileBasedSourceError):
69            error = FileBasedSourceError(error).value
70        super().__init__(
71            f"{error} Contact Support if you need assistance.\n{' '.join([f'{k}={v}' for k, v in kwargs.items()])}"
72        )
class ConfigValidationError(BaseFileBasedSourceError):
75class ConfigValidationError(BaseFileBasedSourceError):
76    pass

Common base class for all non-exit exceptions.

class InvalidSchemaError(BaseFileBasedSourceError):
79class InvalidSchemaError(BaseFileBasedSourceError):
80    pass

Common base class for all non-exit exceptions.

class MissingSchemaError(BaseFileBasedSourceError):
83class MissingSchemaError(BaseFileBasedSourceError):
84    pass

Common base class for all non-exit exceptions.

class NoFilesMatchingError(BaseFileBasedSourceError):
87class NoFilesMatchingError(BaseFileBasedSourceError):
88    pass

Common base class for all non-exit exceptions.

class RecordParseError(BaseFileBasedSourceError):
91class RecordParseError(BaseFileBasedSourceError):
92    pass

Common base class for all non-exit exceptions.

class SchemaInferenceError(BaseFileBasedSourceError):
95class SchemaInferenceError(BaseFileBasedSourceError):
96    pass

Common base class for all non-exit exceptions.

class CheckAvailabilityError(BaseFileBasedSourceError):
 99class CheckAvailabilityError(BaseFileBasedSourceError):
100    pass

Common base class for all non-exit exceptions.

class UndefinedParserError(BaseFileBasedSourceError):
103class UndefinedParserError(BaseFileBasedSourceError):
104    pass

Common base class for all non-exit exceptions.

class StopSyncPerValidationPolicy(BaseFileBasedSourceError):
107class StopSyncPerValidationPolicy(BaseFileBasedSourceError):
108    pass

Common base class for all non-exit exceptions.

class ErrorListingFiles(BaseFileBasedSourceError):
111class ErrorListingFiles(BaseFileBasedSourceError):
112    pass

Common base class for all non-exit exceptions.

class DuplicatedFilesError(BaseFileBasedSourceError):
115class DuplicatedFilesError(BaseFileBasedSourceError):
116    def __init__(self, duplicated_files_names: List[dict[str, List[str]]], **kwargs: Any):
117        self._duplicated_files_names = duplicated_files_names
118        self._stream_name: str = kwargs["stream"]
119        super().__init__(self._format_duplicate_files_error_message(), **kwargs)
120
121    def _format_duplicate_files_error_message(self) -> str:
122        duplicated_files_messages = []
123        for duplicated_file in self._duplicated_files_names:
124            for duplicated_file_name, file_paths in duplicated_file.items():
125                file_duplicated_message = (
126                    f"{len(file_paths)} duplicates found for file name {duplicated_file_name}:\n\n"
127                    + "".join(f"\n - {file_paths}")
128                )
129                duplicated_files_messages.append(file_duplicated_message)
130
131        error_message = (
132            f"ERROR: Duplicate filenames found for stream {self._stream_name}. "
133            "Duplicate file names are not allowed if the Preserve Sub-Directories in File Paths option is disabled. "
134            "Please remove or rename the duplicate files before attempting to re-run the sync.\n\n"
135            + "\n".join(duplicated_files_messages)
136        )
137
138        return error_message
139
140    def __repr__(self) -> str:
141        """Return a string representation of the exception."""
142        class_name = self.__class__.__name__
143        properties_str = ", ".join(
144            f"{k}={v!r}" for k, v in self.__dict__.items() if not k.startswith("_")
145        )
146        return f"{class_name}({properties_str})"

Common base class for all non-exit exceptions.

DuplicatedFilesError(duplicated_files_names: List[dict[str, List[str]]], **kwargs: Any)
116    def __init__(self, duplicated_files_names: List[dict[str, List[str]]], **kwargs: Any):
117        self._duplicated_files_names = duplicated_files_names
118        self._stream_name: str = kwargs["stream"]
119        super().__init__(self._format_duplicate_files_error_message(), **kwargs)
class CustomFileBasedException(airbyte_cdk.utils.traced_exception.AirbyteTracedException):
149class CustomFileBasedException(AirbyteTracedException):
150    """
151    A specialized exception for file-based connectors.
152
153    This exception is designed to bypass the default error handling in the file-based CDK, allowing the use of custom error messages.
154    """
155
156    pass

A specialized exception for file-based connectors.

This exception is designed to bypass the default error handling in the file-based CDK, allowing the use of custom error messages.

class FileSizeLimitError(CustomFileBasedException):
159class FileSizeLimitError(CustomFileBasedException):
160    pass

A specialized exception for file-based connectors.

This exception is designed to bypass the default error handling in the file-based CDK, allowing the use of custom error messages.