airbyte_cdk.sources.file_based.availability_strategy
```python
class AbstractFileBasedAvailabilityStrategy(AvailabilityStrategy):
    @abstractmethod
    def check_availability(  # type: ignore[override]  # Signature doesn't match base class
        self,
        stream: Stream,
        logger: logging.Logger,
        source: Optional[Source] = None,
    ) -> Tuple[bool, Optional[str]]:
        """
        Perform a connection check for the stream.

        Returns (True, None) if successful, otherwise (False, <error message>).
        """
        ...

    @abstractmethod
    def check_availability_and_parsability(
        self,
        stream: AbstractFileBasedStream,
        logger: logging.Logger,
        _: Optional[Source],
    ) -> Tuple[bool, Optional[str]]:
        """
        Performs a connection check for the stream, as well as additional checks that
        verify that the connection is working as expected.

        Returns (True, None) if successful, otherwise (False, <error message>).
        """
        ...
```
Abstract base class for checking stream availability.
def check_availability(self, stream: Stream, logger: logging.Logger, source: Optional[Source] = None) -> Tuple[bool, Optional[str]]:
Perform a connection check for the stream.
Returns (True, None) if successful, otherwise (False, <error message>).
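Callers are expected to unpack the `(bool, Optional[str])` tuple rather than rely on exceptions for the ordinary failure case. The sketch below aggregates such results across several streams; the `summarize_checks` helper, the stream names, and the reason string are hypothetical and not part of the CDK.

```python
from typing import Dict, List, Optional, Tuple

# Result shape returned by check_availability: (True, None) or (False, "<reason>").
CheckResult = Tuple[bool, Optional[str]]


def summarize_checks(results: Dict[str, CheckResult]) -> Tuple[bool, List[str]]:
    """Hypothetical helper: combine per-stream results into one verdict.

    Returns (True, []) when every stream is available, otherwise
    (False, [...]) with one "stream: reason" entry per failing stream.
    """
    failures: List[str] = []
    for stream_name, (is_available, reason) in results.items():
        if not is_available:
            # A failed check should carry a reason; fall back defensively.
            failures.append(f"{stream_name}: {reason or 'unknown error'}")
    return (not failures), failures


results = {
    "orders": (True, None),
    "customers": (False, "could not list files"),
}
ok, failures = summarize_checks(results)
print(ok, failures)  # False ['customers: could not list files']
```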
def check_availability_and_parsability(self, stream: AbstractFileBasedStream, logger: logging.Logger, _: Optional[Source]) -> Tuple[bool, Optional[str]]:
Performs a connection check for the stream, as well as additional checks that verify that the connection is working as expected.
Returns (True, None) if successful, otherwise (False, <error message>).
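Because both methods are declared with `@abstractmethod`, a subclass must override both before it can be instantiated. The self-contained sketch below mirrors that shape with simplified, hypothetical signatures (a plain `stream_name` string instead of the CDK's `Stream`/`AbstractFileBasedStream` objects, and no `source` argument) so that it runs without `airbyte_cdk` installed.

```python
import logging
from abc import ABC, abstractmethod
from typing import Optional, Tuple


class SketchAvailabilityStrategy(ABC):
    """Hypothetical, simplified mirror of the abstract strategy's interface."""

    @abstractmethod
    def check_availability(
        self, stream_name: str, logger: logging.Logger
    ) -> Tuple[bool, Optional[str]]:
        ...

    @abstractmethod
    def check_availability_and_parsability(
        self, stream_name: str, logger: logging.Logger
    ) -> Tuple[bool, Optional[str]]:
        ...


class OnlyAvailability(SketchAvailabilityStrategy):
    # Overrides just one abstract method, so the class stays abstract.
    def check_availability(self, stream_name, logger):
        return True, None


class AlwaysAvailable(SketchAvailabilityStrategy):
    # Overrides both abstract methods, so it can be instantiated.
    def check_availability(self, stream_name, logger):
        return True, None

    def check_availability_and_parsability(self, stream_name, logger):
        # A real implementation would also parse a sample file here.
        return self.check_availability(stream_name, logger)


logger = logging.getLogger("sketch")
try:
    OnlyAvailability()
except TypeError as exc:
    print("cannot instantiate:", exc)

print(AlwaysAvailable().check_availability_and_parsability("orders", logger))  # (True, None)
```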
```python
class DefaultFileBasedAvailabilityStrategy(AbstractFileBasedAvailabilityStrategy):
    def __init__(self, stream_reader: AbstractFileBasedStreamReader) -> None:
        self.stream_reader = stream_reader

    def check_availability(  # type: ignore[override]  # Signature doesn't match base class
        self,
        stream: AbstractFileBasedStream,
        logger: logging.Logger,
        _: Optional[Source],
    ) -> Tuple[bool, Optional[str]]:
        """
        Perform a connection check for the stream (verify that we can list files from the stream).

        Returns `(True, None)` if successful, otherwise `(False, <error message>)`.

        The returned reason is the actionable error string from the underlying
        `CheckAvailabilityError`. Connector-raised `AirbyteTracedException`s are
        re-raised so that their actionable `message` propagates to the platform
        unchanged. The full traceback is still logged for debugging.
        """
        try:
            self._check_list_files(stream)
        except CheckAvailabilityError as exc:
            logger.exception("Stream availability check failed")
            return False, str(exc)

        return True, None

    def check_availability_and_parsability(
        self,
        stream: AbstractFileBasedStream,
        logger: logging.Logger,
        _: Optional[Source],
    ) -> Tuple[bool, Optional[str]]:
        """
        Perform a connection check for the stream.

        Returns (True, None) if successful, otherwise (False, <error message>).

        For the stream:
        - Verify the parser config is valid per check_config method of the parser.
        - Verify that we can list files from the stream using the configured globs.
        - Verify that we can read one file from the stream as long as the stream parser is not setting parser_max_n_files_for_parsability to 0.

        This method will also check that the files and their contents are consistent
        with the configured options, as follows:
        - If the files have extensions, verify that they don't disagree with the
          configured file type.
        - If the user provided a schema in the config, check that a subset of records in
          one file conform to the schema via a call to stream.conforms_to_schema(schema).
        """
        parser = stream.get_parser()
        config_check_result, config_check_error_message = parser.check_config(stream.config)
        if config_check_result is False:
            return False, config_check_error_message
        try:
            file = self._check_list_files(stream)
            if not parser.parser_max_n_files_for_parsability == 0:
                self._check_parse_record(stream, file, logger)
            else:
                # If the parser is set to not check parsability, we still want to check that we can open the file.
                handle = stream.stream_reader.open_file(file, parser.file_read_mode, None, logger)
                handle.close()
        except AirbyteTracedException:
            raise
        except CheckAvailabilityError as exc:
            logger.exception("Stream availability check failed")
            return False, str(exc)

        return True, None

    def _check_list_files(self, stream: AbstractFileBasedStream) -> RemoteFile:
        """
        Check that we can list files from the stream.

        Returns the first file if successful, otherwise raises a `CheckAvailabilityError`.

        `AirbyteTracedException`s raised by the underlying stream reader are
        re-raised unchanged so that connector-specific actionable messages
        (e.g. invalid credentials, missing permissions) reach the platform
        without being wrapped in a generic `ERROR_LISTING_FILES` reason.
        """
        try:
            file = next(iter(stream.get_files()))
        except StopIteration:
            raise CheckAvailabilityError(FileBasedSourceError.EMPTY_STREAM, stream=stream.name)
        except CustomFileBasedException as exc:
            raise CheckAvailabilityError(str(exc), stream=stream.name) from exc
        except AirbyteTracedException:
            raise
        except Exception as exc:
            raise CheckAvailabilityError(
                FileBasedSourceError.ERROR_LISTING_FILES, stream=stream.name
            ) from exc

        return file

    def _check_parse_record(
        self,
        stream: AbstractFileBasedStream,
        file: RemoteFile,
        logger: logging.Logger,
    ) -> None:
        parser = stream.get_parser()

        try:
            record = next(
                iter(
                    parser.parse_records(
                        stream.config, file, self.stream_reader, logger, discovered_schema=None
                    )
                )
            )
        except StopIteration:
            # The file is empty. We've verified that we can open it, so will
            # consider the connection check successful even though it means
            # we skip the schema validation check.
            return
        except AirbyteTracedException as ate:
            raise ate
        except Exception as exc:
            raise CheckAvailabilityError(
                FileBasedSourceError.ERROR_READING_FILE, stream=stream.name, file=file.uri
            ) from exc

        schema = stream.catalog_schema or stream.config.input_schema
        if schema and stream.validation_policy.validate_schema_before_sync:
            if not conforms_to_schema(record, schema):  # type: ignore
                raise CheckAvailabilityError(
                    FileBasedSourceError.ERROR_VALIDATING_RECORD,
                    stream=stream.name,
                    file=file.uri,
                )

        return None
```
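`_check_list_files` relies on two idioms: `next(iter(...))` to fetch only the first file (so an empty listing surfaces as `StopIteration`), and exception translation that wraps unexpected errors in `CheckAvailabilityError` while letting `AirbyteTracedException` pass through unchanged. The runnable sketch below reproduces that control flow with stand-in exception classes; `TracedError`, `CustomError`, `AvailabilityError`, and `first_file` are hypothetical names, not CDK classes, and the reason strings are placeholders.

```python
from typing import Callable, Iterable


class TracedError(Exception):
    """Stand-in for AirbyteTracedException: already carries a user-facing message."""


class CustomError(Exception):
    """Stand-in for CustomFileBasedException: its text becomes the reason."""


class AvailabilityError(Exception):
    """Stand-in for CheckAvailabilityError."""


def first_file(list_files: Callable[[], Iterable[str]], stream: str) -> str:
    try:
        # Only the first file is needed, so stop consuming the listing after one item.
        return next(iter(list_files()))
    except StopIteration:
        raise AvailabilityError(f"empty stream: {stream}")
    except CustomError as exc:
        raise AvailabilityError(str(exc)) from exc
    except TracedError:
        # Already actionable: re-raise unchanged instead of wrapping it.
        raise
    except Exception as exc:
        # Anything else becomes a generic listing error, chained to the original.
        raise AvailabilityError(f"error listing files: {stream}") from exc


print(first_file(lambda: ["a.csv", "b.csv"], "orders"))  # a.csv
```

The order of the `except` clauses matters: the pass-through clause must come before the catch-all `except Exception`, otherwise traced errors would be wrapped like any other failure.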
def check_availability(self, stream: AbstractFileBasedStream, logger: logging.Logger, _: Optional[Source]) -> Tuple[bool, Optional[str]]:
Perform a connection check for the stream (verify that we can list files from the stream).
Returns (True, None) if successful, otherwise (False, <error message>).
The returned reason is the actionable error string from the underlying CheckAvailabilityError, whose full traceback is still logged for debugging. Connector-raised AirbyteTracedException errors are not wrapped; they propagate so that their actionable message reaches the platform unchanged.
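The public `check_availability` turns a `CheckAvailabilityError` into `(False, str(exc))` after logging it with `logger.exception` (which logs at ERROR level and attaches the traceback), and lets any other exception propagate. A minimal sketch of that pattern follows; `AvailabilityError`, `TracedError`, and `check` are hypothetical stand-ins, not CDK APIs.

```python
import logging
from typing import Callable, Optional, Tuple


class AvailabilityError(Exception):
    """Stand-in for CheckAvailabilityError."""


class TracedError(Exception):
    """Stand-in for AirbyteTracedException."""


def check(
    list_first_file: Callable[[], str], logger: logging.Logger
) -> Tuple[bool, Optional[str]]:
    try:
        list_first_file()
    except AvailabilityError as exc:
        # logger.exception logs at ERROR level and records the active traceback.
        logger.exception("Stream availability check failed")
        return False, str(exc)
    # Other exceptions (e.g. TracedError) are not caught here, so they propagate.
    return True, None


def no_match() -> str:
    raise AvailabilityError("no files matched the configured globs")


log = logging.getLogger("availability-sketch")
print(check(lambda: "a.csv", log))  # (True, None)
print(check(no_match, log))  # (False, 'no files matched the configured globs')
```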
def check_availability_and_parsability(self, stream: AbstractFileBasedStream, logger: logging.Logger, _: Optional[Source]) -> Tuple[bool, Optional[str]]:
Perform a connection check for the stream.
Returns (True, None) if successful, otherwise (False, <error message>).
For the stream:
- Verify that the parser config is valid, per the parser's check_config method.
- Verify that we can list files from the stream using the configured globs.
- Verify that we can read one file from the stream, unless the parser sets parser_max_n_files_for_parsability to 0, in which case only verify that the file can be opened.
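These checks run in order and stop at the first failure; when `parser_max_n_files_for_parsability` is `0`, the file is only opened and closed instead of parsed. The sketch below traces that branching with a hypothetical `FakeParser` and plain callables standing in for the stream reader, recording which steps ran instead of touching real storage. None of these names come from the CDK.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional, Tuple


class AvailabilityError(Exception):
    """Stand-in for CheckAvailabilityError."""


@dataclass
class FakeParser:
    # Hypothetical stand-in for a file-type parser.
    config_ok: bool = True
    config_error: Optional[str] = None
    parser_max_n_files_for_parsability: Optional[int] = None


def check_and_parse(
    parser: FakeParser,
    list_first_file: Callable[[], str],
    parse_first_record: Callable[[str], None],
    open_and_close: Callable[[str], None],
    steps: List[str],
) -> Tuple[bool, Optional[str]]:
    # 1. Validate the parser config before touching storage.
    steps.append("check_config")
    if not parser.config_ok:
        return False, parser.config_error
    try:
        # 2. List files; the callable raises AvailabilityError if none can be listed.
        steps.append("list_files")
        file = list_first_file()
        # 3. Parse one record unless the parser opted out of parsability checks.
        if parser.parser_max_n_files_for_parsability != 0:
            steps.append("parse_record")
            parse_first_record(file)
        else:
            # Still prove that the file can be opened.
            steps.append("open_file")
            open_and_close(file)
    except AvailabilityError as exc:
        return False, str(exc)
    return True, None


steps: List[str] = []
result = check_and_parse(
    FakeParser(parser_max_n_files_for_parsability=0),
    lambda: "data.csv",
    lambda f: None,
    lambda f: None,
    steps,
)
print(result, steps)  # (True, None) ['check_config', 'list_files', 'open_file']
```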
This method will also check that the files and their contents are consistent with the configured options, as follows:
- If the files have extensions, verify that they don't disagree with the configured file type.
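- If a schema is available (the catalog schema, otherwise the user-provided input_schema) and the stream's validation policy enables validate_schema_before_sync, check that the first record parsed from one file conforms to it via a call to conforms_to_schema(record, schema). An empty file passes without this check.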
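The schema check samples only the first record parsed from the file: the schema comes from the configured catalog if present, otherwise from the user's `input_schema`, and it is enforced only when `validate_schema_before_sync` is set. The sketch below follows that order. `simple_conforms` is a deliberately simplified, hypothetical stand-in for the CDK's `conforms_to_schema` helper (it only compares top-level JSON-schema `type` names against Python types) and does not reproduce its real rules; `check_first_record` and `AvailabilityError` are likewise illustrative names.

```python
from typing import Any, Dict, Iterable, Iterator, Optional

# Hypothetical mapping from JSON-schema type names to Python types.
_PY_TYPES = {"string": str, "integer": int, "number": (int, float), "boolean": bool}


class AvailabilityError(Exception):
    """Stand-in for CheckAvailabilityError."""


def simple_conforms(record: Dict[str, Any], schema: Dict[str, Any]) -> bool:
    """Simplified stand-in: each present, non-null field must match a declared type."""
    for name, value in record.items():
        spec = schema.get("properties", {}).get(name)
        if spec is None or value is None:
            continue  # unknown or null fields are tolerated in this sketch
        declared = spec.get("type")
        names = declared if isinstance(declared, list) else [declared]
        expected = [_PY_TYPES[n] for n in names if n in _PY_TYPES]
        if expected and not any(isinstance(value, t) for t in expected):
            return False
    return True


def check_first_record(
    records: Iterable[Dict[str, Any]],
    catalog_schema: Optional[Dict[str, Any]],
    input_schema: Optional[Dict[str, Any]],
    validate_schema_before_sync: bool,
) -> None:
    iterator: Iterator[Dict[str, Any]] = iter(records)
    try:
        record = next(iterator)
    except StopIteration:
        return  # empty file: it could be opened, so the schema check is skipped
    # The catalog schema wins; fall back to the user-supplied input schema.
    schema = catalog_schema or input_schema
    if schema and validate_schema_before_sync and not simple_conforms(record, schema):
        raise AvailabilityError("record does not conform to the schema")


schema = {
    "type": "object",
    "properties": {"id": {"type": "integer"}, "name": {"type": ["null", "string"]}},
}
check_first_record([{"id": 1, "name": "a"}], None, schema, True)  # passes silently
```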