airbyte_cdk.connector_builder.test_reader

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from .reader import TestReader

__all__ = ("TestReader",)
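
Since the package re-exports TestReader (per `__all__` above), it can be imported directly:

    from airbyte_cdk.connector_builder.test_reader import TestReader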
class TestReader:
class TestReader:
    """
    A utility class for performing test reads from a declarative data source, primarily used to validate
    connector configurations by performing partial stream reads.

    Initialization:

        TestReader(max_pages_per_slice: int, max_slices: int, max_record_limit: int = 1000)
            Initializes a new instance of the TestReader class with limits on pages per slice, slices, and records
            per read operation.

    Public Methods:
        run_test_read(source, config, configured_catalog, state, record_limit=None) -> StreamRead:

            Executes a test read operation from the given declarative source. It sets up schema and datetime-format
            inference, processes the read messages (including logging and error handling), and returns a StreamRead object
            that contains slices of data, log messages, auxiliary requests, and any inferred schema or datetime formats.

            Parameters:
                source (DeclarativeSource): The data source to read from.
                config (Mapping[str, Any]): Configuration parameters for the source.
                configured_catalog (ConfiguredAirbyteCatalog): Catalog containing stream configuration.
                state (List[AirbyteStateMessage]): Current state information for the read.
                record_limit (Optional[int]): Optional override for the maximum number of records to read.

            Returns:
                StreamRead: An object encapsulating logs, data slices, auxiliary requests, and inferred metadata,
                along with an indicator of whether any configured limit was reached.

    """

    __test__: ClassVar[bool] = False  # Tell Pytest this is not a Pytest class, despite its name

    logger = logging.getLogger("airbyte.connector-builder")

    def __init__(
        self,
        max_pages_per_slice: int,
        max_slices: int,
        max_record_limit: int = 1000,
    ) -> None:
        self._max_pages_per_slice = max_pages_per_slice
        self._max_slices = max_slices
        self._max_record_limit = max_record_limit

    def run_test_read(
        self,
        source: DeclarativeSource,
        config: Mapping[str, Any],
        configured_catalog: ConfiguredAirbyteCatalog,
        state: List[AirbyteStateMessage],
        record_limit: Optional[int] = None,
    ) -> StreamRead:
        """
        Run a test read for the connector by reading from a single stream and inferring schema and datetime formats.

        Parameters:
            source (DeclarativeSource): The source instance providing the streams.
            config (Mapping[str, Any]): The configuration settings to use for reading.
            configured_catalog (ConfiguredAirbyteCatalog): The catalog specifying the stream configuration.
            state (List[AirbyteStateMessage]): A list of state messages to resume the read.
            record_limit (Optional[int], optional): Maximum number of records to read. Defaults to None.

        Returns:
            StreamRead: An object containing the following attributes:
                - logs (List[str]): Log messages generated during the process.
                - slices (List[Any]): The data slices read from the stream.
                - test_read_limit_reached (bool): Indicates whether the record limit was reached.
                - auxiliary_requests (Any): Any auxiliary requests generated during reading.
                - inferred_schema (Any): The schema inferred from the stream data.
                - latest_config_update (Any): The latest configuration update, if applicable.
                - inferred_datetime_formats (Dict[str, str]): Mapping of fields to their inferred datetime formats.
        """

        record_limit = self._check_record_limit(record_limit)
        # The connector builder currently only supports reading from a single stream at a time
        stream = source.streams(config)[0]

        # get any deprecation warnings during the component creation
        deprecation_warnings: List[LogMessage] = source.deprecation_warnings()

        schema_inferrer = SchemaInferrer(
            self._pk_to_nested_and_composite_field(stream.primary_key),
            self._cursor_field_to_nested_and_composite_field(stream.cursor_field),
        )
        datetime_format_inferrer = DatetimeFormatInferrer()

        message_group = get_message_groups(
            self._read_stream(source, config, configured_catalog, state),
            schema_inferrer,
            datetime_format_inferrer,
            record_limit,
        )

        slices, log_messages, auxiliary_requests, latest_config_update = self._categorise_groups(
            message_group
        )

        # extend log messages with deprecation warnings
        log_messages.extend(deprecation_warnings)

        schema, log_messages = self._get_infered_schema(
            configured_catalog, schema_inferrer, log_messages
        )

        return StreamRead(
            logs=log_messages,
            slices=slices,
            test_read_limit_reached=self._has_reached_limit(slices),
            auxiliary_requests=auxiliary_requests,
            inferred_schema=schema,
            latest_config_update=self._get_latest_config_update(latest_config_update),
            inferred_datetime_formats=datetime_format_inferrer.get_inferred_datetime_formats(),
        )

    def _pk_to_nested_and_composite_field(
        self, field: Optional[Union[str, List[str], List[List[str]]]]
    ) -> List[List[str]]:
        """
        Converts a primary key definition into a nested list representation.

        The function accepts a primary key that can be a single string, a list of strings, or a list of lists of strings.
        It ensures that the return value is always a list of lists of strings.

        Parameters:
            field (Optional[Union[str, List[str], List[List[str]]]]):
                The primary key definition. This can be:
                  - None or an empty value: returns a list containing an empty list.
                  - A single string: returns a list containing one list with the string.
                  - A list of strings (composite key): returns a list where each key is encapsulated in its own list.
                  - A list of lists of strings (nested field structure): returns as is.

        Returns:
            List[List[str]]:
                A nested list representation of the primary key.
        """
        if not field:
            return [[]]

        if isinstance(field, str):
            return [[field]]

        is_composite_key = isinstance(field[0], str)
        if is_composite_key:
            return [[i] for i in field]  # type: ignore  # the type of field is expected to be List[str] here

        return field  # type: ignore  # the type of field is expected to be List[List[str]] here

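    # Illustrative behavior of _pk_to_nested_and_composite_field (examples added
    # for clarity; not part of the original source):
    #   None          -> [[]]
    #   "id"          -> [["id"]]
    #   ["a", "b"]    -> [["a"], ["b"]]    (composite key)
    #   [["a", "b"]]  -> [["a", "b"]]      (nested field path, returned as-is)
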
    def _cursor_field_to_nested_and_composite_field(
        self, field: Union[str, List[str]]
    ) -> List[List[str]]:
        """
        Transforms the cursor field input into a nested list format suitable for further processing.

        This function accepts a cursor field specification, which can be either:
            - A falsy value (e.g., None or an empty string), in which case it returns a list containing an empty list.
            - A string representing a simple cursor field. The string is wrapped in a nested list.
            - A list of strings representing a composite or nested cursor field. The list is returned wrapped in an outer list.

        Parameters:
            field (Union[str, List[str]]): The cursor field specification. It can be:
                - An empty or falsy value: returns [[]].
                - A string: returns [[field]].
                - A list of strings: returns [field] if the first element is a string.

        Returns:
            List[List[str]]: A nested list representation of the cursor field.

        Raises:
            ValueError: If the input is a list but its first element is not a string,
                        indicating an unsupported type for a cursor field.
        """
        if not field:
            return [[]]

        if isinstance(field, str):
            return [[field]]

        is_nested_key = isinstance(field[0], str)
        if is_nested_key:
            return [field]

        raise ValueError(f"Unknown type for cursor field `{field}`")

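    # Illustrative behavior of _cursor_field_to_nested_and_composite_field
    # (examples added for clarity; not part of the original source):
    #   None                 -> [[]]
    #   "updated_at"         -> [["updated_at"]]
    #   ["data", "updated"]  -> [["data", "updated"]]   (nested field path)
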
    def _check_record_limit(self, record_limit: Optional[int] = None) -> int:
        """
        Checks and adjusts the provided record limit to ensure it falls within the valid range.

        If record_limit is provided, it must be between 1 and self._max_record_limit inclusive.
        If record_limit is None, it defaults to self._max_record_limit.

        Args:
            record_limit (Optional[int]): The requested record limit to validate.

        Returns:
            int: The validated record limit: the given record_limit if provided, otherwise self._max_record_limit.

        Raises:
            ValueError: If record_limit is provided and is not between 1 and self._max_record_limit.
        """
        if record_limit is not None and not (1 <= record_limit <= self._max_record_limit):
            raise ValueError(
                f"Record limit must be between 1 and {self._max_record_limit}. Got {record_limit}"
            )

        if record_limit is None:
            record_limit = self._max_record_limit
        else:
            record_limit = min(record_limit, self._max_record_limit)

        return record_limit

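    # Illustrative behavior of _check_record_limit with max_record_limit=1000
    # (examples added for clarity; not part of the original source):
    #   None       -> 1000   (falls back to the maximum)
    #   250        -> 250
    #   0 or 1001  -> ValueError
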
    def _categorise_groups(self, message_groups: MESSAGE_GROUPS) -> GROUPED_MESSAGES:
        """
        Categorizes a sequence of message groups into slices, log messages, auxiliary requests, and the latest configuration update.

        This function iterates over each message group in the provided collection and processes it based on its type:
            - AirbyteLogMessage: Converts the log message into a LogMessage object and appends it to the log_messages list.
            - AirbyteTraceMessage with type ERROR: Extracts error details, creates a LogMessage at the "ERROR" level, and appends it.
            - AirbyteControlMessage: Updates the latest_config_update if the current message is more recent.
            - AuxiliaryRequest: Appends the message to the auxiliary_requests list.
            - StreamReadSlices: Appends the message to the slices list.
            - Any other type: Raises a ValueError indicating an unknown message group type.

        Parameters:
            message_groups (MESSAGE_GROUPS): A collection of message groups to be processed.

        Returns:
            GROUPED_MESSAGES: A tuple containing four elements:
                - slices: A list of StreamReadSlices objects.
                - log_messages: A list of LogMessage objects.
                - auxiliary_requests: A list of AuxiliaryRequest objects.
                - latest_config_update: The most recent AirbyteControlMessage, or None if none was processed.

        Raises:
            ValueError: If a message group of an unknown type is encountered.
        """

        slices = []
        log_messages = []
        auxiliary_requests = []
        latest_config_update: Optional[AirbyteControlMessage] = None

        # process the message groups first
        for message_group in message_groups:
            match message_group:
                case AirbyteLogMessage():
                    log_messages.append(
                        LogMessage(message=message_group.message, level=message_group.level.value)
                    )
                case AirbyteTraceMessage():
                    if message_group.type == TraceType.ERROR:
                        log_messages.append(
                            LogMessage(
                                message=message_group.error.message,  # type: ignore
                                level="ERROR",
                                internal_message=message_group.error.internal_message,  # type: ignore
                                stacktrace=message_group.error.stack_trace,  # type: ignore
                            )
                        )
                case AirbyteControlMessage():
                    if (
                        not latest_config_update
                        or latest_config_update.emitted_at <= message_group.emitted_at
                    ):
                        latest_config_update = message_group
                case AuxiliaryRequest():
                    auxiliary_requests.append(message_group)
                case StreamReadSlices():
                    slices.append(message_group)
                case _:
                    raise ValueError(f"Unknown message group type: {type(message_group)}")

        return slices, log_messages, auxiliary_requests, latest_config_update

    def _get_infered_schema(
        self,
        configured_catalog: ConfiguredAirbyteCatalog,
        schema_inferrer: SchemaInferrer,
        log_messages: List[LogMessage],
    ) -> INFERRED_SCHEMA_OUTPUT_TYPE:
        """
        Retrieves the inferred schema from the given configured catalog using the provided schema inferrer.

        This function processes a single stream from the configured catalog. It attempts to obtain the stream's
        schema via the schema inferrer. If a SchemaValidationException occurs, each validation error is logged in the
        provided log_messages list and the partially inferred schema (from the exception) is returned.

        Parameters:
            configured_catalog (ConfiguredAirbyteCatalog): The configured catalog that contains the stream metadata.
                It is assumed that only one stream is defined.
            schema_inferrer (SchemaInferrer): An instance responsible for inferring the schema for a given stream.
            log_messages (List[LogMessage]): A list that will be appended with log messages, especially error messages
                if schema validation issues arise.

        Returns:
            INFERRED_SCHEMA_OUTPUT_TYPE: A tuple consisting of the inferred schema and the updated list of log messages.
        """

        try:
            # The connector builder currently only supports reading from a single stream at a time
            configured_stream = configured_catalog.streams[0]
            schema = schema_inferrer.get_stream_schema(configured_stream.stream.name)
        except SchemaValidationException as exception:
            # we update the log_messages with possible errors
            for validation_error in exception.validation_errors:
                log_messages.append(LogMessage(validation_error, "ERROR"))
            schema = exception.schema

        return schema, log_messages

    def _get_latest_config_update(
        self,
        latest_config_update: AirbyteControlMessage | None,
    ) -> Dict[str, Any] | None:
        """
        Retrieves a cleaned configuration from the latest Airbyte control message.

        This helper function extracts the configuration from the given Airbyte control message, cleans it using the
        internal `clean_config` function, and returns the resulting dictionary. If no control message is provided
        (i.e., latest_config_update is None), the function returns None.

        Parameters:
            latest_config_update (AirbyteControlMessage | None): The control message containing the connector configuration. May be None.

        Returns:
            Dict[str, Any] | None: The cleaned configuration dictionary if available; otherwise, None.
        """

        return (
            clean_config(latest_config_update.connectorConfig.config)  # type: ignore
            if latest_config_update
            else None
        )

    def _read_stream(
        self,
        source: DeclarativeSource,
        config: Mapping[str, Any],
        configured_catalog: ConfiguredAirbyteCatalog,
        state: List[AirbyteStateMessage],
    ) -> Iterator[AirbyteMessage]:
        """
        Reads messages from the given DeclarativeSource using an AirbyteEntrypoint.

        This method attempts to yield messages from the source's read generator. If the generator
        raises an AirbyteTracedException, it checks whether the exception message indicates a non-actionable
        error (e.g., a final exception from AbstractSource that should not be logged). In that case, it stops
        processing without yielding the exception as a message. For other exceptions, the exception is caught,
        wrapped into an AirbyteTracedException, and yielded as an AirbyteMessage.

        Parameters:
            source (DeclarativeSource): The source object that provides data reading logic.
            config (Mapping[str, Any]): The configuration dictionary for the source.
            configured_catalog (ConfiguredAirbyteCatalog): The catalog defining the streams and their configurations.
            state (List[AirbyteStateMessage]): A list representing the current state for incremental sync.

        Yields:
            AirbyteMessage: Messages yielded from the source's generator. In case of exceptions,
            an AirbyteMessage encapsulating the error is yielded instead.
        """
        # The generator can raise an exception; iterate over the generated messages and,
        # if `next` raises, catch the exception and yield it as an AirbyteMessage.
        try:
            yield from AirbyteEntrypoint(source).read(
                source.spec(self.logger), config, configured_catalog, state
            )
        except AirbyteTracedException as traced_exception:
            # Look for this message which indicates that it is the "final exception" raised by AbstractSource.
            # If it matches, don't yield this as we don't need to show this in the Builder.
            # This is somewhat brittle as it relies on the message string, but if they drift then the worst case
            # is that this message will be shown in the Builder.
            if (
                traced_exception.message is not None
                and "During the sync, the following streams did not sync successfully"
                in traced_exception.message
            ):
                return
            yield traced_exception.as_airbyte_message()
        except Exception as e:
            error_message = f"{e.args[0] if len(e.args) > 0 else str(e)}"
            yield AirbyteTracedException.from_exception(
                e, message=error_message
            ).as_airbyte_message()

    def _has_reached_limit(self, slices: List[StreamReadSlices]) -> bool:
        """
        Determines whether the provided collection of slices has reached any defined limits.

        This function checks for three types of limits:
        1. If the number of slices is greater than or equal to a maximum slice limit.
        2. If any individual slice has a number of pages that meets or exceeds a maximum number of pages per slice.
        3. If the cumulative number of records across all pages in all slices reaches or exceeds a maximum record limit.

        Parameters:
            slices (List[StreamReadSlices]): A list where each element represents a slice containing one or more pages, and each page has a collection of records.

        Returns:
            bool: True if any of the following conditions is met:
                - The number of slices is at or above the maximum allowed slices.
                - Any slice contains pages at or above the maximum allowed per slice.
                - The total count of records reaches or exceeds the maximum record limit.
            False otherwise.
        """
        if len(slices) >= self._max_slices:
            return True

        record_count = 0

        for _slice in slices:
            if len(_slice.pages) >= self._max_pages_per_slice:
                return True
            for page in _slice.pages:
                record_count += len(page.records)
                if record_count >= self._max_record_limit:
                    return True
        return False

A utility class for performing test reads from a declarative data source, primarily used to validate connector configurations by performing partial stream reads.

Initialization:

TestReader(max_pages_per_slice: int, max_slices: int, max_record_limit: int = 1000)
    Initializes a new instance of the TestReader class with limits on pages per slice, slices, and records per read operation.

Public Methods:

run_test_read(source, config, configured_catalog, state, record_limit=None) -> StreamRead:

Executes a test read operation from the given declarative source. It sets up schema and datetime-format
inference, processes the read messages (including logging and error handling), and returns a StreamRead object
that contains slices of data, log messages, auxiliary requests, and any inferred schema or datetime formats.

Parameters:
    source (DeclarativeSource): The data source to read from.
    config (Mapping[str, Any]): Configuration parameters for the source.
    configured_catalog (ConfiguredAirbyteCatalog): Catalog containing stream configuration.
    state (List[AirbyteStateMessage]): Current state information for the read.
    record_limit (Optional[int]): Optional override for the maximum number of records to read.

Returns:
    StreamRead: An object encapsulating logs, data slices, auxiliary requests, and inferred metadata,
    along with an indicator of whether any configured limit was reached.
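
A minimal usage sketch (illustrative only). The source, config, and configured_catalog values are placeholders a caller is assumed to have built elsewhere (e.g. from a connector manifest); only the TestReader import and its limit parameters come from this module:

    from airbyte_cdk.connector_builder.test_reader import TestReader

    reader = TestReader(max_pages_per_slice=5, max_slices=5, max_record_limit=1000)

    stream_read = reader.run_test_read(
        source=source,                          # placeholder: a DeclarativeSource instance
        config=config,                          # placeholder: Mapping[str, Any] connector config
        configured_catalog=configured_catalog,  # placeholder: single-stream catalog
        state=[],                               # no prior state for this test read
        record_limit=100,                       # optional override, within 1..max_record_limit
    )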
TestReader(max_pages_per_slice: int, max_slices: int, max_record_limit: int = 1000)
logger = <Logger airbyte.connector-builder (INFO)>
def run_test_read(
    self,
    source: airbyte_cdk.sources.declarative.declarative_source.DeclarativeSource,
    config: Mapping[str, Any],
    configured_catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog,
    state: List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage],
    record_limit: Optional[int] = None,
) -> airbyte_cdk.connector_builder.models.StreamRead:

Run a test read for the connector by reading from a single stream and inferring schema and datetime formats.

Arguments:
  • source (DeclarativeSource): The source instance providing the streams.
  • config (Mapping[str, Any]): The configuration settings to use for reading.
  • configured_catalog (ConfiguredAirbyteCatalog): The catalog specifying the stream configuration.
  • state (List[AirbyteStateMessage]): A list of state messages to resume the read.
  • record_limit (Optional[int], optional): Maximum number of records to read. Defaults to None.
Returns:

  StreamRead: An object containing the following attributes:
  • logs (List[str]): Log messages generated during the process.
  • slices (List[Any]): The data slices read from the stream.
  • test_read_limit_reached (bool): Indicates whether the record limit was reached.
  • auxiliary_requests (Any): Any auxiliary requests generated during reading.
  • inferred_schema (Any): The schema inferred from the stream data.
  • latest_config_update (Any): The latest configuration update, if applicable.
  • inferred_datetime_formats (Dict[str, str]): Mapping of fields to their inferred datetime formats.
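
A sketch of inspecting the returned StreamRead (attribute names follow the docstring above; the slices -> pages -> records nesting matches _has_reached_limit in the source):

    for stream_slice in stream_read.slices:
        for page in stream_slice.pages:
            for record in page.records:
                print(record)

    if stream_read.test_read_limit_reached:
        print("Read stopped early: a slice, page, or record limit was hit.")

    print(stream_read.inferred_schema)
    print(stream_read.inferred_datetime_formats)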