airbyte_cdk.sources.file_based.availability_strategy

1from .abstract_file_based_availability_strategy import AbstractFileBasedAvailabilityStrategy
2from .default_file_based_availability_strategy import DefaultFileBasedAvailabilityStrategy
3
4__all__ = [
5    "AbstractFileBasedAvailabilityStrategy",
6    "DefaultFileBasedAvailabilityStrategy",
7]
class AbstractFileBasedAvailabilityStrategy(airbyte_cdk.sources.streams.availability_strategy.AvailabilityStrategy):
20class AbstractFileBasedAvailabilityStrategy(AvailabilityStrategy):
21    @abstractmethod
22    def check_availability(  # type: ignore[override]  # Signature doesn't match base class
23        self,
24        stream: Stream,
25        logger: logging.Logger,
26        source: Optional[Source] = None,
27    ) -> Tuple[bool, Optional[str]]:
28        """
29        Perform a connection check for the stream.
30
31        Returns (True, None) if successful, otherwise (False, <error message>).
32        """
33        ...
34
35    @abstractmethod
36    def check_availability_and_parsability(
37        self,
38        stream: AbstractFileBasedStream,
39        logger: logging.Logger,
40        _: Optional[Source],
41    ) -> Tuple[bool, Optional[str]]:
42        """
43        Performs a connection check for the stream, as well as additional checks that
44        verify that the connection is working as expected.
45
46        Returns (True, None) if successful, otherwise (False, <error message>).
47        """
48        ...

Abstract base class for checking stream availability.

@abstractmethod
def check_availability( self, stream: airbyte_cdk.Stream, logger: logging.Logger, source: Optional[airbyte_cdk.Source] = None) -> Tuple[bool, Optional[str]]:
21    @abstractmethod
22    def check_availability(  # type: ignore[override]  # Signature doesn't match base class
23        self,
24        stream: Stream,
25        logger: logging.Logger,
26        source: Optional[Source] = None,
27    ) -> Tuple[bool, Optional[str]]:
28        """
29        Perform a connection check for the stream.
30
31        Returns (True, None) if successful, otherwise (False, <error message>).
32        """
33        ...

Perform a connection check for the stream.

Returns (True, None) if successful, otherwise (False, <error message>).

@abstractmethod
def check_availability_and_parsability( self, stream: airbyte_cdk.sources.file_based.stream.AbstractFileBasedStream, logger: logging.Logger, _: Optional[airbyte_cdk.Source]) -> Tuple[bool, Optional[str]]:
35    @abstractmethod
36    def check_availability_and_parsability(
37        self,
38        stream: AbstractFileBasedStream,
39        logger: logging.Logger,
40        _: Optional[Source],
41    ) -> Tuple[bool, Optional[str]]:
42        """
43        Performs a connection check for the stream, as well as additional checks that
44        verify that the connection is working as expected.
45
46        Returns (True, None) if successful, otherwise (False, <error message>).
47        """
48        ...

Performs a connection check for the stream, as well as additional checks that verify that the connection is working as expected.

Returns (True, None) if successful, otherwise (False, <error message>).

 29class DefaultFileBasedAvailabilityStrategy(AbstractFileBasedAvailabilityStrategy):
 30    def __init__(self, stream_reader: AbstractFileBasedStreamReader) -> None:
 31        self.stream_reader = stream_reader
 32
 33    def check_availability(  # type: ignore[override]  # Signature doesn't match base class
 34        self,
 35        stream: AbstractFileBasedStream,
 36        logger: logging.Logger,
 37        _: Optional[Source],
 38    ) -> Tuple[bool, Optional[str]]:
 39        """
 40        Perform a connection check for the stream (verify that we can list files from the stream).
 41
 42        Returns `(True, None)` if successful, otherwise `(False, <error message>)`.
 43
 44        The returned reason is the actionable error string from the underlying
 45        `CheckAvailabilityError`. Connector-raised `AirbyteTracedException`s are
 46        re-raised so that their actionable `message` propagates to the platform
 47        unchanged. The full traceback is still logged for debugging.
 48        """
 49        try:
 50            self._check_list_files(stream)
 51        except CheckAvailabilityError as exc:
 52            logger.exception("Stream availability check failed")
 53            return False, str(exc)
 54
 55        return True, None
 56
 57    def check_availability_and_parsability(
 58        self,
 59        stream: AbstractFileBasedStream,
 60        logger: logging.Logger,
 61        _: Optional[Source],
 62    ) -> Tuple[bool, Optional[str]]:
 63        """
 64        Perform a connection check for the stream.
 65
 66        Returns (True, None) if successful, otherwise (False, <error message>).
 67
 68        For the stream:
 69        - Verify the parser config is valid per check_config method of the parser.
 70        - Verify that we can list files from the stream using the configured globs.
 71        - Verify that we can read one file from the stream as long as the stream parser is not setting parser_max_n_files_for_parsability to 0.
 72
 73        This method will also check that the files and their contents are consistent
 74        with the configured options, as follows:
 75        - If the files have extensions, verify that they don't disagree with the
 76          configured file type.
 77        - If the user provided a schema in the config, check that a subset of records in
 78          one file conform to the schema via a call to stream.conforms_to_schema(schema).
 79        """
 80        parser = stream.get_parser()
 81        config_check_result, config_check_error_message = parser.check_config(stream.config)
 82        if config_check_result is False:
 83            return False, config_check_error_message
 84        try:
 85            file = self._check_list_files(stream)
 86            if not parser.parser_max_n_files_for_parsability == 0:
 87                self._check_parse_record(stream, file, logger)
 88            else:
 89                # If the parser is set to not check parsability, we still want to check that we can open the file.
 90                handle = stream.stream_reader.open_file(file, parser.file_read_mode, None, logger)
 91                handle.close()
 92        except AirbyteTracedException:
 93            raise
 94        except CheckAvailabilityError as exc:
 95            logger.exception("Stream availability check failed")
 96            return False, str(exc)
 97
 98        return True, None
 99
100    def _check_list_files(self, stream: AbstractFileBasedStream) -> RemoteFile:
101        """
102        Check that we can list files from the stream.
103
104        Returns the first file if successful, otherwise raises a `CheckAvailabilityError`.
105
106        `AirbyteTracedException`s raised by the underlying stream reader are
107        re-raised unchanged so that connector-specific actionable messages
108        (e.g. invalid credentials, missing permissions) reach the platform
109        without being wrapped in a generic `ERROR_LISTING_FILES` reason.
110        """
111        try:
112            file = next(iter(stream.get_files()))
113        except StopIteration:
114            raise CheckAvailabilityError(FileBasedSourceError.EMPTY_STREAM, stream=stream.name)
115        except CustomFileBasedException as exc:
116            raise CheckAvailabilityError(str(exc), stream=stream.name) from exc
117        except AirbyteTracedException:
118            raise
119        except Exception as exc:
120            raise CheckAvailabilityError(
121                FileBasedSourceError.ERROR_LISTING_FILES, stream=stream.name
122            ) from exc
123
124        return file
125
126    def _check_parse_record(
127        self,
128        stream: AbstractFileBasedStream,
129        file: RemoteFile,
130        logger: logging.Logger,
131    ) -> None:
132        parser = stream.get_parser()
133
134        try:
135            record = next(
136                iter(
137                    parser.parse_records(
138                        stream.config, file, self.stream_reader, logger, discovered_schema=None
139                    )
140                )
141            )
142        except StopIteration:
143            # The file is empty. We've verified that we can open it, so will
144            # consider the connection check successful even though it means
145            # we skip the schema validation check.
146            return
147        except AirbyteTracedException as ate:
148            raise ate
149        except Exception as exc:
150            raise CheckAvailabilityError(
151                FileBasedSourceError.ERROR_READING_FILE, stream=stream.name, file=file.uri
152            ) from exc
153
154        schema = stream.catalog_schema or stream.config.input_schema
155        if schema and stream.validation_policy.validate_schema_before_sync:
156            if not conforms_to_schema(record, schema):  # type: ignore
157                raise CheckAvailabilityError(
158                    FileBasedSourceError.ERROR_VALIDATING_RECORD,
159                    stream=stream.name,
160                    file=file.uri,
161                )
162
163        return None

Default implementation of AbstractFileBasedAvailabilityStrategy, the abstract base class for checking stream availability.

DefaultFileBasedAvailabilityStrategy( stream_reader: airbyte_cdk.sources.file_based.AbstractFileBasedStreamReader)
30    def __init__(self, stream_reader: AbstractFileBasedStreamReader) -> None:
31        self.stream_reader = stream_reader
stream_reader
def check_availability( self, stream: airbyte_cdk.sources.file_based.stream.AbstractFileBasedStream, logger: logging.Logger, _: Optional[airbyte_cdk.Source]) -> Tuple[bool, Optional[str]]:
33    def check_availability(  # type: ignore[override]  # Signature doesn't match base class
34        self,
35        stream: AbstractFileBasedStream,
36        logger: logging.Logger,
37        _: Optional[Source],
38    ) -> Tuple[bool, Optional[str]]:
39        """
40        Perform a connection check for the stream (verify that we can list files from the stream).
41
42        Returns `(True, None)` if successful, otherwise `(False, <error message>)`.
43
44        The returned reason is the actionable error string from the underlying
45        `CheckAvailabilityError`. Connector-raised `AirbyteTracedException`s are
46        re-raised so that their actionable `message` propagates to the platform
47        unchanged. The full traceback is still logged for debugging.
48        """
49        try:
50            self._check_list_files(stream)
51        except CheckAvailabilityError as exc:
52            logger.exception("Stream availability check failed")
53            return False, str(exc)
54
55        return True, None

Perform a connection check for the stream (verify that we can list files from the stream).

Returns (True, None) if successful, otherwise (False, <error message>).

The returned reason is the actionable error string from the underlying CheckAvailabilityError. Connector-raised AirbyteTracedExceptions are re-raised so that their actionable message propagates to the platform unchanged. The full traceback is still logged for debugging.

def check_availability_and_parsability( self, stream: airbyte_cdk.sources.file_based.stream.AbstractFileBasedStream, logger: logging.Logger, _: Optional[airbyte_cdk.Source]) -> Tuple[bool, Optional[str]]:
57    def check_availability_and_parsability(
58        self,
59        stream: AbstractFileBasedStream,
60        logger: logging.Logger,
61        _: Optional[Source],
62    ) -> Tuple[bool, Optional[str]]:
63        """
64        Perform a connection check for the stream.
65
66        Returns (True, None) if successful, otherwise (False, <error message>).
67
68        For the stream:
69        - Verify the parser config is valid per check_config method of the parser.
70        - Verify that we can list files from the stream using the configured globs.
71        - Verify that we can read one file from the stream as long as the stream parser is not setting parser_max_n_files_for_parsability to 0.
72
73        This method will also check that the files and their contents are consistent
74        with the configured options, as follows:
75        - If the files have extensions, verify that they don't disagree with the
76          configured file type.
77        - If the user provided a schema in the config, check that a subset of records in
78          one file conform to the schema via a call to stream.conforms_to_schema(schema).
79        """
80        parser = stream.get_parser()
81        config_check_result, config_check_error_message = parser.check_config(stream.config)
82        if config_check_result is False:
83            return False, config_check_error_message
84        try:
85            file = self._check_list_files(stream)
86            if not parser.parser_max_n_files_for_parsability == 0:
87                self._check_parse_record(stream, file, logger)
88            else:
89                # If the parser is set to not check parsability, we still want to check that we can open the file.
90                handle = stream.stream_reader.open_file(file, parser.file_read_mode, None, logger)
91                handle.close()
92        except AirbyteTracedException:
93            raise
94        except CheckAvailabilityError as exc:
95            logger.exception("Stream availability check failed")
96            return False, str(exc)
97
98        return True, None

Perform a connection check for the stream.

Returns (True, None) if successful, otherwise (False, <error message>).

For the stream:

  • Verify the parser config is valid per check_config method of the parser.
  • Verify that we can list files from the stream using the configured globs.
  • Verify that we can read one file from the stream as long as the stream parser is not setting parser_max_n_files_for_parsability to 0.

This method will also check that the files and their contents are consistent with the configured options, as follows:

  • If the files have extensions, verify that they don't disagree with the configured file type.
  • If the user provided a schema in the config, check that a subset of records in one file conform to the schema via a call to stream.conforms_to_schema(schema).