airbyte_cdk.connector_builder.test_reader
class TestReader:
    """
    A utility class for performing test reads from a declarative data source, primarily used to validate
    connector configurations via partial stream reads.

    Initialization:

        TestReader(max_pages_per_slice: int, max_slices: int, max_record_limit: int = 1000)
            Initializes a new instance of the TestReader class with limits on pages per slice, slices,
            and records per read operation.

    Public Methods:
        run_test_read(source, config, configured_catalog, stream_name, state, record_limit=None) -> StreamRead:

            Executes a test read against the given declarative source. It infers the stream's schema and
            datetime formats, processes the read messages (including logging and error handling), and returns
            a StreamRead object that contains slices of data, log messages, auxiliary requests, and any
            inferred schema or datetime formats.

            Parameters:
                source (DeclarativeSource): The data source to read from.
                config (Mapping[str, Any]): Configuration parameters for the source.
                configured_catalog (ConfiguredAirbyteCatalog): Catalog containing the stream configuration.
                stream_name (str): The name of the stream to read.
                state (List[AirbyteStateMessage]): Current state information for the read.
                record_limit (Optional[int]): Optional override for the maximum number of records to read.

            Returns:
                StreamRead: An object encapsulating logs, data slices, auxiliary requests, and inferred metadata,
                along with indicators of whether any configured limit was reached.
    """

    __test__: ClassVar[bool] = False  # Tell Pytest this is not a Pytest class, despite its name

    logger = logging.getLogger("airbyte.connector-builder")

    def __init__(
        self,
        max_pages_per_slice: int,
        max_slices: int,
        max_record_limit: int = 1000,
    ) -> None:
        self._max_pages_per_slice = max_pages_per_slice
        self._max_slices = max_slices
        self._max_record_limit = max_record_limit

    def run_test_read(
        self,
        source: DeclarativeSource,
        config: Mapping[str, Any],
        configured_catalog: ConfiguredAirbyteCatalog,
        stream_name: str,
        state: List[AirbyteStateMessage],
        record_limit: Optional[int] = None,
    ) -> StreamRead:
        """
        Run a test read for the connector by reading from a single stream and inferring schema and datetime formats.

        Parameters:
            source (DeclarativeSource): The source instance providing the streams.
            config (Mapping[str, Any]): The configuration settings to use for reading.
            configured_catalog (ConfiguredAirbyteCatalog): The catalog specifying the stream configuration.
            stream_name (str): The name of the stream to read from.
            state (List[AirbyteStateMessage]): A list of state messages to resume the read.
            record_limit (Optional[int], optional): Maximum number of records to read. Defaults to None,
                in which case self._max_record_limit is used.

        Returns:
            StreamRead: An object containing the following attributes:
                - logs (List[LogMessage]): Log messages generated during the process.
                - slices (List[StreamReadSlices]): The data slices read from the stream.
                - test_read_limit_reached (bool): Indicates whether a configured read limit was reached.
                - auxiliary_requests (List[AuxiliaryRequest]): Any auxiliary requests generated during reading.
                - inferred_schema (Any): The schema inferred from the stream data.
                - latest_config_update (Any): The latest configuration update, if applicable.
                - inferred_datetime_formats (Dict[str, str]): Mapping of fields to their inferred datetime formats.
        """

        record_limit = self._check_record_limit(record_limit)
        # The connector builder currently only supports reading from a single stream at a time
        streams = source.streams(config)
        stream = next((stream for stream in streams if stream.name == stream_name), None)

        # get any deprecation warnings emitted during component creation
        deprecation_warnings: List[LogMessage] = source.deprecation_warnings()

        schema_inferrer = SchemaInferrer(
            self._pk_to_nested_and_composite_field(stream.primary_key) if stream else None,
            self._cursor_field_to_nested_and_composite_field(stream.cursor_field)
            if stream
            else None,
        )
        datetime_format_inferrer = DatetimeFormatInferrer()

        message_groups = get_message_groups(
            self._read_stream(source, config, configured_catalog, state),
            schema_inferrer,
            datetime_format_inferrer,
            record_limit,
            stream_name,
        )

        slices, log_messages, auxiliary_requests, latest_config_update = self._categorise_groups(
            message_groups
        )

        # extend log messages with deprecation warnings
        log_messages.extend(deprecation_warnings)

        schema, log_messages = self._get_infered_schema(
            configured_catalog, schema_inferrer, log_messages
        )

        return StreamRead(
            logs=log_messages,
            slices=slices,
            test_read_limit_reached=self._has_reached_limit(slices),
            auxiliary_requests=auxiliary_requests,
            inferred_schema=schema,
            latest_config_update=self._get_latest_config_update(latest_config_update),
            inferred_datetime_formats=datetime_format_inferrer.get_inferred_datetime_formats(),
        )

    def _pk_to_nested_and_composite_field(
        self, field: Optional[Union[str, List[str], List[List[str]]]]
    ) -> List[List[str]]:
        """
        Converts a primary key definition into a nested list representation.

        The function accepts a primary key that can be a single string, a list of strings, or a list of
        lists of strings. It ensures that the return value is always a list of lists of strings.

        Parameters:
            field (Optional[Union[str, List[str], List[List[str]]]]):
                The primary key definition. This can be:
                - None or an empty value: returns a list containing an empty list.
                - A single string: returns a list containing one list with the string.
                - A list of strings (composite key): returns a list where each key is wrapped in its own list.
                - A list of lists of strings (nested field structure): returned as is.

        Returns:
            List[List[str]]:
                A nested list representation of the primary key.
        """
        if not field:
            return [[]]

        if isinstance(field, str):
            return [[field]]

        is_composite_key = isinstance(field[0], str)
        if is_composite_key:
            return [[i] for i in field]  # type: ignore  # the type of field is expected to be List[str] here

        return field  # type: ignore  # the type of field is expected to be List[List[str]] here

    def _cursor_field_to_nested_and_composite_field(
        self, field: Union[str, List[str]]
    ) -> List[List[str]]:
        """
        Transforms the cursor field input into a nested list format suitable for further processing.

        This function accepts a cursor field specification, which can be either:
        - A falsy value (e.g., None or an empty string), in which case it returns a list containing an empty list.
        - A string representing a simple cursor field. The string is wrapped in a nested list.
        - A list of strings representing a composite or nested cursor field. The list is returned wrapped
          in an outer list.

        Parameters:
            field (Union[str, List[str]]): The cursor field specification. It can be:
                - An empty or falsy value: returns [[]].
                - A string: returns [[field]].
                - A list of strings: returns [field] if the first element is a string.

        Returns:
            List[List[str]]: A nested list representation of the cursor field.

        Raises:
            ValueError: If the input is a list but its first element is not a string,
                indicating an unsupported type for a cursor field.
        """
        if not field:
            return [[]]

        if isinstance(field, str):
            return [[field]]

        is_nested_key = isinstance(field[0], str)
        if is_nested_key:
            return [field]

        raise ValueError(f"Unknown type for cursor field `{field}`")

    def _check_record_limit(self, record_limit: Optional[int] = None) -> int:
        """
        Validates the provided record limit and resolves it to a value within the allowed range.

        If record_limit is provided, it must be between 1 and self._max_record_limit inclusive;
        otherwise a ValueError is raised. If record_limit is None, it defaults to self._max_record_limit.

        Args:
            record_limit (Optional[int]): The requested record limit to validate.

        Returns:
            int: The validated record limit, or self._max_record_limit when record_limit is None.

        Raises:
            ValueError: If record_limit is provided and is not between 1 and self._max_record_limit.
        """
        if record_limit is not None and not (1 <= record_limit <= self._max_record_limit):
            raise ValueError(
                f"Record limit must be between 1 and {self._max_record_limit}. Got {record_limit}"
            )

        if record_limit is None:
            record_limit = self._max_record_limit
        else:
            record_limit = min(record_limit, self._max_record_limit)

        return record_limit

    def _categorise_groups(self, message_groups: MESSAGE_GROUPS) -> GROUPED_MESSAGES:
        """
        Categorizes a sequence of message groups into slices, log messages, auxiliary requests, and the latest configuration update.

        This function iterates over each message group in the provided collection and processes it based on its type:
        - AirbyteLogMessage: Converts the log message into a LogMessage object and appends it to the log_messages list.
        - AirbyteTraceMessage with type ERROR: Extracts error details, creates a LogMessage at the "ERROR" level, and appends it.
        - AirbyteControlMessage: Updates latest_config_update if the current message is more recent.
        - AuxiliaryRequest: Appends the message to the auxiliary_requests list.
        - StreamReadSlices: Appends the message to the slices list.
        - Any other type: Raises a ValueError indicating an unknown message group type.

        Parameters:
            message_groups (MESSAGE_GROUPS): A collection of message groups to be processed.

        Returns:
            GROUPED_MESSAGES: A tuple containing four elements:
                - slices: A list of StreamReadSlices objects.
                - log_messages: A list of LogMessage objects.
                - auxiliary_requests: A list of AuxiliaryRequest objects.
                - latest_config_update: The most recent AirbyteControlMessage, or None if none was processed.

        Raises:
            ValueError: If a message group of an unknown type is encountered.
        """

        slices = []
        log_messages = []
        auxiliary_requests = []
        latest_config_update: Optional[AirbyteControlMessage] = None

        # process the message groups first
        for message_group in message_groups:
            match message_group:
                case AirbyteLogMessage():
                    log_messages.append(
                        LogMessage(message=message_group.message, level=message_group.level.value)
                    )
                case AirbyteTraceMessage():
                    if message_group.type == TraceType.ERROR:
                        log_messages.append(
                            LogMessage(
                                message=message_group.error.message,  # type: ignore
                                level="ERROR",
                                internal_message=message_group.error.internal_message,  # type: ignore
                                stacktrace=message_group.error.stack_trace,  # type: ignore
                            )
                        )
                case AirbyteControlMessage():
                    if (
                        not latest_config_update
                        or latest_config_update.emitted_at <= message_group.emitted_at
                    ):
                        latest_config_update = message_group
                case AuxiliaryRequest():
                    auxiliary_requests.append(message_group)
                case StreamReadSlices():
                    slices.append(message_group)
                case _:
                    raise ValueError(f"Unknown message group type: {type(message_group)}")

        return slices, log_messages, auxiliary_requests, latest_config_update

    def _get_infered_schema(
        self,
        configured_catalog: ConfiguredAirbyteCatalog,
        schema_inferrer: SchemaInferrer,
        log_messages: List[LogMessage],
    ) -> INFERRED_SCHEMA_OUTPUT_TYPE:
        """
        Retrieves the inferred schema from the given configured catalog using the provided schema inferrer.

        This function processes a single stream from the configured catalog. It attempts to obtain the stream's
        schema via the schema inferrer. If a SchemaValidationException occurs, each validation error is logged in the
        provided log_messages list and the partially inferred schema (from the exception) is returned.

        Parameters:
            configured_catalog (ConfiguredAirbyteCatalog): The configured catalog that contains the stream metadata.
                It is assumed that only one stream is defined.
            schema_inferrer (SchemaInferrer): An instance responsible for inferring the schema for a given stream.
            log_messages (List[LogMessage]): A list that will be appended with log messages, especially error messages
                if schema validation issues arise.

        Returns:
            INFERRED_SCHEMA_OUTPUT_TYPE: A tuple consisting of the inferred schema and the updated list of log messages.
        """

        try:
            # The connector builder currently only supports reading from a single stream at a time
            configured_stream = configured_catalog.streams[0]
            schema = schema_inferrer.get_stream_schema(configured_stream.stream.name)
        except SchemaValidationException as exception:
            # we update the log_messages with possible errors
            for validation_error in exception.validation_errors:
                log_messages.append(LogMessage(validation_error, "ERROR"))
            schema = exception.schema

        return schema, log_messages

    def _get_latest_config_update(
        self,
        latest_config_update: AirbyteControlMessage | None,
    ) -> Dict[str, Any] | None:
        """
        Retrieves a cleaned configuration from the latest Airbyte control message.

        This helper extracts the configuration from the given Airbyte control message, cleans it using the
        internal clean_config helper, and returns the resulting dictionary. If no control message is provided
        (i.e., latest_config_update is None), the function returns None.

        Parameters:
            latest_config_update (AirbyteControlMessage | None): The control message containing the connector configuration. May be None.

        Returns:
            Dict[str, Any] | None: The cleaned configuration dictionary if available; otherwise, None.
        """

        return (
            clean_config(latest_config_update.connectorConfig.config)  # type: ignore
            if latest_config_update
            else None
        )

    def _read_stream(
        self,
        source: DeclarativeSource,
        config: Mapping[str, Any],
        configured_catalog: ConfiguredAirbyteCatalog,
        state: List[AirbyteStateMessage],
    ) -> Iterator[AirbyteMessage]:
        """
        Reads messages from the given DeclarativeSource using an AirbyteEntrypoint.

        This method attempts to yield messages from the source's read generator. If the generator
        raises an AirbyteTracedException, it checks whether the exception message indicates a non-actionable
        error (e.g., a final exception from AbstractSource that should not be logged). In that case, it stops
        processing without yielding the exception as a message. For other exceptions, the exception is caught,
        wrapped into an AirbyteTracedException, and yielded as an AirbyteMessage.

        Parameters:
            source (DeclarativeSource): The source object that provides the data-reading logic.
            config (Mapping[str, Any]): The configuration dictionary for the source.
            configured_catalog (ConfiguredAirbyteCatalog): The catalog defining the streams and their configurations.
            state (List[AirbyteStateMessage]): A list representing the current state for incremental sync.

        Yields:
            AirbyteMessage: Messages yielded from the source's generator. In case of exceptions,
            an AirbyteMessage encapsulating the error is yielded instead.
        """
        # the generator can raise an exception, so iterate over the generated messages;
        # if next() raises, catch the exception and yield it as an AirbyteMessage
        try:
            yield from AirbyteEntrypoint(source).read(
                source.spec(self.logger), config, configured_catalog, state
            )
        except AirbyteTracedException as traced_exception:
            # Look for this message which indicates that it is the "final exception" raised by AbstractSource.
            # If it matches, don't yield this as we don't need to show this in the Builder.
            # This is somewhat brittle as it relies on the message string, but if they drift then the worst case
            # is that this message will be shown in the Builder.
            if (
                traced_exception.message is not None
                and "During the sync, the following streams did not sync successfully"
                in traced_exception.message
            ):
                return
            yield traced_exception.as_airbyte_message()
        except Exception as e:
            error_message = f"{e.args[0] if len(e.args) > 0 else str(e)}"
            yield AirbyteTracedException.from_exception(
                e, message=error_message
            ).as_airbyte_message()

    def _has_reached_limit(self, slices: List[StreamReadSlices]) -> bool:
        """
        Determines whether the provided collection of slices has reached any configured limit.

        This function checks three types of limits:
        1. Whether the number of slices is greater than or equal to the maximum slice limit.
        2. Whether any individual slice has a number of pages that meets or exceeds the maximum number of pages per slice.
        3. Whether the cumulative number of records across all pages in all slices reaches or exceeds the maximum record limit.

        Parameters:
            slices (List[StreamReadSlices]): A list where each element represents a slice containing one or more pages, and each page has a collection of records.

        Returns:
            bool: True if any of the following conditions is met:
                - The number of slices is at or above the maximum allowed slices.
                - Any slice contains pages at or above the maximum allowed per slice.
                - The total count of records reaches or exceeds the maximum record limit.
                False otherwise.
        """
        if len(slices) >= self._max_slices:
            return True

        record_count = 0

        for _slice in slices:
            if len(_slice.pages) >= self._max_pages_per_slice:
                return True
            for page in _slice.pages:
                record_count += len(page.records)
                if record_count >= self._max_record_limit:
                    return True
        return False
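The two key-normalization helpers above are easiest to see by example. A minimal sketch of their input and output shapes (illustration only: these are private helpers, so calling them directly is not part of the public API):

    # Illustration only: exercises the private normalization helpers shown above.
    reader = TestReader(max_pages_per_slice=5, max_slices=5)

    # Primary keys are normalized to a list of field paths (List[List[str]]).
    assert reader._pk_to_nested_and_composite_field(None) == [[]]
    assert reader._pk_to_nested_and_composite_field("id") == [["id"]]
    assert reader._pk_to_nested_and_composite_field(["a", "b"]) == [["a"], ["b"]]  # composite key
    assert reader._pk_to_nested_and_composite_field([["data", "id"]]) == [["data", "id"]]  # nested path, returned as-is

    # Cursor fields are wrapped into the same nested shape.
    assert reader._cursor_field_to_nested_and_composite_field("updated_at") == [["updated_at"]]
    assert reader._cursor_field_to_nested_and_composite_field(["data", "updated_at"]) == [["data", "updated_at"]]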
A utility class for performing test reads from a declarative data source, primarily used to validate connector configurations via partial stream reads.
Initialization:
TestReader(max_pages_per_slice: int, max_slices: int, max_record_limit: int = 1000)
Initializes a new instance of the TestReader class with limits on pages per slice, slices, and records per read operation.
Public Methods:
run_test_read(source, config, configured_catalog, stream_name, state, record_limit=None) -> StreamRead:
Executes a test read against the given declarative source. It infers the stream's schema and datetime formats, processes the read messages (including logging and error handling), and returns a StreamRead object that contains slices of data, log messages, auxiliary requests, and any inferred schema or datetime formats.

Parameters:

- source (DeclarativeSource): The data source to read from.
- config (Mapping[str, Any]): Configuration parameters for the source.
- configured_catalog (ConfiguredAirbyteCatalog): Catalog containing the stream configuration.
- stream_name (str): The name of the stream to read.
- state (List[AirbyteStateMessage]): Current state information for the read.
- record_limit (Optional[int]): Optional override for the maximum number of records to read.

Returns:

StreamRead: An object encapsulating logs, data slices, auxiliary requests, and inferred metadata, along with indicators of whether any configured limit was reached.
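A minimal usage sketch. The source, config, and configured catalog objects, and the "users" stream name, are hypothetical placeholders; building them is connector-specific:

    from airbyte_cdk.connector_builder.test_reader import TestReader

    # `source`, `config`, and `configured_catalog` are assumed to exist already
    # (hypothetical placeholders for this sketch).
    reader = TestReader(max_pages_per_slice=5, max_slices=5, max_record_limit=1000)

    stream_read = reader.run_test_read(
        source=source,                          # DeclarativeSource (assumed available)
        config=config,                          # Mapping[str, Any] for the source
        configured_catalog=configured_catalog,  # single-stream ConfiguredAirbyteCatalog
        stream_name="users",                    # hypothetical stream name
        state=[],                               # no prior state for a fresh test read
        record_limit=100,                       # optional; must be 1..max_record_limit
    )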
    def run_test_read(
        self,
        source: DeclarativeSource,
        config: Mapping[str, Any],
        configured_catalog: ConfiguredAirbyteCatalog,
        stream_name: str,
        state: List[AirbyteStateMessage],
        record_limit: Optional[int] = None,
    ) -> StreamRead:
        """
        Run a test read for the connector by reading from a single stream and inferring schema and datetime formats.

        Parameters:
            source (DeclarativeSource): The source instance providing the streams.
            config (Mapping[str, Any]): The configuration settings to use for reading.
            configured_catalog (ConfiguredAirbyteCatalog): The catalog specifying the stream configuration.
            stream_name (str): The name of the stream to read from.
            state (List[AirbyteStateMessage]): A list of state messages to resume the read.
            record_limit (Optional[int], optional): Maximum number of records to read. Defaults to None,
                in which case self._max_record_limit is used.

        Returns:
            StreamRead: An object containing the following attributes:
                - logs (List[LogMessage]): Log messages generated during the process.
                - slices (List[StreamReadSlices]): The data slices read from the stream.
                - test_read_limit_reached (bool): Indicates whether a configured read limit was reached.
                - auxiliary_requests (List[AuxiliaryRequest]): Any auxiliary requests generated during reading.
                - inferred_schema (Any): The schema inferred from the stream data.
                - latest_config_update (Any): The latest configuration update, if applicable.
                - inferred_datetime_formats (Dict[str, str]): Mapping of fields to their inferred datetime formats.
        """

        record_limit = self._check_record_limit(record_limit)
        # The connector builder currently only supports reading from a single stream at a time
        streams = source.streams(config)
        stream = next((stream for stream in streams if stream.name == stream_name), None)

        # get any deprecation warnings emitted during component creation
        deprecation_warnings: List[LogMessage] = source.deprecation_warnings()

        schema_inferrer = SchemaInferrer(
            self._pk_to_nested_and_composite_field(stream.primary_key) if stream else None,
            self._cursor_field_to_nested_and_composite_field(stream.cursor_field)
            if stream
            else None,
        )
        datetime_format_inferrer = DatetimeFormatInferrer()

        message_groups = get_message_groups(
            self._read_stream(source, config, configured_catalog, state),
            schema_inferrer,
            datetime_format_inferrer,
            record_limit,
            stream_name,
        )

        slices, log_messages, auxiliary_requests, latest_config_update = self._categorise_groups(
            message_groups
        )

        # extend log messages with deprecation warnings
        log_messages.extend(deprecation_warnings)

        schema, log_messages = self._get_infered_schema(
            configured_catalog, schema_inferrer, log_messages
        )

        return StreamRead(
            logs=log_messages,
            slices=slices,
            test_read_limit_reached=self._has_reached_limit(slices),
            auxiliary_requests=auxiliary_requests,
            inferred_schema=schema,
            latest_config_update=self._get_latest_config_update(latest_config_update),
            inferred_datetime_formats=datetime_format_inferrer.get_inferred_datetime_formats(),
        )
Run a test read for the connector by reading from a single stream and inferring schema and datetime formats.
Arguments:
- source (DeclarativeSource): The source instance providing the streams.
- config (Mapping[str, Any]): The configuration settings to use for reading.
- configured_catalog (ConfiguredAirbyteCatalog): The catalog specifying the stream configuration.
- stream_name (str): The name of the stream to read from.
- state (List[AirbyteStateMessage]): A list of state messages to resume the read.
- record_limit (Optional[int], optional): Maximum number of records to read. Defaults to None, in which case the reader's max_record_limit is used.
Returns:
StreamRead: An object containing the following attributes:

- logs (List[LogMessage]): Log messages generated during the process.
- slices (List[StreamReadSlices]): The data slices read from the stream.
- test_read_limit_reached (bool): Indicates whether a configured read limit was reached.
- auxiliary_requests (List[AuxiliaryRequest]): Any auxiliary requests generated during reading.
- inferred_schema (Any): The schema inferred from the stream data.
- latest_config_update (Any): The latest configuration update, if applicable.
- inferred_datetime_formats (Dict[str, str]): Mapping of fields to their inferred datetime formats.
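Continuing the hypothetical stream_read from the sketch above, a caller might inspect the result like this (attribute names follow the list above):

    # Sketch: inspect the StreamRead returned by run_test_read.
    for log in stream_read.logs:
        print(log.level, log.message)  # LogMessage entries, including any deprecation warnings

    print("limit reached:", stream_read.test_read_limit_reached)
    if stream_read.slices and stream_read.slices[0].pages:
        first_page = stream_read.slices[0].pages[0]
        print("records on first page of first slice:", len(first_page.records))
    print("inferred schema:", stream_read.inferred_schema)
    print("inferred datetime formats:", stream_read.inferred_datetime_formats)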