airbyte_cdk.connector_builder.test_reader
42class TestReader: 43 """ 44 A utility class for performing test reads from a declarative data source, primarily used to validate 45 connector configurations by performing partial stream reads. 46 47 Initialization: 48 49 TestReader(max_pages_per_slice: int, max_slices: int, max_record_limit: int = 1000) 50 Initializes a new instance of the TestReader class with limits on pages per slice, slices, and records 51 per read operation. 52 53 Public Methods: 54 run_test_read(source, config, configured_catalog, state, record_limit=None) -> StreamRead: 55 56 Executes a test read operation from the given declarative source. It configures and infers the schema, 57 processes the read messages (including logging and error handling), and returns a StreamRead object 58 that contains slices of data, log messages, auxiliary requests, and any inferred schema or datetime formats. 59 60 Parameters: 61 source (ConcurrentDeclarativeSource): The data source to read from. 62 config (Mapping[str, Any]): Configuration parameters for the source. 63 configured_catalog (ConfiguredAirbyteCatalog): Catalog containing stream configuration. 64 state (List[AirbyteStateMessage]): Current state information for the read. 65 record_limit (Optional[int]): Optional override for the maximum number of records to read. 66 67 Returns: 68 StreamRead: An object encapsulating logs, data slices, auxiliary requests, and inferred metadata, 69 along with indicators if any configured limit was reached. 70 71 """ 72 73 __test__: ClassVar[bool] = False # Tell Pytest this is not a Pytest class, despite its name 74 75 logger = logging.getLogger("airbyte.connector-builder") 76 77 def __init__( 78 self, 79 max_pages_per_slice: int, 80 max_slices: int, 81 max_record_limit: int = 1000, 82 ) -> None: 83 self._max_pages_per_slice = max_pages_per_slice 84 self._max_slices = max_slices 85 self._max_record_limit = max_record_limit 86 87 def run_test_read( 88 self, 89 source: ConcurrentDeclarativeSource, 90 config: Mapping[str, Any], 91 configured_catalog: ConfiguredAirbyteCatalog, 92 stream_name: str, 93 state: List[AirbyteStateMessage], 94 record_limit: Optional[int] = None, 95 ) -> StreamRead: 96 """ 97 Run a test read for the connector by reading from a single stream and inferring schema and datetime formats. 98 99 Parameters: 100 source (ConcurrentDeclarativeSource): The source instance providing the streams. 101 config (Mapping[str, Any]): The configuration settings to use for reading. 102 configured_catalog (ConfiguredAirbyteCatalog): The catalog specifying the stream configuration. 103 state (List[AirbyteStateMessage]): A list of state messages to resume the read. 104 record_limit (Optional[int], optional): Maximum number of records to read. Defaults to None. 105 106 Returns: 107 StreamRead: An object containing the following attributes: 108 - logs (List[str]): Log messages generated during the process. 109 - slices (List[Any]): The data slices read from the stream. 110 - test_read_limit_reached (bool): Indicates whether the record limit was reached. 111 - auxiliary_requests (Any): Any auxiliary requests generated during reading. 112 - inferred_schema (Any): The schema inferred from the stream data. 113 - latest_config_update (Any): The latest configuration update, if applicable. 114 - inferred_datetime_formats (Dict[str, str]): Mapping of fields to their inferred datetime formats. 115 """ 116 117 record_limit = self._check_record_limit(record_limit) 118 # The connector builder currently only supports reading from a single stream at a time 119 streams = source.streams(config) 120 stream = next((stream for stream in streams if stream.name == stream_name), None) 121 122 # get any deprecation warnings during the component creation 123 deprecation_warnings: List[LogMessage] = source.deprecation_warnings() 124 125 schema_inferrer = SchemaInferrer( 126 self._pk_to_nested_and_composite_field( 127 stream.primary_key if hasattr(stream, "primary_key") else stream._primary_key # type: ignore # We are accessing the private property here as the primary key is not exposed. We should either expose it or use `as_airbyte_stream` to retrieve it as this is the "official" way where it is exposed in the Airbyte protocol 128 ) 129 if stream 130 else None, 131 self._cursor_field_to_nested_and_composite_field(stream.cursor_field) 132 if stream and stream.cursor_field 133 else None, 134 ) 135 datetime_format_inferrer = DatetimeFormatInferrer() 136 137 message_group = get_message_groups( 138 self._read_stream(source, config, configured_catalog, state), 139 schema_inferrer, 140 datetime_format_inferrer, 141 record_limit, 142 stream_name, 143 ) 144 145 slices, log_messages, auxiliary_requests, latest_config_update = self._categorise_groups( 146 message_group 147 ) 148 149 # extend log messages with deprecation warnings 150 log_messages.extend(deprecation_warnings) 151 152 schema, log_messages = self._get_infered_schema( 153 configured_catalog, schema_inferrer, log_messages 154 ) 155 156 return StreamRead( 157 logs=log_messages, 158 slices=slices, 159 test_read_limit_reached=self._has_reached_limit(slices), 160 auxiliary_requests=auxiliary_requests, 161 inferred_schema=schema, 162 latest_config_update=self._get_latest_config_update(latest_config_update), 163 inferred_datetime_formats=datetime_format_inferrer.get_inferred_datetime_formats(), 164 ) 165 166 def _pk_to_nested_and_composite_field( 167 self, field: Optional[Union[str, List[str], List[List[str]]]] 168 ) -> List[List[str]]: 169 """ 170 Converts a primary key definition into a nested list representation. 171 172 The function accepts a primary key that can be a single string, a list of strings, or a list of lists of strings. 173 It ensures that the return value is always a list of lists of strings. 174 175 Parameters: 176 field (Optional[Union[str, List[str], List[List[str]]]]): 177 The primary key definition. This can be: 178 - None or an empty value: returns a list containing an empty list. 179 - A single string: returns a list containing one list with the string. 180 - A list of strings (composite key): returns a list where each key is encapsulated in its own list. 181 - A list of lists of strings (nested field structure): returns as is. 182 183 Returns: 184 List[List[str]]: 185 A nested list representation of the primary key. 186 """ 187 if not field: 188 return [[]] 189 190 if isinstance(field, str): 191 return [[field]] 192 193 is_composite_key = isinstance(field[0], str) 194 if is_composite_key: 195 return [[i] for i in field] # type: ignore # the type of field is expected to be List[str] here 196 197 return field # type: ignore # the type of field is expected to be List[List[str]] here 198 199 def _cursor_field_to_nested_and_composite_field( 200 self, field: Union[str, List[str]] 201 ) -> List[List[str]]: 202 """ 203 Transforms the cursor field input into a nested list format suitable for further processing. 204 205 This function accepts a cursor field specification, which can be either: 206 - A falsy value (e.g., None or an empty string), in which case it returns a list containing an empty list. 207 - A string representing a simple cursor field. The string is wrapped in a nested list. 208 - A list of strings representing a composite or nested cursor field. The list is returned wrapped in an outer list. 209 210 Parameters: 211 field (Union[str, List[str]]): The cursor field specification. It can be: 212 - An empty or falsy value: returns [[]]. 213 - A string: returns [[field]]. 214 - A list of strings: returns [field] if the first element is a string. 215 216 Returns: 217 List[List[str]]: A nested list representation of the cursor field. 218 219 Raises: 220 ValueError: If the input is a list but its first element is not a string, 221 indicating an unsupported type for a cursor field. 222 """ 223 if not field: 224 return [[]] 225 226 if isinstance(field, str): 227 return [[field]] 228 229 is_nested_key = isinstance(field[0], str) 230 if is_nested_key: 231 return [field] 232 233 raise ValueError(f"Unknown type for cursor field `{field}") 234 235 def _check_record_limit(self, record_limit: Optional[int] = None) -> int: 236 """ 237 Checks and adjusts the provided record limit to ensure it falls within the valid range. 238 239 If record_limit is provided, it must be between 1 and self._max_record_limit inclusive. 240 If record_limit is None, it defaults to self._max_record_limit. 241 242 Args: 243 record_limit (Optional[int]): The requested record limit to validate. 244 245 Returns: 246 int: The validated record limit. If record_limit exceeds self._max_record_limit, the maximum allowed value is used. 247 248 Raises: 249 ValueError: If record_limit is provided and is not between 1 and self._max_record_limit. 250 """ 251 if record_limit is not None and not (1 <= record_limit <= self._max_record_limit): 252 raise ValueError( 253 f"Record limit must be between 1 and {self._max_record_limit}. Got {record_limit}" 254 ) 255 256 if record_limit is None: 257 record_limit = self._max_record_limit 258 else: 259 record_limit = min(record_limit, self._max_record_limit) 260 261 return record_limit 262 263 def _categorise_groups(self, message_groups: MESSAGE_GROUPS) -> GROUPED_MESSAGES: 264 """ 265 Categorizes a sequence of message groups into slices, log messages, auxiliary requests, and the latest configuration update. 266 267 This function iterates over each message group in the provided collection and processes it based on its type: 268 - AirbyteLogMessage: Converts the log message into a LogMessage object and appends it to the log_messages list. 269 - AirbyteTraceMessage with type ERROR: Extracts error details, creates a LogMessage at the "ERROR" level, and appends it. 270 - AirbyteControlMessage: Updates the latest_config_update if the current message is more recent. 271 - AuxiliaryRequest: Appends the message to the auxiliary_requests list. 272 - StreamReadSlices: Appends the message to the slices list. 273 - Any other type: Raises a ValueError indicating an unknown message group type. 274 275 Parameters: 276 message_groups (MESSAGE_GROUPS): A collection of message groups to be processed. 277 278 Returns: 279 GROUPED_MESSAGES: A tuple containing four elements: 280 - slices: A list of StreamReadSlices objects. 281 - log_messages: A list of LogMessage objects. 282 - auxiliary_requests: A list of AuxiliaryRequest objects. 283 - latest_config_update: The most recent AirbyteControlMessage, or None if none was processed. 284 285 Raises: 286 ValueError: If a message group of an unknown type is encountered. 287 """ 288 289 slices = [] 290 log_messages = [] 291 auxiliary_requests = [] 292 latest_config_update: Optional[AirbyteControlMessage] = None 293 294 # process the message groups first 295 for message_group in message_groups: 296 match message_group: 297 case AirbyteLogMessage(): 298 log_messages.append( 299 LogMessage(message=message_group.message, level=message_group.level.value) 300 ) 301 case AirbyteTraceMessage(): 302 if message_group.type == TraceType.ERROR: 303 log_messages.append( 304 LogMessage( 305 message=message_group.error.message, # type: ignore 306 level="ERROR", 307 internal_message=message_group.error.internal_message, # type: ignore 308 stacktrace=message_group.error.stack_trace, # type: ignore 309 ) 310 ) 311 case AirbyteControlMessage(): 312 if ( 313 not latest_config_update 314 or latest_config_update.emitted_at <= message_group.emitted_at 315 ): 316 latest_config_update = message_group 317 case AuxiliaryRequest(): 318 auxiliary_requests.append(message_group) 319 case StreamReadSlices(): 320 slices.append(message_group) 321 case _: 322 raise ValueError(f"Unknown message group type: {type(message_group)}") 323 324 return slices, log_messages, auxiliary_requests, latest_config_update 325 326 def _get_infered_schema( 327 self, 328 configured_catalog: ConfiguredAirbyteCatalog, 329 schema_inferrer: SchemaInferrer, 330 log_messages: List[LogMessage], 331 ) -> INFERRED_SCHEMA_OUTPUT_TYPE: 332 """ 333 Retrieves the inferred schema from the given configured catalog using the provided schema inferrer. 334 335 This function processes a single stream from the configured catalog. It attempts to obtain the stream's 336 schema via the schema inferrer. If a SchemaValidationException occurs, each validation error is logged in the 337 provided log_messages list and the partially inferred schema (from the exception) is returned. 338 339 Parameters: 340 configured_catalog (ConfiguredAirbyteCatalog): The configured catalog that contains the stream metadata. 341 It is assumed that only one stream is defined. 342 schema_inferrer (SchemaInferrer): An instance responsible for inferring the schema for a given stream. 343 log_messages (List[LogMessage]): A list that will be appended with log messages, especially error messages 344 if schema validation issues arise. 345 346 Returns: 347 INFERRED_SCHEMA_OUTPUT_TYPE: A tuple consisting of the inferred schema and the updated list of log messages. 348 """ 349 350 try: 351 # The connector builder currently only supports reading from a single stream at a time 352 configured_stream = configured_catalog.streams[0] 353 schema = schema_inferrer.get_stream_schema(configured_stream.stream.name) 354 except SchemaValidationException as exception: 355 # we update the log_messages with possible errors 356 for validation_error in exception.validation_errors: 357 log_messages.append(LogMessage(validation_error, "ERROR")) 358 schema = exception.schema 359 360 return schema, log_messages 361 362 def _get_latest_config_update( 363 self, 364 latest_config_update: AirbyteControlMessage | None, 365 ) -> Dict[str, Any] | None: 366 """ 367 Retrieves a cleaned configuration from the latest Airbyte control message. 368 369 This helper function extracts the configuration from the given Airbyte control message, cleans it using the internal `Parsers.clean_config` function, 370 and returns the resulting dictionary. If no control message is provided (i.e., latest_config_update is None), the function returns None. 371 372 Parameters: 373 latest_config_update (AirbyteControlMessage | None): The control message containing the connector configuration. May be None. 374 375 Returns: 376 Dict[str, Any] | None: The cleaned configuration dictionary if available; otherwise, None. 377 """ 378 379 return ( 380 clean_config(latest_config_update.connectorConfig.config) # type: ignore 381 if latest_config_update 382 else None 383 ) 384 385 def _read_stream( 386 self, 387 source: ConcurrentDeclarativeSource, 388 config: Mapping[str, Any], 389 configured_catalog: ConfiguredAirbyteCatalog, 390 state: List[AirbyteStateMessage], 391 ) -> Iterator[AirbyteMessage]: 392 """ 393 Reads messages from the given ConcurrentDeclarativeSource using an AirbyteEntrypoint. 394 395 This method attempts to yield messages from the source's read generator. If the generator 396 raises an AirbyteTracedException, it checks whether the exception message indicates a non-actionable 397 error (e.g., a final exception from AbstractSource that should not be logged). In that case, it stops 398 processing without yielding the exception as a message. For other exceptions, the exception is caught, 399 wrapped into an AirbyteTracedException, and yielded as an AirbyteMessage. 400 401 Parameters: 402 source (ConcurrentDeclarativeSource): The source object that provides data reading logic. 403 config (Mapping[str, Any]): The configuration dictionary for the source. 404 configured_catalog (ConfiguredAirbyteCatalog): The catalog defining the streams and their configurations. 405 state (List[AirbyteStateMessage]): A list representing the current state for incremental sync. 406 407 Yields: 408 AirbyteMessage: Messages yielded from the source's generator. In case of exceptions, 409 an AirbyteMessage encapsulating the error is yielded instead. 410 """ 411 # the generator can raise an exception 412 # iterate over the generated messages. if next raise an exception, catch it and yield it as an AirbyteLogMessage 413 try: 414 yield from AirbyteEntrypoint(source).read( 415 source.spec(self.logger), config, configured_catalog, state 416 ) 417 except AirbyteTracedException as traced_exception: 418 # Look for this message which indicates that it is the "final exception" raised by AbstractSource. 419 # If it matches, don't yield this as we don't need to show this in the Builder. 420 # This is somewhat brittle as it relies on the message string, but if they drift then the worst case 421 # is that this message will be shown in the Builder. 422 if ( 423 traced_exception.message is not None 424 and "During the sync, the following streams did not sync successfully" 425 in traced_exception.message 426 ): 427 return 428 yield traced_exception.as_airbyte_message() 429 except Exception as e: 430 error_message = f"{e.args[0] if len(e.args) > 0 else str(e)}" 431 yield AirbyteTracedException.from_exception( 432 e, message=error_message 433 ).as_airbyte_message() 434 435 def _has_reached_limit(self, slices: List[StreamReadSlices]) -> bool: 436 """ 437 Determines whether the provided collection of slices has reached any defined limits. 438 439 This function checks for three types of limits: 440 1. If the number of slices is greater than or equal to a maximum slice limit. 441 2. If any individual slice has a number of pages that meets or exceeds a maximum number of pages per slice. 442 3. If the cumulative number of records across all pages in all slices reaches or exceeds a maximum record limit. 443 444 Parameters: 445 slices (List[StreamReadSlices]): A list where each element represents a slice containing one or more pages, and each page has a collection of records. 446 447 Returns: 448 bool: True if any of the following conditions is met: 449 - The number of slices is at or above the maximum allowed slices. 450 - Any slice contains pages at or above the maximum allowed per slice. 451 - The total count of records reaches or exceeds the maximum record limit. 452 False otherwise. 453 """ 454 if len(slices) >= self._max_slices: 455 return True 456 457 record_count = 0 458 459 for _slice in slices: 460 if len(_slice.pages) >= self._max_pages_per_slice: 461 return True 462 for page in _slice.pages: 463 record_count += len(page.records) 464 if record_count >= self._max_record_limit: 465 return True 466 return False
A utility class for performing test reads from a declarative data source, primarily used to validate connector configurations by performing partial stream reads.
Initialization:
TestReader(max_pages_per_slice: int, max_slices: int, max_record_limit: int = 1000) Initializes a new instance of the TestReader class with limits on pages per slice, slices, and records per read operation.
Public Methods:
run_test_read(source, config, configured_catalog, state, record_limit=None) -> StreamRead:
Executes a test read operation from the given declarative source. It configures and infers the schema, processes the read messages (including logging and error handling), and returns a StreamRead object that contains slices of data, log messages, auxiliary requests, and any inferred schema or datetime formats. Parameters: source (ConcurrentDeclarativeSource): The data source to read from. config (Mapping[str, Any]): Configuration parameters for the source. configured_catalog (ConfiguredAirbyteCatalog): Catalog containing stream configuration. state (List[AirbyteStateMessage]): Current state information for the read. record_limit (Optional[int]): Optional override for the maximum number of records to read. Returns: StreamRead: An object encapsulating logs, data slices, auxiliary requests, and inferred metadata, along with indicators if any configured limit was reached.
87 def run_test_read( 88 self, 89 source: ConcurrentDeclarativeSource, 90 config: Mapping[str, Any], 91 configured_catalog: ConfiguredAirbyteCatalog, 92 stream_name: str, 93 state: List[AirbyteStateMessage], 94 record_limit: Optional[int] = None, 95 ) -> StreamRead: 96 """ 97 Run a test read for the connector by reading from a single stream and inferring schema and datetime formats. 98 99 Parameters: 100 source (ConcurrentDeclarativeSource): The source instance providing the streams. 101 config (Mapping[str, Any]): The configuration settings to use for reading. 102 configured_catalog (ConfiguredAirbyteCatalog): The catalog specifying the stream configuration. 103 state (List[AirbyteStateMessage]): A list of state messages to resume the read. 104 record_limit (Optional[int], optional): Maximum number of records to read. Defaults to None. 105 106 Returns: 107 StreamRead: An object containing the following attributes: 108 - logs (List[str]): Log messages generated during the process. 109 - slices (List[Any]): The data slices read from the stream. 110 - test_read_limit_reached (bool): Indicates whether the record limit was reached. 111 - auxiliary_requests (Any): Any auxiliary requests generated during reading. 112 - inferred_schema (Any): The schema inferred from the stream data. 113 - latest_config_update (Any): The latest configuration update, if applicable. 114 - inferred_datetime_formats (Dict[str, str]): Mapping of fields to their inferred datetime formats. 115 """ 116 117 record_limit = self._check_record_limit(record_limit) 118 # The connector builder currently only supports reading from a single stream at a time 119 streams = source.streams(config) 120 stream = next((stream for stream in streams if stream.name == stream_name), None) 121 122 # get any deprecation warnings during the component creation 123 deprecation_warnings: List[LogMessage] = source.deprecation_warnings() 124 125 schema_inferrer = SchemaInferrer( 126 self._pk_to_nested_and_composite_field( 127 stream.primary_key if hasattr(stream, "primary_key") else stream._primary_key # type: ignore # We are accessing the private property here as the primary key is not exposed. We should either expose it or use `as_airbyte_stream` to retrieve it as this is the "official" way where it is exposed in the Airbyte protocol 128 ) 129 if stream 130 else None, 131 self._cursor_field_to_nested_and_composite_field(stream.cursor_field) 132 if stream and stream.cursor_field 133 else None, 134 ) 135 datetime_format_inferrer = DatetimeFormatInferrer() 136 137 message_group = get_message_groups( 138 self._read_stream(source, config, configured_catalog, state), 139 schema_inferrer, 140 datetime_format_inferrer, 141 record_limit, 142 stream_name, 143 ) 144 145 slices, log_messages, auxiliary_requests, latest_config_update = self._categorise_groups( 146 message_group 147 ) 148 149 # extend log messages with deprecation warnings 150 log_messages.extend(deprecation_warnings) 151 152 schema, log_messages = self._get_infered_schema( 153 configured_catalog, schema_inferrer, log_messages 154 ) 155 156 return StreamRead( 157 logs=log_messages, 158 slices=slices, 159 test_read_limit_reached=self._has_reached_limit(slices), 160 auxiliary_requests=auxiliary_requests, 161 inferred_schema=schema, 162 latest_config_update=self._get_latest_config_update(latest_config_update), 163 inferred_datetime_formats=datetime_format_inferrer.get_inferred_datetime_formats(), 164 )
Run a test read for the connector by reading from a single stream and inferring schema and datetime formats.
Arguments:
- source (ConcurrentDeclarativeSource): The source instance providing the streams.
- config (Mapping[str, Any]): The configuration settings to use for reading.
- configured_catalog (ConfiguredAirbyteCatalog): The catalog specifying the stream configuration.
- state (List[AirbyteStateMessage]): A list of state messages to resume the read.
- record_limit (Optional[int], optional): Maximum number of records to read. Defaults to None.
Returns:
StreamRead: An object containing the following attributes: - logs (List[str]): Log messages generated during the process. - slices (List[Any]): The data slices read from the stream. - test_read_limit_reached (bool): Indicates whether the record limit was reached. - auxiliary_requests (Any): Any auxiliary requests generated during reading. - inferred_schema (Any): The schema inferred from the stream data. - latest_config_update (Any): The latest configuration update, if applicable. - inferred_datetime_formats (Dict[str, str]): Mapping of fields to their inferred datetime formats.