airbyte_cdk.sources.file_based.availability_strategy
# Public API of the availability_strategy package: re-export the abstract
# strategy, its Concurrent-CDK wrapper, and the default implementation.
from .abstract_file_based_availability_strategy import (
    AbstractFileBasedAvailabilityStrategy,
    AbstractFileBasedAvailabilityStrategyWrapper,
)
from .default_file_based_availability_strategy import (
    DefaultFileBasedAvailabilityStrategy,
)

__all__ = [
    "AbstractFileBasedAvailabilityStrategy",
    "AbstractFileBasedAvailabilityStrategyWrapper",
    "DefaultFileBasedAvailabilityStrategy",
]
class AbstractFileBasedAvailabilityStrategy(AvailabilityStrategy):
    """
    Availability-strategy interface for file-based streams.

    Implementations provide a plain connection check (``check_availability``)
    and a stricter check that performs additional verification
    (``check_availability_and_parsability``). Both report their outcome as an
    ``(is_available, error_message)`` tuple.
    """

    @abstractmethod
    def check_availability(  # type: ignore[override] # Signature doesn't match base class
        self,
        stream: Stream,
        logger: logging.Logger,
        _: Optional[Source],
    ) -> Tuple[bool, Optional[str]]:
        """
        Perform a connection check for the stream.

        :param stream: the stream to check.
        :param logger: logger available to the implementation.
        :param _: optional source argument; named ``_`` to mark it as unused.
        :return: (True, None) if successful, otherwise (False, <error message>).
        """

    @abstractmethod
    def check_availability_and_parsability(
        self,
        stream: AbstractFileBasedStream,
        logger: logging.Logger,
        _: Optional[Source],
    ) -> Tuple[bool, Optional[str]]:
        """
        Perform a connection check for the stream, plus additional checks that
        verify the connection is working as expected.

        :param stream: the file-based stream to check.
        :param logger: logger available to the implementation.
        :param _: optional source argument; named ``_`` to mark it as unused.
        :return: (True, None) if successful, otherwise (False, <error message>).
        """
Abstract base class for checking stream availability.
@abstractmethod
def check_availability(  # type: ignore[override] # Signature doesn't match base class
    self,
    stream: Stream,
    logger: logging.Logger,
    _: Optional[Source],
) -> Tuple[bool, Optional[str]]:
    """
    Perform a connection check for the stream.

    :param stream: the stream to check.
    :param logger: logger available to the implementation.
    :param _: optional source argument; named ``_`` to mark it as unused.
    :return: (True, None) if successful, otherwise (False, <error message>).
    """
Perform a connection check for the stream.
Returns (True, None) if successful, otherwise (False, error message).
@abstractmethod
def check_availability_and_parsability(
    self,
    stream: AbstractFileBasedStream,
    logger: logging.Logger,
    _: Optional[Source],
) -> Tuple[bool, Optional[str]]:
    """
    Perform a connection check for the stream, plus additional checks that
    verify the connection is working as expected.

    :param stream: the file-based stream to check.
    :param logger: logger available to the implementation.
    :param _: optional source argument; named ``_`` to mark it as unused.
    :return: (True, None) if successful, otherwise (False, <error message>).
    """
Performs a connection check for the stream, as well as additional checks that verify that the connection is working as expected.
Returns (True, None) if successful, otherwise (False, error message).
class AbstractFileBasedAvailabilityStrategyWrapper(AbstractAvailabilityStrategy):
    """
    Adapt a file-based stream's availability strategy to the
    ``AbstractAvailabilityStrategy`` interface of the Concurrent CDK.

    All work is delegated to ``stream.availability_strategy``. This wrapper
    passes the stream and a ``None`` source. For ``check_availability`` it
    also converts the ``(is_available, reason)`` tuple into a
    ``StreamAvailability`` object.
    """

    def __init__(self, stream: AbstractFileBasedStream) -> None:
        self.stream = stream

    def check_availability(self, logger: logging.Logger) -> StreamAvailability:
        """
        Check whether the wrapped stream is available.

        :param logger: logger object to use.
        :return: ``StreamAvailable()`` on success, otherwise ``StreamUnavailable``
            carrying the reason (an empty string when the strategy gave none).
        """
        strategy = self.stream.availability_strategy
        available, message = strategy.check_availability(self.stream, logger, None)
        return StreamAvailable() if available else StreamUnavailable(message or "")

    def check_availability_and_parsability(
        self, logger: logging.Logger
    ) -> Tuple[bool, Optional[str]]:
        """
        Forward to the wrapped strategy's ``check_availability_and_parsability``.

        :param logger: logger object to use.
        :return: the strategy's ``(is_available, error_message)`` tuple, unchanged.
        """
        strategy = self.stream.availability_strategy
        return strategy.check_availability_and_parsability(self.stream, logger, None)
AbstractAvailabilityStrategy is an experimental interface developed as part of the Concurrent CDK. This interface is not yet stable and may change in the future. Use at your own risk.
Why create a new interface instead of using the existing AvailabilityStrategy? The existing AvailabilityStrategy is tightly coupled with Stream and Source, which leads to circular dependencies and makes it difficult to move away from the Stream interface to AbstractStream.
def check_availability(self, logger: logging.Logger) -> StreamAvailability:
    """
    Check whether the wrapped stream is available.

    Delegates to ``self.stream.availability_strategy.check_availability`` with
    a ``None`` source and converts its ``(is_available, reason)`` result.

    :param logger: logger object to use.
    :return: ``StreamAvailable()`` on success, otherwise ``StreamUnavailable``
        carrying the reason (an empty string when the strategy gave none).
    """
    strategy = self.stream.availability_strategy
    available, message = strategy.check_availability(self.stream, logger, None)
    return StreamAvailable() if available else StreamUnavailable(message or "")
Checks stream availability.
Parameters
- logger: logger object to use
Returns
A StreamAvailability object describing the stream's availability
class DefaultFileBasedAvailabilityStrategy(AbstractFileBasedAvailabilityStrategy):
    """
    Default availability strategy for file-based streams.

    ``check_availability`` verifies that at least one file can be listed.
    ``check_availability_and_parsability`` additionally validates the parser
    config and opens the first listed file. Unless the parser sets
    ``parser_max_n_files_for_parsability`` to 0, it also parses that file's
    first record.
    """

    def __init__(self, stream_reader: AbstractFileBasedStreamReader) -> None:
        # Reader handed to the parser in ``_check_parse_record``.
        self.stream_reader = stream_reader

    def check_availability(  # type: ignore[override] # Signature doesn't match base class
        self,
        stream: AbstractFileBasedStream,
        logger: logging.Logger,
        _: Optional[Source],
    ) -> Tuple[bool, Optional[str]]:
        """
        Perform a connection check for the stream (verify that we can list files from the stream).

        Returns (True, None) if successful, otherwise (False, <error message>).
        The error message is the formatted traceback of the CheckAvailabilityError.
        """
        try:
            self._check_list_files(stream)
        except CheckAvailabilityError:
            # format_exc() already returns a single string; no join needed.
            return False, traceback.format_exc()

        return True, None

    def check_availability_and_parsability(
        self,
        stream: AbstractFileBasedStream,
        logger: logging.Logger,
        _: Optional[Source],
    ) -> Tuple[bool, Optional[str]]:
        """
        Perform a connection check for the stream.

        Returns (True, None) if successful, otherwise (False, <error message>).

        For the stream:
        - Verify the parser config is valid per check_config method of the parser.
        - Verify that we can list files from the stream using the configured globs.
        - Verify that we can open the first listed file. Unless the stream parser sets
          parser_max_n_files_for_parsability to 0, also parse its first record.

        When a record is parsed and a schema is available (the stream's catalog_schema,
        falling back to config.input_schema), the record must conform to it via
        conforms_to_schema(record, schema). This check runs only if the stream's
        validation_policy.validate_schema_before_sync is set.

        AirbyteTracedException raised during these checks propagates unchanged.
        """
        parser = stream.get_parser()
        config_check_result, config_check_error_message = parser.check_config(stream.config)
        if config_check_result is False:
            return False, config_check_error_message
        try:
            file = self._check_list_files(stream)
            if parser.parser_max_n_files_for_parsability != 0:
                self._check_parse_record(stream, file, logger)
            else:
                # If the parser is set to not check parsability, we still want to check
                # that we can open the file.
                # NOTE(review): this path uses stream.stream_reader while
                # _check_parse_record uses self.stream_reader; confirm they are the same.
                try:
                    handle = stream.stream_reader.open_file(
                        file, parser.file_read_mode, None, logger
                    )
                except AirbyteTracedException:
                    raise
                except Exception as exc:
                    # Report open failures the same way _check_parse_record reports
                    # read failures, so the caller gets (False, message) rather than
                    # an unhandled exception.
                    raise CheckAvailabilityError(
                        FileBasedSourceError.ERROR_READING_FILE,
                        stream=stream.name,
                        file=file.uri,
                    ) from exc
                handle.close()
        except AirbyteTracedException:
            # Propagate traced exceptions unchanged instead of converting them to a result.
            raise
        except CheckAvailabilityError:
            return False, traceback.format_exc()

        return True, None

    def _check_list_files(self, stream: AbstractFileBasedStream) -> RemoteFile:
        """
        Check that we can list files from the stream.

        Returns the first file if successful, otherwise raises a CheckAvailabilityError:
        EMPTY_STREAM when no files are listed, the exception's own message for a
        CustomFileBasedException, and ERROR_LISTING_FILES for any other failure.
        """
        try:
            file = next(iter(stream.get_files()))
        except StopIteration:
            raise CheckAvailabilityError(FileBasedSourceError.EMPTY_STREAM, stream=stream.name)
        except CustomFileBasedException as exc:
            raise CheckAvailabilityError(str(exc), stream=stream.name) from exc
        except Exception as exc:
            raise CheckAvailabilityError(
                FileBasedSourceError.ERROR_LISTING_FILES, stream=stream.name
            ) from exc

        return file

    def _check_parse_record(
        self,
        stream: AbstractFileBasedStream,
        file: RemoteFile,
        logger: logging.Logger,
    ) -> None:
        """
        Parse the first record of ``file`` and, if configured, validate it against a schema.

        An empty file (no records) is accepted. Raises CheckAvailabilityError with
        ERROR_READING_FILE if parsing fails. Raises it with ERROR_VALIDATING_RECORD
        if the record does not conform to the schema. AirbyteTracedException
        propagates unchanged.
        """
        parser = stream.get_parser()

        try:
            record = next(
                iter(
                    parser.parse_records(
                        stream.config, file, self.stream_reader, logger, discovered_schema=None
                    )
                )
            )
        except StopIteration:
            # The file is empty. We've verified that we can open it, so will
            # consider the connection check successful even though it means
            # we skip the schema validation check.
            return
        except AirbyteTracedException:
            raise
        except Exception as exc:
            raise CheckAvailabilityError(
                FileBasedSourceError.ERROR_READING_FILE, stream=stream.name, file=file.uri
            ) from exc

        # The catalog schema takes precedence over the user-provided input schema.
        schema = stream.catalog_schema or stream.config.input_schema
        if schema and stream.validation_policy.validate_schema_before_sync:
            if not conforms_to_schema(record, schema):  # type: ignore
                raise CheckAvailabilityError(
                    FileBasedSourceError.ERROR_VALIDATING_RECORD,
                    stream=stream.name,
                    file=file.uri,
                )
Default (concrete) implementation of stream-availability checking for file-based streams.
def check_availability(  # type: ignore[override] # Signature doesn't match base class
    self,
    stream: AbstractFileBasedStream,
    logger: logging.Logger,
    _: Optional[Source],
) -> Tuple[bool, Optional[str]]:
    """
    Perform a connection check for the stream (verify that we can list files from the stream).

    Returns (True, None) if successful, otherwise (False, <error message>).
    The error message is the formatted traceback of the CheckAvailabilityError.
    """
    try:
        self._check_list_files(stream)
    except CheckAvailabilityError:
        # format_exc() already returns a single string; no join needed.
        return False, traceback.format_exc()

    return True, None
Perform a connection check for the stream (verify that we can list files from the stream).
Returns (True, None) if successful, otherwise (False, error message).
def check_availability_and_parsability(
    self,
    stream: AbstractFileBasedStream,
    logger: logging.Logger,
    _: Optional[Source],
) -> Tuple[bool, Optional[str]]:
    """
    Perform a connection check for the stream.

    Returns (True, None) if successful, otherwise (False, <error message>).

    For the stream:
    - Verify the parser config is valid per check_config method of the parser.
    - Verify that we can list files from the stream using the configured globs.
    - Verify that we can open the first listed file. Unless the stream parser sets
      parser_max_n_files_for_parsability to 0, also parse its first record.

    When a record is parsed and a schema is available (the stream's catalog_schema,
    falling back to config.input_schema), the record must conform to it via
    conforms_to_schema(record, schema). This check runs only if the stream's
    validation_policy.validate_schema_before_sync is set.

    AirbyteTracedException raised during these checks propagates unchanged.
    """
    parser = stream.get_parser()
    config_check_result, config_check_error_message = parser.check_config(stream.config)
    if config_check_result is False:
        return False, config_check_error_message
    try:
        file = self._check_list_files(stream)
        if parser.parser_max_n_files_for_parsability != 0:
            self._check_parse_record(stream, file, logger)
        else:
            # If the parser is set to not check parsability, we still want to check
            # that we can open the file.
            # NOTE(review): this path uses stream.stream_reader while
            # _check_parse_record uses self.stream_reader; confirm they are the same.
            try:
                handle = stream.stream_reader.open_file(
                    file, parser.file_read_mode, None, logger
                )
            except AirbyteTracedException:
                raise
            except Exception as exc:
                # Report open failures the same way read failures are reported
                # (ERROR_READING_FILE), so the caller gets (False, message) rather
                # than an unhandled exception.
                raise CheckAvailabilityError(
                    FileBasedSourceError.ERROR_READING_FILE,
                    stream=stream.name,
                    file=file.uri,
                ) from exc
            handle.close()
    except AirbyteTracedException:
        # Propagate traced exceptions unchanged instead of converting them to a result.
        raise
    except CheckAvailabilityError:
        return False, traceback.format_exc()

    return True, None
Perform a connection check for the stream.
Returns (True, None) if successful, otherwise (False, error message).
For the stream:
- Verify the parser config is valid per check_config method of the parser.
- Verify that we can list files from the stream using the configured globs.
- Verify that we can read one file from the stream as long as the stream parser is not setting parser_max_n_files_for_parsability to 0.
This method will also check that the files and their contents are consistent with the configured options, as follows:
- If the files have extensions, verify that they don't disagree with the configured file type.
- If the user provided a schema in the config, check that the first record of one file conforms to the schema via a call to conforms_to_schema(record, schema).