airbyte_cdk.connector_builder.test_reader
class TestReader:
    """
    A utility class for performing test reads from a declarative data source, primarily used to validate
    connector configurations via partial stream reads.

    Initialization:

        TestReader(max_pages_per_slice: int, max_slices: int, max_record_limit: int = 1000)
            Initializes a new instance of the TestReader class with limits on pages per slice, slices, and
            records per read operation.

    Public Methods:
        run_test_read(source, config, configured_catalog, state, record_limit=None) -> StreamRead:

            Executes a test read operation from the given declarative source. It configures and infers the schema,
            processes the read messages (including logging and error handling), and returns a StreamRead object
            that contains slices of data, log messages, auxiliary requests, and any inferred schema or datetime formats.

            Parameters:
                source (DeclarativeSource): The data source to read from.
                config (Mapping[str, Any]): Configuration parameters for the source.
                configured_catalog (ConfiguredAirbyteCatalog): Catalog containing stream configuration.
                state (List[AirbyteStateMessage]): Current state information for the read.
                record_limit (Optional[int]): Optional override for the maximum number of records to read.

            Returns:
                StreamRead: An object encapsulating logs, data slices, auxiliary requests, and inferred metadata,
                along with indicators of whether any configured limit was reached.
    """

    __test__: ClassVar[bool] = False  # Tell Pytest this is not a Pytest class, despite its name

    logger = logging.getLogger("airbyte.connector-builder")

    def __init__(
        self,
        max_pages_per_slice: int,
        max_slices: int,
        max_record_limit: int = 1000,
    ) -> None:
        self._max_pages_per_slice = max_pages_per_slice
        self._max_slices = max_slices
        self._max_record_limit = max_record_limit

    def run_test_read(
        self,
        source: DeclarativeSource,
        config: Mapping[str, Any],
        configured_catalog: ConfiguredAirbyteCatalog,
        state: List[AirbyteStateMessage],
        record_limit: Optional[int] = None,
    ) -> StreamRead:
        """
        Run a test read for the connector by reading from a single stream and inferring schema and datetime formats.

        Parameters:
            source (DeclarativeSource): The source instance providing the streams.
            config (Mapping[str, Any]): The configuration settings to use for reading.
            configured_catalog (ConfiguredAirbyteCatalog): The catalog specifying the stream configuration.
            state (List[AirbyteStateMessage]): A list of state messages to resume the read.
            record_limit (Optional[int], optional): Maximum number of records to read. Defaults to None.

        Returns:
            StreamRead: An object containing the following attributes:
                - logs (List[LogMessage]): Log messages generated during the process, including any deprecation warnings.
                - slices (List[StreamReadSlices]): The data slices read from the stream.
                - test_read_limit_reached (bool): Indicates whether any configured read limit (slices, pages per slice, or records) was reached.
                - auxiliary_requests (List[AuxiliaryRequest]): Any auxiliary requests generated during reading.
                - inferred_schema (Any): The schema inferred from the stream data.
                - latest_config_update (Optional[Dict[str, Any]]): The latest cleaned configuration update, if any control message was emitted.
                - inferred_datetime_formats (Dict[str, str]): Mapping of fields to their inferred datetime formats.
        """

        record_limit = self._check_record_limit(record_limit)
        # The connector builder currently only supports reading from a single stream at a time
        stream = source.streams(config)[0]

        # get any deprecation warnings during the component creation
        deprecation_warnings: List[LogMessage] = source.deprecation_warnings()

        schema_inferrer = SchemaInferrer(
            self._pk_to_nested_and_composite_field(stream.primary_key),
            self._cursor_field_to_nested_and_composite_field(stream.cursor_field),
        )
        datetime_format_inferrer = DatetimeFormatInferrer()

        message_group = get_message_groups(
            self._read_stream(source, config, configured_catalog, state),
            schema_inferrer,
            datetime_format_inferrer,
            record_limit,
        )

        slices, log_messages, auxiliary_requests, latest_config_update = self._categorise_groups(
            message_group
        )

        # extend log messages with deprecation warnings
        log_messages.extend(deprecation_warnings)

        schema, log_messages = self._get_infered_schema(
            configured_catalog, schema_inferrer, log_messages
        )

        return StreamRead(
            logs=log_messages,
            slices=slices,
            test_read_limit_reached=self._has_reached_limit(slices),
            auxiliary_requests=auxiliary_requests,
            inferred_schema=schema,
            latest_config_update=self._get_latest_config_update(latest_config_update),
            inferred_datetime_formats=datetime_format_inferrer.get_inferred_datetime_formats(),
        )

    def _pk_to_nested_and_composite_field(
        self, field: Optional[Union[str, List[str], List[List[str]]]]
    ) -> List[List[str]]:
        """
        Converts a primary key definition into a nested list representation.

        The function accepts a primary key that can be a single string, a list of strings, or a list of lists
        of strings. It ensures that the return value is always a list of lists of strings.

        Parameters:
            field (Optional[Union[str, List[str], List[List[str]]]]):
                The primary key definition. This can be:
                    - None or an empty value: returns a list containing an empty list.
                    - A single string: returns a list containing one list with the string.
                    - A list of strings (composite key): returns a list where each key is encapsulated in its own list.
                    - A list of lists of strings (nested field structure): returned as is.

        Returns:
            List[List[str]]:
                A nested list representation of the primary key.
        """
        if not field:
            return [[]]

        if isinstance(field, str):
            return [[field]]

        is_composite_key = isinstance(field[0], str)
        if is_composite_key:
            return [[i] for i in field]  # type: ignore  # the type of field is expected to be List[str] here

        return field  # type: ignore  # the type of field is expected to be List[List[str]] here

    def _cursor_field_to_nested_and_composite_field(
        self, field: Union[str, List[str]]
    ) -> List[List[str]]:
        """
        Transforms the cursor field input into a nested list format suitable for further processing.

        This function accepts a cursor field specification, which can be either:
            - A falsy value (e.g., None or an empty string), in which case it returns a list containing an empty list.
            - A string representing a simple cursor field. The string is wrapped in a nested list.
            - A list of strings representing a composite or nested cursor field. The list is returned wrapped in an outer list.

        Parameters:
            field (Union[str, List[str]]): The cursor field specification. It can be:
                - An empty or falsy value: returns [[]].
                - A string: returns [[field]].
                - A list of strings: returns [field] if the first element is a string.

        Returns:
            List[List[str]]: A nested list representation of the cursor field.

        Raises:
            ValueError: If the input is a list but its first element is not a string,
                indicating an unsupported type for a cursor field.
        """
        if not field:
            return [[]]

        if isinstance(field, str):
            return [[field]]

        is_nested_key = isinstance(field[0], str)
        if is_nested_key:
            return [field]

        raise ValueError(f"Unknown type for cursor field `{field}`")

    def _check_record_limit(self, record_limit: Optional[int] = None) -> int:
        """
        Checks and adjusts the provided record limit to ensure it falls within the valid range.

        If record_limit is provided, it must be between 1 and self._max_record_limit inclusive.
        If record_limit is None, it defaults to self._max_record_limit.

        Args:
            record_limit (Optional[int]): The requested record limit to validate.

        Returns:
            int: The validated record limit. If record_limit exceeds self._max_record_limit, the maximum
                allowed value is used.

        Raises:
            ValueError: If record_limit is provided and is not between 1 and self._max_record_limit.
        """
        if record_limit is not None and not (1 <= record_limit <= self._max_record_limit):
            raise ValueError(
                f"Record limit must be between 1 and {self._max_record_limit}. Got {record_limit}"
            )

        if record_limit is None:
            record_limit = self._max_record_limit
        else:
            record_limit = min(record_limit, self._max_record_limit)

        return record_limit

    def _categorise_groups(self, message_groups: MESSAGE_GROUPS) -> GROUPED_MESSAGES:
        """
        Categorizes a sequence of message groups into slices, log messages, auxiliary requests, and the latest
        configuration update.

        This function iterates over each message group in the provided collection and processes it based on its type:
            - AirbyteLogMessage: Converts the log message into a LogMessage object and appends it to the log_messages list.
            - AirbyteTraceMessage with type ERROR: Extracts error details, creates a LogMessage at the "ERROR" level, and appends it.
            - AirbyteControlMessage: Updates the latest_config_update if the current message is more recent.
            - AuxiliaryRequest: Appends the message to the auxiliary_requests list.
            - StreamReadSlices: Appends the message to the slices list.
            - Any other type: Raises a ValueError indicating an unknown message group type.

        Parameters:
            message_groups (MESSAGE_GROUPS): A collection of message groups to be processed.

        Returns:
            GROUPED_MESSAGES: A tuple containing four elements:
                - slices: A list of StreamReadSlices objects.
                - log_messages: A list of LogMessage objects.
                - auxiliary_requests: A list of AuxiliaryRequest objects.
                - latest_config_update: The most recent AirbyteControlMessage, or None if none was processed.

        Raises:
            ValueError: If a message group of an unknown type is encountered.
        """

        slices = []
        log_messages = []
        auxiliary_requests = []
        latest_config_update: Optional[AirbyteControlMessage] = None

        # process the message groups first
        for message_group in message_groups:
            match message_group:
                case AirbyteLogMessage():
                    log_messages.append(
                        LogMessage(message=message_group.message, level=message_group.level.value)
                    )
                case AirbyteTraceMessage():
                    if message_group.type == TraceType.ERROR:
                        log_messages.append(
                            LogMessage(
                                message=message_group.error.message,  # type: ignore
                                level="ERROR",
                                internal_message=message_group.error.internal_message,  # type: ignore
                                stacktrace=message_group.error.stack_trace,  # type: ignore
                            )
                        )
                case AirbyteControlMessage():
                    if (
                        not latest_config_update
                        or latest_config_update.emitted_at <= message_group.emitted_at
                    ):
                        latest_config_update = message_group
                case AuxiliaryRequest():
                    auxiliary_requests.append(message_group)
                case StreamReadSlices():
                    slices.append(message_group)
                case _:
                    raise ValueError(f"Unknown message group type: {type(message_group)}")

        return slices, log_messages, auxiliary_requests, latest_config_update

    def _get_infered_schema(
        self,
        configured_catalog: ConfiguredAirbyteCatalog,
        schema_inferrer: SchemaInferrer,
        log_messages: List[LogMessage],
    ) -> INFERRED_SCHEMA_OUTPUT_TYPE:
        """
        Retrieves the inferred schema from the given configured catalog using the provided schema inferrer.

        This function processes a single stream from the configured catalog. It attempts to obtain the stream's
        schema via the schema inferrer. If a SchemaValidationException occurs, each validation error is logged in
        the provided log_messages list and the partially inferred schema (from the exception) is returned.

        Parameters:
            configured_catalog (ConfiguredAirbyteCatalog): The configured catalog that contains the stream metadata.
                It is assumed that only one stream is defined.
            schema_inferrer (SchemaInferrer): An instance responsible for inferring the schema for a given stream.
            log_messages (List[LogMessage]): A list that will be appended with log messages, especially error
                messages if schema validation issues arise.

        Returns:
            INFERRED_SCHEMA_OUTPUT_TYPE: A tuple consisting of the inferred schema and the updated list of log messages.
        """

        try:
            # The connector builder currently only supports reading from a single stream at a time
            configured_stream = configured_catalog.streams[0]
            schema = schema_inferrer.get_stream_schema(configured_stream.stream.name)
        except SchemaValidationException as exception:
            # we update the log_messages with possible errors
            for validation_error in exception.validation_errors:
                log_messages.append(LogMessage(validation_error, "ERROR"))
            schema = exception.schema

        return schema, log_messages

    def _get_latest_config_update(
        self,
        latest_config_update: AirbyteControlMessage | None,
    ) -> Dict[str, Any] | None:
        """
        Retrieves a cleaned configuration from the latest Airbyte control message.

        This helper function extracts the configuration from the given Airbyte control message, cleans it using
        the internal `Parsers.clean_config` function, and returns the resulting dictionary. If no control message
        is provided (i.e., latest_config_update is None), the function returns None.

        Parameters:
            latest_config_update (AirbyteControlMessage | None): The control message containing the connector
                configuration. May be None.

        Returns:
            Dict[str, Any] | None: The cleaned configuration dictionary if available; otherwise, None.
        """

        return (
            clean_config(latest_config_update.connectorConfig.config)  # type: ignore
            if latest_config_update
            else None
        )

    def _read_stream(
        self,
        source: DeclarativeSource,
        config: Mapping[str, Any],
        configured_catalog: ConfiguredAirbyteCatalog,
        state: List[AirbyteStateMessage],
    ) -> Iterator[AirbyteMessage]:
        """
        Reads messages from the given DeclarativeSource using an AirbyteEntrypoint.

        This method attempts to yield messages from the source's read generator. If the generator
        raises an AirbyteTracedException, it checks whether the exception message indicates a non-actionable
        error (e.g., a final exception from AbstractSource that should not be logged). In that case, it stops
        processing without yielding the exception as a message. For other exceptions, the exception is caught,
        wrapped into an AirbyteTracedException, and yielded as an AirbyteMessage.

        Parameters:
            source (DeclarativeSource): The source object that provides data reading logic.
            config (Mapping[str, Any]): The configuration dictionary for the source.
            configured_catalog (ConfiguredAirbyteCatalog): The catalog defining the streams and their configurations.
            state (List[AirbyteStateMessage]): A list representing the current state for incremental sync.

        Yields:
            AirbyteMessage: Messages yielded from the source's generator. In case of exceptions,
                an AirbyteMessage encapsulating the error is yielded instead.
        """
        # the generator can raise an exception
        # iterate over the generated messages; if next raises an exception, catch it and yield it as an AirbyteMessage
        try:
            yield from AirbyteEntrypoint(source).read(
                source.spec(self.logger), config, configured_catalog, state
            )
        except AirbyteTracedException as traced_exception:
            # Look for this message which indicates that it is the "final exception" raised by AbstractSource.
            # If it matches, don't yield this as we don't need to show this in the Builder.
            # This is somewhat brittle as it relies on the message string, but if they drift then the worst case
            # is that this message will be shown in the Builder.
            if (
                traced_exception.message is not None
                and "During the sync, the following streams did not sync successfully"
                in traced_exception.message
            ):
                return
            yield traced_exception.as_airbyte_message()
        except Exception as e:
            error_message = f"{e.args[0] if len(e.args) > 0 else str(e)}"
            yield AirbyteTracedException.from_exception(
                e, message=error_message
            ).as_airbyte_message()

    def _has_reached_limit(self, slices: List[StreamReadSlices]) -> bool:
        """
        Determines whether the provided collection of slices has reached any defined limits.

        This function checks for three types of limits:
            1. If the number of slices is greater than or equal to the maximum slice limit.
            2. If any individual slice has a number of pages that meets or exceeds the maximum number of pages per slice.
            3. If the cumulative number of records across all pages in all slices reaches or exceeds the maximum record limit.

        Parameters:
            slices (List[StreamReadSlices]): A list where each element represents a slice containing one or more
                pages, and each page has a collection of records.

        Returns:
            bool: True if any of the following conditions is met:
                - The number of slices is at or above the maximum allowed slices.
                - Any slice contains pages at or above the maximum allowed per slice.
                - The total count of records reaches or exceeds the maximum record limit.
                False otherwise.
        """
        if len(slices) >= self._max_slices:
            return True

        record_count = 0

        for _slice in slices:
            if len(_slice.pages) >= self._max_pages_per_slice:
                return True
            for page in _slice.pages:
                record_count += len(page.records)
                if record_count >= self._max_record_limit:
                    return True
        return False
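The normalization and limit-checking helpers above are easiest to see with concrete values. The following is a minimal sketch with hypothetical inputs; the underscore-prefixed helpers are private and are exercised here only to illustrate the shapes their docstrings describe:

    reader = TestReader(max_pages_per_slice=5, max_slices=5, max_record_limit=1000)

    # Primary keys always normalize to a list of field paths (List[List[str]]).
    assert reader._pk_to_nested_and_composite_field(None) == [[]]
    assert reader._pk_to_nested_and_composite_field("id") == [["id"]]
    assert reader._pk_to_nested_and_composite_field(["id", "ts"]) == [["id"], ["ts"]]      # composite key
    assert reader._pk_to_nested_and_composite_field([["data", "id"]]) == [["data", "id"]]  # nested field

    # Cursor fields normalize to a single field path wrapped in an outer list.
    assert reader._cursor_field_to_nested_and_composite_field("updated_at") == [["updated_at"]]
    assert reader._cursor_field_to_nested_and_composite_field(["data", "updated_at"]) == [["data", "updated_at"]]

    # Record limits are validated against 1..max_record_limit and default to the maximum.
    assert reader._check_record_limit(None) == 1000
    assert reader._check_record_limit(50) == 50
    # reader._check_record_limit(0) would raise ValueError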
A utility class for performing test reads from a declarative data source, primarily used to validate connector configurations via partial stream reads.
Initialization:
TestReader(max_pages_per_slice: int, max_slices: int, max_record_limit: int = 1000)

Initializes a new instance of the TestReader class with limits on pages per slice, slices, and records per read operation.
Public Methods:
run_test_read(source, config, configured_catalog, state, record_limit=None) -> StreamRead:
Executes a test read operation from the given declarative source. It configures and infers the schema, processes the read messages (including logging and error handling), and returns a StreamRead object that contains slices of data, log messages, auxiliary requests, and any inferred schema or datetime formats.

Parameters:

- source (DeclarativeSource): The data source to read from.
- config (Mapping[str, Any]): Configuration parameters for the source.
- configured_catalog (ConfiguredAirbyteCatalog): Catalog containing stream configuration.
- state (List[AirbyteStateMessage]): Current state information for the read.
- record_limit (Optional[int]): Optional override for the maximum number of records to read.

Returns:

StreamRead: An object encapsulating logs, data slices, auxiliary requests, and inferred metadata, along with indicators of whether any configured limit was reached.
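A minimal usage sketch, assuming a DeclarativeSource, its config, a single-stream ConfiguredAirbyteCatalog, and any state messages have already been built elsewhere (for example, by the connector builder server); the limit values here are arbitrary:

    from airbyte_cdk.connector_builder.test_reader import TestReader

    reader = TestReader(max_pages_per_slice=5, max_slices=5, max_record_limit=1000)

    # Only the first stream of the source is read; record_limit optionally tightens
    # (never exceeds) the reader's max_record_limit.
    stream_read = reader.run_test_read(source, config, configured_catalog, state, record_limit=100)

    if stream_read.test_read_limit_reached:
        print("A configured limit (slices, pages per slice, or records) stopped the read early.")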
def run_test_read(
    self,
    source: DeclarativeSource,
    config: Mapping[str, Any],
    configured_catalog: ConfiguredAirbyteCatalog,
    state: List[AirbyteStateMessage],
    record_limit: Optional[int] = None,
) -> StreamRead:
Run a test read for the connector by reading from a single stream and inferring schema and datetime formats.
Arguments:
- source (DeclarativeSource): The source instance providing the streams.
- config (Mapping[str, Any]): The configuration settings to use for reading.
- configured_catalog (ConfiguredAirbyteCatalog): The catalog specifying the stream configuration.
- state (List[AirbyteStateMessage]): A list of state messages to resume the read.
- record_limit (Optional[int], optional): Maximum number of records to read. Defaults to None.
Returns:
StreamRead: An object containing the following attributes:

- logs (List[LogMessage]): Log messages generated during the process, including any deprecation warnings.
- slices (List[StreamReadSlices]): The data slices read from the stream.
- test_read_limit_reached (bool): Indicates whether any configured read limit (slices, pages per slice, or records) was reached.
- auxiliary_requests (List[AuxiliaryRequest]): Any auxiliary requests generated during reading.
- inferred_schema (Any): The schema inferred from the stream data.
- latest_config_update (Optional[Dict[str, Any]]): The latest cleaned configuration update, if any control message was emitted.
- inferred_datetime_formats (Dict[str, str]): Mapping of fields to their inferred datetime formats.
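Continuing the usage sketch above, the returned object can be inspected field by field (the nested pages/records attributes follow the StreamReadSlices shape that _has_reached_limit iterates over):

    print(stream_read.inferred_schema)            # JSON schema inferred from the records read
    print(stream_read.inferred_datetime_formats)  # e.g. {"created_at": "%Y-%m-%dT%H:%M:%SZ"} (hypothetical)

    for log in stream_read.logs:                  # LogMessage entries, including deprecation warnings
        print(log.level, log.message)

    for slice_ in stream_read.slices:             # one entry per stream slice read
        for page in slice_.pages:                 # one entry per page within the slice
            print(len(page.records), "records")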