airbyte_cdk.sources.file_based.availability_strategy
class AbstractFileBasedAvailabilityStrategy(AvailabilityStrategy):
    """
    Interface for availability checks on file-based streams.

    Subclasses must implement both `check_availability` and
    `check_availability_and_parsability`.
    """

    @abstractmethod
    def check_availability(  # type: ignore[override]  # Signature doesn't match base class
        self,
        stream: Stream,
        logger: logging.Logger,
        source: Optional[Source] = None,
    ) -> Tuple[bool, Optional[str]]:
        """
        Run a connection check against the given stream.

        :param stream: the stream to check.
        :param logger: logger made available to the check.
        :param source: optional source the stream belongs to; defaults to None.
        :return: (True, None) on success, otherwise (False, <error message>).
        """
        ...

    @abstractmethod
    def check_availability_and_parsability(
        self,
        stream: AbstractFileBasedStream,
        logger: logging.Logger,
        _: Optional[Source],
    ) -> Tuple[bool, Optional[str]]:
        """
        Run a connection check against the given stream, plus additional checks
        that verify the connection is working as expected.

        :param stream: the file-based stream to check.
        :param logger: logger made available to the check.
        :param _: optional source; the parameter name marks it as unused.
        :return: (True, None) on success, otherwise (False, <error message>).
        """
        ...
Abstract base class for checking stream availability.
@abstractmethod
def check_availability(  # type: ignore[override]  # Signature doesn't match base class
    self,
    stream: Stream,
    logger: logging.Logger,
    source: Optional[Source] = None,
) -> Tuple[bool, Optional[str]]:
    """
    Run a connection check against the given stream.

    :param stream: the stream to check.
    :param logger: logger made available to the check.
    :param source: optional source the stream belongs to; defaults to None.
    :return: (True, None) on success, otherwise (False, <error message>).
    """
    ...
Perform a connection check for the stream.
Returns (True, None) if successful, otherwise (False, <error message>).
@abstractmethod
def check_availability_and_parsability(
    self,
    stream: AbstractFileBasedStream,
    logger: logging.Logger,
    _: Optional[Source],
) -> Tuple[bool, Optional[str]]:
    """
    Run a connection check against the given stream, plus additional checks
    that verify the connection is working as expected.

    :param stream: the file-based stream to check.
    :param logger: logger made available to the check.
    :param _: optional source; the parameter name marks it as unused.
    :return: (True, None) on success, otherwise (False, <error message>).
    """
    ...
Performs a connection check for the stream, as well as additional checks that verify that the connection is working as expected.
Returns (True, None) if successful, otherwise (False, <error message>).
class DefaultFileBasedAvailabilityStrategy(AbstractFileBasedAvailabilityStrategy):
    """
    Availability strategy that checks a file-based stream by listing its files and,
    for the parsability check, reading (or at least opening) the first file found.
    """

    def __init__(self, stream_reader: AbstractFileBasedStreamReader) -> None:
        # Reader passed to the parser when a record is parsed in _check_parse_record.
        self.stream_reader = stream_reader

    def check_availability(  # type: ignore[override]  # Signature doesn't match base class
        self,
        stream: AbstractFileBasedStream,
        logger: logging.Logger,
        _: Optional[Source],
    ) -> Tuple[bool, Optional[str]]:
        """
        Perform a connection check for the stream (verify that we can list files from the stream).

        `logger` and `_` are part of the interface but are not used by this check.

        Returns (True, None) if successful, otherwise (False, <error message>), where the
        error message is the formatted traceback of the CheckAvailabilityError.
        """
        try:
            self._check_list_files(stream)
        except CheckAvailabilityError:
            # format_exc() already returns one string, so no join is required.
            return False, traceback.format_exc()

        return True, None

    def check_availability_and_parsability(
        self,
        stream: AbstractFileBasedStream,
        logger: logging.Logger,
        _: Optional[Source],
    ) -> Tuple[bool, Optional[str]]:
        """
        Perform a connection check for the stream.

        Returns (True, None) if successful, otherwise (False, <error message>).
        AirbyteTracedException is not converted into a (False, ...) result; it propagates.

        For the stream:
        - Verify the parser config is valid per check_config method of the parser.
        - Verify that we can list files from the stream using the configured globs.
        - Verify that we can read one file from the stream as long as the stream parser is not
          setting parser_max_n_files_for_parsability to 0. When it is 0, the first file is only
          opened and closed.

        This method will also check that the files and their contents are consistent
        with the configured options, as follows:
        - If the files have extensions, verify that they don't disagree with the
          configured file type.
          NOTE(review): no extension check is visible in this class; presumably it is
          performed by the stream or parser - confirm.
        - If a schema is available (stream.catalog_schema or stream.config.input_schema) and
          the validation policy validates the schema before sync, check that the first record
          parsed from the file conforms to it via conforms_to_schema(record, schema).
        """
        parser = stream.get_parser()
        config_check_result, config_check_error_message = parser.check_config(stream.config)
        if config_check_result is False:
            return False, config_check_error_message
        try:
            file = self._check_list_files(stream)
            if parser.parser_max_n_files_for_parsability != 0:
                self._check_parse_record(stream, file, logger)
            else:
                # If the parser is set to not check parsability, we still want to check that we can open the file.
                # NOTE(review): this uses stream.stream_reader while _check_parse_record uses
                # self.stream_reader - presumably the same reader; confirm.
                handle = stream.stream_reader.open_file(file, parser.file_read_mode, None, logger)
                handle.close()
        except AirbyteTracedException:
            # Traced exceptions are passed through untouched rather than reported as (False, ...).
            raise
        except CheckAvailabilityError:
            # format_exc() already returns one string, so no join is required.
            return False, traceback.format_exc()

        return True, None

    def _check_list_files(self, stream: AbstractFileBasedStream) -> RemoteFile:
        """
        Check that we can list files from the stream.

        Returns the first file if successful, otherwise raises a CheckAvailabilityError:
        - EMPTY_STREAM when the listing yields no file,
        - the message of a CustomFileBasedException raised while listing,
        - ERROR_LISTING_FILES for any other exception raised while listing.
        """
        try:
            file = next(iter(stream.get_files()))
        except StopIteration:
            raise CheckAvailabilityError(FileBasedSourceError.EMPTY_STREAM, stream=stream.name)
        except CustomFileBasedException as exc:
            raise CheckAvailabilityError(str(exc), stream=stream.name) from exc
        except Exception as exc:
            raise CheckAvailabilityError(
                FileBasedSourceError.ERROR_LISTING_FILES, stream=stream.name
            ) from exc

        return file

    def _check_parse_record(
        self,
        stream: AbstractFileBasedStream,
        file: RemoteFile,
        logger: logging.Logger,
    ) -> None:
        """
        Parse the first record of `file` and, when applicable, validate it against the schema.

        Raises CheckAvailabilityError with ERROR_READING_FILE if parsing fails with anything
        other than an AirbyteTracedException (which propagates unchanged), and with
        ERROR_VALIDATING_RECORD if the record does not conform to the schema.
        """
        parser = stream.get_parser()

        try:
            record = next(
                iter(
                    parser.parse_records(
                        stream.config, file, self.stream_reader, logger, discovered_schema=None
                    )
                )
            )
        except StopIteration:
            # The file is empty. We've verified that we can open it, so will
            # consider the connection check successful even though it means
            # we skip the schema validation check.
            return
        except AirbyteTracedException:
            raise
        except Exception as exc:
            raise CheckAvailabilityError(
                FileBasedSourceError.ERROR_READING_FILE, stream=stream.name, file=file.uri
            ) from exc

        # The catalog schema takes precedence over the schema supplied in the stream config.
        schema = stream.catalog_schema or stream.config.input_schema
        if schema and stream.validation_policy.validate_schema_before_sync:
            if not conforms_to_schema(record, schema):  # type: ignore
                raise CheckAvailabilityError(
                    FileBasedSourceError.ERROR_VALIDATING_RECORD,
                    stream=stream.name,
                    file=file.uri,
                )
Abstract base class for checking stream availability.
def check_availability(  # type: ignore[override]  # Signature doesn't match base class
    self,
    stream: AbstractFileBasedStream,
    logger: logging.Logger,
    _: Optional[Source],
) -> Tuple[bool, Optional[str]]:
    """
    Perform a connection check for the stream (verify that we can list files from the stream).

    `logger` and `_` are part of the interface but are not used by this check.

    Returns (True, None) if successful, otherwise (False, <error message>), where the
    error message is the formatted traceback of the CheckAvailabilityError.
    """
    try:
        self._check_list_files(stream)
    except CheckAvailabilityError:
        # format_exc() already returns one string, so no join is required.
        return False, traceback.format_exc()

    return True, None
Perform a connection check for the stream (verify that we can list files from the stream).
Returns (True, None) if successful, otherwise (False, <error message>).
def check_availability_and_parsability(
    self,
    stream: AbstractFileBasedStream,
    logger: logging.Logger,
    _: Optional[Source],
) -> Tuple[bool, Optional[str]]:
    """
    Perform a connection check for the stream.

    Returns (True, None) if successful, otherwise (False, <error message>).
    AirbyteTracedException is not converted into a (False, ...) result; it propagates.

    For the stream:
    - Verify the parser config is valid per check_config method of the parser.
    - Verify that we can list files from the stream using the configured globs.
    - Verify that we can read one file from the stream as long as the stream parser is not
      setting parser_max_n_files_for_parsability to 0. When it is 0, the first file is only
      opened and closed.

    This method will also check that the files and their contents are consistent
    with the configured options, as follows:
    - If the files have extensions, verify that they don't disagree with the
      configured file type.
      NOTE(review): no extension check is visible in this method; presumably it is
      performed by the stream or parser - confirm.
    - If a schema is available and the validation policy validates the schema before sync,
      check that the first record parsed from the file conforms to it (done by
      self._check_parse_record via conforms_to_schema(record, schema)).
    """
    parser = stream.get_parser()
    config_check_result, config_check_error_message = parser.check_config(stream.config)
    if config_check_result is False:
        return False, config_check_error_message
    try:
        file = self._check_list_files(stream)
        if parser.parser_max_n_files_for_parsability != 0:
            self._check_parse_record(stream, file, logger)
        else:
            # If the parser is set to not check parsability, we still want to check that we can open the file.
            # NOTE(review): this uses stream.stream_reader while _check_parse_record uses
            # self.stream_reader - presumably the same reader; confirm.
            handle = stream.stream_reader.open_file(file, parser.file_read_mode, None, logger)
            handle.close()
    except AirbyteTracedException:
        # Traced exceptions are passed through untouched rather than reported as (False, ...).
        raise
    except CheckAvailabilityError:
        # format_exc() already returns one string, so no join is required.
        return False, traceback.format_exc()

    return True, None
Perform a connection check for the stream.
Returns (True, None) if successful, otherwise (False, <error message>).
For the stream:
- Verify the parser config is valid per check_config method of the parser.
- Verify that we can list files from the stream using the configured globs.
- Verify that we can read one file from the stream as long as the stream parser is not setting parser_max_n_files_for_parsability to 0.
This method will also check that the files and their contents are consistent with the configured options, as follows:
- If the files have extensions, verify that they don't disagree with the configured file type.
- If the user provided a schema in the config, check that the first record parsed from one file conforms to the schema via a call to conforms_to_schema(record, schema).