airbyte_cdk.sources.file_based.availability_strategy

 1from .abstract_file_based_availability_strategy import (
 2    AbstractFileBasedAvailabilityStrategy,
 3    AbstractFileBasedAvailabilityStrategyWrapper,
 4)
 5from .default_file_based_availability_strategy import DefaultFileBasedAvailabilityStrategy
 6
 7__all__ = [
 8    "AbstractFileBasedAvailabilityStrategy",
 9    "AbstractFileBasedAvailabilityStrategyWrapper",
10    "DefaultFileBasedAvailabilityStrategy",
11]
class AbstractFileBasedAvailabilityStrategy(airbyte_cdk.sources.streams.availability_strategy.AvailabilityStrategy):
class AbstractFileBasedAvailabilityStrategy(AvailabilityStrategy):
    """
    Interface for availability checks on file-based streams.

    Concrete strategies implement two checks, each returning a (success, message) tuple:
    ``check_availability`` for a basic connection check, and
    ``check_availability_and_parsability`` for that check plus further verification that
    the connection works as expected.
    """

    @abstractmethod
    def check_availability(  # type: ignore[override]  # Signature doesn't match base class
        self,
        stream: Stream,
        logger: logging.Logger,
        _: Optional[Source],
    ) -> Tuple[bool, Optional[str]]:
        """
        Run a connection check against ``stream``.

        The trailing ``_`` parameter is an optional Source and may be None.
        Returns (True, None) on success and (False, <error message>) otherwise.
        """

    @abstractmethod
    def check_availability_and_parsability(
        self,
        stream: AbstractFileBasedStream,
        logger: logging.Logger,
        _: Optional[Source],
    ) -> Tuple[bool, Optional[str]]:
        """
        Run the connection check for ``stream`` plus additional checks confirming that
        the connection behaves as expected.

        The trailing ``_`` parameter is an optional Source and may be None.
        Returns (True, None) on success and (False, <error message>) otherwise.
        """

Abstract base class for checking stream availability.

@abstractmethod
def check_availability( self, stream: airbyte_cdk.Stream, logger: logging.Logger, _: Optional[airbyte_cdk.Source]) -> Tuple[bool, Optional[str]]:
27    @abstractmethod
28    def check_availability(  # type: ignore[override]  # Signature doesn't match base class
29        self,
30        stream: Stream,
31        logger: logging.Logger,
32        _: Optional[Source],
33    ) -> Tuple[bool, Optional[str]]:
34        """
35        Perform a connection check for the stream.
36
37        Returns (True, None) if successful, otherwise (False, <error message>).
38        """
39        ...

Perform a connection check for the stream.

Returns (True, None) if successful, otherwise (False, <error message>).

@abstractmethod
def check_availability_and_parsability( self, stream: airbyte_cdk.sources.file_based.stream.AbstractFileBasedStream, logger: logging.Logger, _: Optional[airbyte_cdk.Source]) -> Tuple[bool, Optional[str]]:
41    @abstractmethod
42    def check_availability_and_parsability(
43        self,
44        stream: AbstractFileBasedStream,
45        logger: logging.Logger,
46        _: Optional[Source],
47    ) -> Tuple[bool, Optional[str]]:
48        """
49        Performs a connection check for the stream, as well as additional checks that
50        verify that the connection is working as expected.
51
52        Returns (True, None) if successful, otherwise (False, <error message>).
53        """
54        ...

Performs a connection check for the stream, as well as additional checks that verify that the connection is working as expected.

Returns (True, None) if successful, otherwise (False, <error message>).

class AbstractFileBasedAvailabilityStrategyWrapper(airbyte_cdk.sources.streams.concurrent.availability_strategy.AbstractAvailabilityStrategy):
class AbstractFileBasedAvailabilityStrategyWrapper(AbstractAvailabilityStrategy):
    """
    Adapter exposing a file-based stream's availability strategy through the
    concurrent-CDK ``AbstractAvailabilityStrategy`` interface.

    Every check is delegated to ``stream.availability_strategy`` with the wrapped
    stream and ``None`` as the source argument.
    """

    def __init__(self, stream: AbstractFileBasedStream) -> None:
        # The wrapped stream; its ``availability_strategy`` performs the actual checks.
        self.stream = stream

    def check_availability(self, logger: logging.Logger) -> StreamAvailability:
        """
        Check stream availability and translate the tuple result.

        Returns StreamAvailable() when the wrapped strategy reports success, otherwise a
        StreamUnavailable carrying its error message (an empty string if none was given).
        """
        strategy = self.stream.availability_strategy
        available, message = strategy.check_availability(self.stream, logger, None)
        return StreamAvailable() if available else StreamUnavailable(message or "")

    def check_availability_and_parsability(
        self, logger: logging.Logger
    ) -> Tuple[bool, Optional[str]]:
        """
        Run the wrapped strategy's availability-and-parsability check.

        The (success, error_message) tuple it produces is returned unchanged.
        """
        strategy = self.stream.availability_strategy
        return strategy.check_availability_and_parsability(self.stream, logger, None)

AbstractAvailabilityStrategy is an experimental interface developed as part of the Concurrent CDK. This interface is not yet stable and may change in the future. Use at your own risk.

Why create a new interface instead of using the existing AvailabilityStrategy? The existing AvailabilityStrategy is tightly coupled with Stream and Source, which leads to circular dependencies and makes it difficult to move away from the Stream interface to AbstractStream.

AbstractFileBasedAvailabilityStrategyWrapper( stream: airbyte_cdk.sources.file_based.stream.AbstractFileBasedStream)
58    def __init__(self, stream: AbstractFileBasedStream) -> None:
59        self.stream = stream
stream
def check_availability( self, logger: logging.Logger) -> airbyte_cdk.sources.streams.concurrent.availability_strategy.StreamAvailability:
61    def check_availability(self, logger: logging.Logger) -> StreamAvailability:
62        is_available, reason = self.stream.availability_strategy.check_availability(
63            self.stream, logger, None
64        )
65        if is_available:
66            return StreamAvailable()
67        return StreamUnavailable(reason or "")

Checks stream availability.

Parameters
  • logger: logger object to use
Returns

A StreamAvailability object describing the stream's availability

def check_availability_and_parsability(self, logger: logging.Logger) -> Tuple[bool, Optional[str]]:
69    def check_availability_and_parsability(
70        self, logger: logging.Logger
71    ) -> Tuple[bool, Optional[str]]:
72        return self.stream.availability_strategy.check_availability_and_parsability(
73            self.stream, logger, None
74        )
 30class DefaultFileBasedAvailabilityStrategy(AbstractFileBasedAvailabilityStrategy):
 31    def __init__(self, stream_reader: AbstractFileBasedStreamReader) -> None:
 32        self.stream_reader = stream_reader
 33
 34    def check_availability(  # type: ignore[override]  # Signature doesn't match base class
 35        self,
 36        stream: AbstractFileBasedStream,
 37        logger: logging.Logger,
 38        _: Optional[Source],
 39    ) -> Tuple[bool, Optional[str]]:
 40        """
 41        Perform a connection check for the stream (verify that we can list files from the stream).
 42
 43        Returns (True, None) if successful, otherwise (False, <error message>).
 44        """
 45        try:
 46            self._check_list_files(stream)
 47        except CheckAvailabilityError:
 48            return False, "".join(traceback.format_exc())
 49
 50        return True, None
 51
 52    def check_availability_and_parsability(
 53        self,
 54        stream: AbstractFileBasedStream,
 55        logger: logging.Logger,
 56        _: Optional[Source],
 57    ) -> Tuple[bool, Optional[str]]:
 58        """
 59        Perform a connection check for the stream.
 60
 61        Returns (True, None) if successful, otherwise (False, <error message>).
 62
 63        For the stream:
 64        - Verify the parser config is valid per check_config method of the parser.
 65        - Verify that we can list files from the stream using the configured globs.
 66        - Verify that we can read one file from the stream as long as the stream parser is not setting parser_max_n_files_for_parsability to 0.
 67
 68        This method will also check that the files and their contents are consistent
 69        with the configured options, as follows:
 70        - If the files have extensions, verify that they don't disagree with the
 71          configured file type.
 72        - If the user provided a schema in the config, check that a subset of records in
 73          one file conform to the schema via a call to stream.conforms_to_schema(schema).
 74        """
 75        parser = stream.get_parser()
 76        config_check_result, config_check_error_message = parser.check_config(stream.config)
 77        if config_check_result is False:
 78            return False, config_check_error_message
 79        try:
 80            file = self._check_list_files(stream)
 81            if not parser.parser_max_n_files_for_parsability == 0:
 82                self._check_parse_record(stream, file, logger)
 83            else:
 84                # If the parser is set to not check parsability, we still want to check that we can open the file.
 85                handle = stream.stream_reader.open_file(file, parser.file_read_mode, None, logger)
 86                handle.close()
 87        except AirbyteTracedException as ate:
 88            raise ate
 89        except CheckAvailabilityError:
 90            return False, "".join(traceback.format_exc())
 91
 92        return True, None
 93
 94    def _check_list_files(self, stream: AbstractFileBasedStream) -> RemoteFile:
 95        """
 96        Check that we can list files from the stream.
 97
 98        Returns the first file if successful, otherwise raises a CheckAvailabilityError.
 99        """
100        try:
101            file = next(iter(stream.get_files()))
102        except StopIteration:
103            raise CheckAvailabilityError(FileBasedSourceError.EMPTY_STREAM, stream=stream.name)
104        except CustomFileBasedException as exc:
105            raise CheckAvailabilityError(str(exc), stream=stream.name) from exc
106        except Exception as exc:
107            raise CheckAvailabilityError(
108                FileBasedSourceError.ERROR_LISTING_FILES, stream=stream.name
109            ) from exc
110
111        return file
112
113    def _check_parse_record(
114        self,
115        stream: AbstractFileBasedStream,
116        file: RemoteFile,
117        logger: logging.Logger,
118    ) -> None:
119        parser = stream.get_parser()
120
121        try:
122            record = next(
123                iter(
124                    parser.parse_records(
125                        stream.config, file, self.stream_reader, logger, discovered_schema=None
126                    )
127                )
128            )
129        except StopIteration:
130            # The file is empty. We've verified that we can open it, so will
131            # consider the connection check successful even though it means
132            # we skip the schema validation check.
133            return
134        except AirbyteTracedException as ate:
135            raise ate
136        except Exception as exc:
137            raise CheckAvailabilityError(
138                FileBasedSourceError.ERROR_READING_FILE, stream=stream.name, file=file.uri
139            ) from exc
140
141        schema = stream.catalog_schema or stream.config.input_schema
142        if schema and stream.validation_policy.validate_schema_before_sync:
143            if not conforms_to_schema(record, schema):  # type: ignore
144                raise CheckAvailabilityError(
145                    FileBasedSourceError.ERROR_VALIDATING_RECORD,
146                    stream=stream.name,
147                    file=file.uri,
148                )
149
150        return None

Abstract base class for checking stream availability.

DefaultFileBasedAvailabilityStrategy( stream_reader: airbyte_cdk.sources.file_based.AbstractFileBasedStreamReader)
31    def __init__(self, stream_reader: AbstractFileBasedStreamReader) -> None:
32        self.stream_reader = stream_reader
stream_reader
def check_availability( self, stream: airbyte_cdk.sources.file_based.stream.AbstractFileBasedStream, logger: logging.Logger, _: Optional[airbyte_cdk.Source]) -> Tuple[bool, Optional[str]]:
34    def check_availability(  # type: ignore[override]  # Signature doesn't match base class
35        self,
36        stream: AbstractFileBasedStream,
37        logger: logging.Logger,
38        _: Optional[Source],
39    ) -> Tuple[bool, Optional[str]]:
40        """
41        Perform a connection check for the stream (verify that we can list files from the stream).
42
43        Returns (True, None) if successful, otherwise (False, <error message>).
44        """
45        try:
46            self._check_list_files(stream)
47        except CheckAvailabilityError:
48            return False, "".join(traceback.format_exc())
49
50        return True, None

Perform a connection check for the stream (verify that we can list files from the stream).

Returns (True, None) if successful, otherwise (False, <error message>).

def check_availability_and_parsability( self, stream: airbyte_cdk.sources.file_based.stream.AbstractFileBasedStream, logger: logging.Logger, _: Optional[airbyte_cdk.Source]) -> Tuple[bool, Optional[str]]:
52    def check_availability_and_parsability(
53        self,
54        stream: AbstractFileBasedStream,
55        logger: logging.Logger,
56        _: Optional[Source],
57    ) -> Tuple[bool, Optional[str]]:
58        """
59        Perform a connection check for the stream.
60
61        Returns (True, None) if successful, otherwise (False, <error message>).
62
63        For the stream:
64        - Verify the parser config is valid per check_config method of the parser.
65        - Verify that we can list files from the stream using the configured globs.
66        - Verify that we can read one file from the stream as long as the stream parser is not setting parser_max_n_files_for_parsability to 0.
67
68        This method will also check that the files and their contents are consistent
69        with the configured options, as follows:
70        - If the files have extensions, verify that they don't disagree with the
71          configured file type.
72        - If the user provided a schema in the config, check that a subset of records in
73          one file conform to the schema via a call to stream.conforms_to_schema(schema).
74        """
75        parser = stream.get_parser()
76        config_check_result, config_check_error_message = parser.check_config(stream.config)
77        if config_check_result is False:
78            return False, config_check_error_message
79        try:
80            file = self._check_list_files(stream)
81            if not parser.parser_max_n_files_for_parsability == 0:
82                self._check_parse_record(stream, file, logger)
83            else:
84                # If the parser is set to not check parsability, we still want to check that we can open the file.
85                handle = stream.stream_reader.open_file(file, parser.file_read_mode, None, logger)
86                handle.close()
87        except AirbyteTracedException as ate:
88            raise ate
89        except CheckAvailabilityError:
90            return False, "".join(traceback.format_exc())
91
92        return True, None

Perform a connection check for the stream.

Returns (True, None) if successful, otherwise (False, <error message>).

For the stream:

  • Verify the parser config is valid per check_config method of the parser.
  • Verify that we can list files from the stream using the configured globs.
  • Verify that we can read one file from the stream as long as the stream parser is not setting parser_max_n_files_for_parsability to 0.

This method will also check that the files and their contents are consistent with the configured options, as follows:

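  • Files whose type disagrees with the configured file type are caught only indirectly: there is no explicit extension check, so a mismatch is detected only if the parser fails to read the first file.
  • If a schema is available (the catalog schema, falling back to the config's input_schema) and the validation policy validates before sync, check that the first record read from the file conforms to the schema via a call to conforms_to_schema(record, schema).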