airbyte_cdk.sources.file_based.availability_strategy

1from .abstract_file_based_availability_strategy import AbstractFileBasedAvailabilityStrategy
2from .default_file_based_availability_strategy import DefaultFileBasedAvailabilityStrategy
3
4__all__ = [
5    "AbstractFileBasedAvailabilityStrategy",
6    "DefaultFileBasedAvailabilityStrategy",
7]
class AbstractFileBasedAvailabilityStrategy(airbyte_cdk.sources.streams.availability_strategy.AvailabilityStrategy):
class AbstractFileBasedAvailabilityStrategy(AvailabilityStrategy):
    """
    Availability-check interface for file-based streams.

    Every check reports its outcome as a ``(succeeded, error)`` pair:
    ``(True, None)`` when the check passes, ``(False, <error message>)`` otherwise.
    """

    @abstractmethod
    def check_availability(  # type: ignore[override]  # Signature doesn't match base class
        self,
        stream: Stream,
        logger: logging.Logger,
        source: Optional[Source] = None,
    ) -> Tuple[bool, Optional[str]]:
        """
        Run a connection check against ``stream``.

        :param stream: the stream being checked.
        :param logger: logger available to the implementation.
        :param source: optional ``Source``; may be ``None``.
        :return: ``(True, None)`` on success, otherwise ``(False, <error message>)``.
        """

    @abstractmethod
    def check_availability_and_parsability(
        self,
        stream: AbstractFileBasedStream,
        logger: logging.Logger,
        _: Optional[Source],
    ) -> Tuple[bool, Optional[str]]:
        """
        Run a connection check against ``stream`` together with extra checks
        confirming that the connection behaves as expected.

        :param stream: the file-based stream being checked.
        :param logger: logger available to the implementation.
        :param _: optional ``Source``; unused by this interface's signature name.
        :return: ``(True, None)`` on success, otherwise ``(False, <error message>)``.
        """

Abstract base class for checking stream availability.

@abstractmethod
def check_availability(self, stream: airbyte_cdk.Stream, logger: logging.Logger, source: Optional[airbyte_cdk.Source] = None) -> Tuple[bool, Optional[str]]:
@abstractmethod
def check_availability(  # type: ignore[override]  # Signature doesn't match base class
    self,
    stream: Stream,
    logger: logging.Logger,
    source: Optional[Source] = None,
) -> Tuple[bool, Optional[str]]:
    """
    Run a connection check against ``stream``.

    :return: ``(True, None)`` when the check passes, otherwise
        ``(False, <error message>)``.
    """

Perform a connection check for the stream.

Returns (True, None) if successful, otherwise (False, <error message>).

@abstractmethod
def check_availability_and_parsability(self, stream: airbyte_cdk.sources.file_based.stream.AbstractFileBasedStream, logger: logging.Logger, _: Optional[airbyte_cdk.Source]) -> Tuple[bool, Optional[str]]:
@abstractmethod
def check_availability_and_parsability(
    self,
    stream: AbstractFileBasedStream,
    logger: logging.Logger,
    _: Optional[Source],
) -> Tuple[bool, Optional[str]]:
    """
    Run a connection check against ``stream`` along with extra checks
    confirming that the connection behaves as expected.

    :return: ``(True, None)`` when every check passes, otherwise
        ``(False, <error message>)``.
    """

Performs a connection check for the stream, as well as additional checks that verify that the connection is working as expected.

Returns (True, None) if successful, otherwise (False, <error message>).

 30class DefaultFileBasedAvailabilityStrategy(AbstractFileBasedAvailabilityStrategy):
 31    def __init__(self, stream_reader: AbstractFileBasedStreamReader) -> None:
 32        self.stream_reader = stream_reader
 33
 34    def check_availability(  # type: ignore[override]  # Signature doesn't match base class
 35        self,
 36        stream: AbstractFileBasedStream,
 37        logger: logging.Logger,
 38        _: Optional[Source],
 39    ) -> Tuple[bool, Optional[str]]:
 40        """
 41        Perform a connection check for the stream (verify that we can list files from the stream).
 42
 43        Returns (True, None) if successful, otherwise (False, <error message>).
 44        """
 45        try:
 46            self._check_list_files(stream)
 47        except CheckAvailabilityError:
 48            return False, "".join(traceback.format_exc())
 49
 50        return True, None
 51
 52    def check_availability_and_parsability(
 53        self,
 54        stream: AbstractFileBasedStream,
 55        logger: logging.Logger,
 56        _: Optional[Source],
 57    ) -> Tuple[bool, Optional[str]]:
 58        """
 59        Perform a connection check for the stream.
 60
 61        Returns (True, None) if successful, otherwise (False, <error message>).
 62
 63        For the stream:
 64        - Verify the parser config is valid per check_config method of the parser.
 65        - Verify that we can list files from the stream using the configured globs.
 66        - Verify that we can read one file from the stream as long as the stream parser is not setting parser_max_n_files_for_parsability to 0.
 67
 68        This method will also check that the files and their contents are consistent
 69        with the configured options, as follows:
 70        - If the files have extensions, verify that they don't disagree with the
 71          configured file type.
 72        - If the user provided a schema in the config, check that a subset of records in
 73          one file conform to the schema via a call to stream.conforms_to_schema(schema).
 74        """
 75        parser = stream.get_parser()
 76        config_check_result, config_check_error_message = parser.check_config(stream.config)
 77        if config_check_result is False:
 78            return False, config_check_error_message
 79        try:
 80            file = self._check_list_files(stream)
 81            if not parser.parser_max_n_files_for_parsability == 0:
 82                self._check_parse_record(stream, file, logger)
 83            else:
 84                # If the parser is set to not check parsability, we still want to check that we can open the file.
 85                handle = stream.stream_reader.open_file(file, parser.file_read_mode, None, logger)
 86                handle.close()
 87        except AirbyteTracedException as ate:
 88            raise ate
 89        except CheckAvailabilityError:
 90            return False, "".join(traceback.format_exc())
 91
 92        return True, None
 93
 94    def _check_list_files(self, stream: AbstractFileBasedStream) -> RemoteFile:
 95        """
 96        Check that we can list files from the stream.
 97
 98        Returns the first file if successful, otherwise raises a CheckAvailabilityError.
 99        """
100        try:
101            file = next(iter(stream.get_files()))
102        except StopIteration:
103            raise CheckAvailabilityError(FileBasedSourceError.EMPTY_STREAM, stream=stream.name)
104        except CustomFileBasedException as exc:
105            raise CheckAvailabilityError(str(exc), stream=stream.name) from exc
106        except Exception as exc:
107            raise CheckAvailabilityError(
108                FileBasedSourceError.ERROR_LISTING_FILES, stream=stream.name
109            ) from exc
110
111        return file
112
113    def _check_parse_record(
114        self,
115        stream: AbstractFileBasedStream,
116        file: RemoteFile,
117        logger: logging.Logger,
118    ) -> None:
119        parser = stream.get_parser()
120
121        try:
122            record = next(
123                iter(
124                    parser.parse_records(
125                        stream.config, file, self.stream_reader, logger, discovered_schema=None
126                    )
127                )
128            )
129        except StopIteration:
130            # The file is empty. We've verified that we can open it, so will
131            # consider the connection check successful even though it means
132            # we skip the schema validation check.
133            return
134        except AirbyteTracedException as ate:
135            raise ate
136        except Exception as exc:
137            raise CheckAvailabilityError(
138                FileBasedSourceError.ERROR_READING_FILE, stream=stream.name, file=file.uri
139            ) from exc
140
141        schema = stream.catalog_schema or stream.config.input_schema
142        if schema and stream.validation_policy.validate_schema_before_sync:
143            if not conforms_to_schema(record, schema):  # type: ignore
144                raise CheckAvailabilityError(
145                    FileBasedSourceError.ERROR_VALIDATING_RECORD,
146                    stream=stream.name,
147                    file=file.uri,
148                )
149
150        return None

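Default implementation of AbstractFileBasedAvailabilityStrategy: checks stream availability by listing the stream's files and, for the parsability check, opening (and optionally parsing a record from) the first listed file.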

DefaultFileBasedAvailabilityStrategy(stream_reader: airbyte_cdk.sources.file_based.AbstractFileBasedStreamReader)
def __init__(self, stream_reader: AbstractFileBasedStreamReader) -> None:
    """Keep the reader that is handed to the parser when a sample record is parsed."""
    reader = stream_reader
    self.stream_reader = reader
stream_reader
def check_availability(self, stream: airbyte_cdk.sources.file_based.stream.AbstractFileBasedStream, logger: logging.Logger, _: Optional[airbyte_cdk.Source]) -> Tuple[bool, Optional[str]]:
def check_availability(  # type: ignore[override]  # Signature doesn't match base class
    self,
    stream: AbstractFileBasedStream,
    logger: logging.Logger,
    _: Optional[Source],
) -> Tuple[bool, Optional[str]]:
    """
    Perform a connection check for the stream (verify that we can list files from the stream).

    Returns (True, None) if successful, otherwise (False, <error message>), where the
    error message is the formatted traceback of the CheckAvailabilityError.
    """
    try:
        self._check_list_files(stream)
    except CheckAvailabilityError:
        # format_exc() already returns a single string; joining it was a no-op.
        return False, traceback.format_exc()

    return True, None

Perform a connection check for the stream (verify that we can list files from the stream).

Returns (True, None) if successful, otherwise (False, <error message>).

def check_availability_and_parsability(self, stream: airbyte_cdk.sources.file_based.stream.AbstractFileBasedStream, logger: logging.Logger, _: Optional[airbyte_cdk.Source]) -> Tuple[bool, Optional[str]]:
def check_availability_and_parsability(
    self,
    stream: AbstractFileBasedStream,
    logger: logging.Logger,
    _: Optional[Source],
) -> Tuple[bool, Optional[str]]:
    """
    Perform a connection check for the stream, plus checks that its files are usable.

    Returns (True, None) if successful, otherwise (False, <error message>).

    For the stream:
    - Verify the parser config is valid per the parser's check_config method; if it is
      not, return the parser's error message.
    - Verify that we can list files from the stream using the configured globs.
    - If the parser's parser_max_n_files_for_parsability is not 0, read the first record
      of the first listed file; otherwise only verify that the file can be opened.

    When a record is read, a schema is available (stream.catalog_schema, falling back to
    the config's input_schema), and the stream's validation policy requires validation
    before sync, the record is checked with conforms_to_schema(record, schema).

    Failures raised as CheckAvailabilityError (including failures to open the file) are
    reported as (False, <traceback>); AirbyteTracedException is re-raised to the caller.
    """
    parser = stream.get_parser()
    config_check_result, config_check_error_message = parser.check_config(stream.config)
    if config_check_result is False:
        return False, config_check_error_message
    try:
        file = self._check_list_files(stream)
        if parser.parser_max_n_files_for_parsability != 0:
            self._check_parse_record(stream, file, logger)
        else:
            # The parser opted out of parsability checks, but we still want to
            # confirm that the file can be opened.
            # NOTE(review): this branch opens via stream.stream_reader while
            # _check_parse_record uses self.stream_reader; confirm they are the same reader.
            try:
                handle = stream.stream_reader.open_file(file, parser.file_read_mode, None, logger)
            except AirbyteTracedException:
                raise
            except Exception as exc:
                # Report open failures as a failed check (like read failures in
                # _check_parse_record) instead of letting a raw exception escape.
                raise CheckAvailabilityError(
                    FileBasedSourceError.ERROR_READING_FILE, stream=stream.name, file=file.uri
                ) from exc
            handle.close()
    except AirbyteTracedException:
        raise
    except CheckAvailabilityError:
        # format_exc() already returns a single string.
        return False, traceback.format_exc()

    return True, None

Perform a connection check for the stream.

Returns (True, None) if successful, otherwise (False, <error message>).

For the stream:

  • Verify the parser config is valid per check_config method of the parser.
  • Verify that we can list files from the stream using the configured globs.
  • Verify that we can read one file from the stream as long as the stream parser is not setting parser_max_n_files_for_parsability to 0.

This method will also check that the file contents are consistent with the configured options, as follows:

  • No file-extension check against the configured file type is performed in this method's own code.
  • If a schema is available (the catalog schema, falling back to the input_schema provided in the config) and the stream's validation policy requires validation before sync, check that the first record read from the first listed file conforms to it via a call to conforms_to_schema(record, schema).