airbyte_cdk.connector_builder.test_reader

1#
2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3#
4
5from .reader import TestReader
6
7__all__ = ("TestReader",)
class TestReader:
 40class TestReader:
 41    """
 42    A utility class for performing test reads from a declarative data source, primarily used to validate
 43    connector configurations by performing partial stream reads.
 44
 45    Initialization:
 46
 47        TestReader(max_pages_per_slice: int, max_slices: int, max_record_limit: int = 1000)
 48            Initializes a new instance of the TestReader class with limits on pages per slice, slices, and records
 49            per read operation.
 50
 51    Public Methods:
 52        run_test_read(source, config, configured_catalog, state, record_limit=None) -> StreamRead:
 53
 54            Executes a test read operation from the given declarative source. It configures and infers the schema,
 55            processes the read messages (including logging and error handling), and returns a StreamRead object
 56            that contains slices of data, log messages, auxiliary requests, and any inferred schema or datetime formats.
 57
 58            Parameters:
 59                source (DeclarativeSource): The data source to read from.
 60                config (Mapping[str, Any]): Configuration parameters for the source.
 61                configured_catalog (ConfiguredAirbyteCatalog): Catalog containing stream configuration.
 62                state (List[AirbyteStateMessage]): Current state information for the read.
 63                record_limit (Optional[int]): Optional override for the maximum number of records to read.
 64
 65            Returns:
 66                StreamRead: An object encapsulating logs, data slices, auxiliary requests, and inferred metadata,
 67                along with indicators if any configured limit was reached.
 68
 69    """
 70
 71    __test__: ClassVar[bool] = False  # Tell Pytest this is not a Pytest class, despite its name
 72
 73    logger = logging.getLogger("airbyte.connector-builder")
 74
 75    def __init__(
 76        self,
 77        max_pages_per_slice: int,
 78        max_slices: int,
 79        max_record_limit: int = 1000,
 80    ) -> None:
 81        self._max_pages_per_slice = max_pages_per_slice
 82        self._max_slices = max_slices
 83        self._max_record_limit = max_record_limit
 84
 85    def run_test_read(
 86        self,
 87        source: DeclarativeSource,
 88        config: Mapping[str, Any],
 89        configured_catalog: ConfiguredAirbyteCatalog,
 90        stream_name: str,
 91        state: List[AirbyteStateMessage],
 92        record_limit: Optional[int] = None,
 93    ) -> StreamRead:
 94        """
 95        Run a test read for the connector by reading from a single stream and inferring schema and datetime formats.
 96
 97        Parameters:
 98            source (DeclarativeSource): The source instance providing the streams.
 99            config (Mapping[str, Any]): The configuration settings to use for reading.
100            configured_catalog (ConfiguredAirbyteCatalog): The catalog specifying the stream configuration.
101            state (List[AirbyteStateMessage]): A list of state messages to resume the read.
102            record_limit (Optional[int], optional): Maximum number of records to read. Defaults to None.
103
104        Returns:
105            StreamRead: An object containing the following attributes:
106                - logs (List[str]): Log messages generated during the process.
107                - slices (List[Any]): The data slices read from the stream.
108                - test_read_limit_reached (bool): Indicates whether the record limit was reached.
109                - auxiliary_requests (Any): Any auxiliary requests generated during reading.
110                - inferred_schema (Any): The schema inferred from the stream data.
111                - latest_config_update (Any): The latest configuration update, if applicable.
112                - inferred_datetime_formats (Dict[str, str]): Mapping of fields to their inferred datetime formats.
113        """
114
115        record_limit = self._check_record_limit(record_limit)
116        # The connector builder currently only supports reading from a single stream at a time
117        streams = source.streams(config)
118        stream = next((stream for stream in streams if stream.name == stream_name), None)
119
120        # get any deprecation warnings during the component creation
121        deprecation_warnings: List[LogMessage] = source.deprecation_warnings()
122
123        schema_inferrer = SchemaInferrer(
124            self._pk_to_nested_and_composite_field(stream.primary_key) if stream else None,
125            self._cursor_field_to_nested_and_composite_field(stream.cursor_field)
126            if stream
127            else None,
128        )
129        datetime_format_inferrer = DatetimeFormatInferrer()
130
131        message_group = get_message_groups(
132            self._read_stream(source, config, configured_catalog, state),
133            schema_inferrer,
134            datetime_format_inferrer,
135            record_limit,
136            stream_name,
137        )
138
139        slices, log_messages, auxiliary_requests, latest_config_update = self._categorise_groups(
140            message_group
141        )
142
143        # extend log messages with deprecation warnings
144        log_messages.extend(deprecation_warnings)
145
146        schema, log_messages = self._get_infered_schema(
147            configured_catalog, schema_inferrer, log_messages
148        )
149
150        return StreamRead(
151            logs=log_messages,
152            slices=slices,
153            test_read_limit_reached=self._has_reached_limit(slices),
154            auxiliary_requests=auxiliary_requests,
155            inferred_schema=schema,
156            latest_config_update=self._get_latest_config_update(latest_config_update),
157            inferred_datetime_formats=datetime_format_inferrer.get_inferred_datetime_formats(),
158        )
159
160    def _pk_to_nested_and_composite_field(
161        self, field: Optional[Union[str, List[str], List[List[str]]]]
162    ) -> List[List[str]]:
163        """
164        Converts a primary key definition into a nested list representation.
165
166        The function accepts a primary key that can be a single string, a list of strings, or a list of lists of strings.
167        It ensures that the return value is always a list of lists of strings.
168
169        Parameters:
170            field (Optional[Union[str, List[str], List[List[str]]]]):
171                The primary key definition. This can be:
172                  - None or an empty value: returns a list containing an empty list.
173                  - A single string: returns a list containing one list with the string.
174                  - A list of strings (composite key): returns a list where each key is encapsulated in its own list.
175                  - A list of lists of strings (nested field structure): returns as is.
176
177        Returns:
178            List[List[str]]:
179                A nested list representation of the primary key.
180        """
181        if not field:
182            return [[]]
183
184        if isinstance(field, str):
185            return [[field]]
186
187        is_composite_key = isinstance(field[0], str)
188        if is_composite_key:
189            return [[i] for i in field]  # type: ignore  # the type of field is expected to be List[str] here
190
191        return field  # type: ignore  # the type of field is expected to be List[List[str]] here
192
193    def _cursor_field_to_nested_and_composite_field(
194        self, field: Union[str, List[str]]
195    ) -> List[List[str]]:
196        """
197        Transforms the cursor field input into a nested list format suitable for further processing.
198
199        This function accepts a cursor field specification, which can be either:
200            - A falsy value (e.g., None or an empty string), in which case it returns a list containing an empty list.
201            - A string representing a simple cursor field. The string is wrapped in a nested list.
202            - A list of strings representing a composite or nested cursor field. The list is returned wrapped in an outer list.
203
204        Parameters:
205            field (Union[str, List[str]]): The cursor field specification. It can be:
206                - An empty or falsy value: returns [[]].
207                - A string: returns [[field]].
208                - A list of strings: returns [field] if the first element is a string.
209
210        Returns:
211            List[List[str]]: A nested list representation of the cursor field.
212
213        Raises:
214            ValueError: If the input is a list but its first element is not a string,
215                        indicating an unsupported type for a cursor field.
216        """
217        if not field:
218            return [[]]
219
220        if isinstance(field, str):
221            return [[field]]
222
223        is_nested_key = isinstance(field[0], str)
224        if is_nested_key:
225            return [field]
226
227        raise ValueError(f"Unknown type for cursor field `{field}")
228
229    def _check_record_limit(self, record_limit: Optional[int] = None) -> int:
230        """
231        Checks and adjusts the provided record limit to ensure it falls within the valid range.
232
233        If record_limit is provided, it must be between 1 and self._max_record_limit inclusive.
234        If record_limit is None, it defaults to self._max_record_limit.
235
236        Args:
237            record_limit (Optional[int]): The requested record limit to validate.
238
239        Returns:
240            int: The validated record limit. If record_limit exceeds self._max_record_limit, the maximum allowed value is used.
241
242        Raises:
243            ValueError: If record_limit is provided and is not between 1 and self._max_record_limit.
244        """
245        if record_limit is not None and not (1 <= record_limit <= self._max_record_limit):
246            raise ValueError(
247                f"Record limit must be between 1 and {self._max_record_limit}. Got {record_limit}"
248            )
249
250        if record_limit is None:
251            record_limit = self._max_record_limit
252        else:
253            record_limit = min(record_limit, self._max_record_limit)
254
255        return record_limit
256
257    def _categorise_groups(self, message_groups: MESSAGE_GROUPS) -> GROUPED_MESSAGES:
258        """
259        Categorizes a sequence of message groups into slices, log messages, auxiliary requests, and the latest configuration update.
260
261        This function iterates over each message group in the provided collection and processes it based on its type:
262            - AirbyteLogMessage: Converts the log message into a LogMessage object and appends it to the log_messages list.
263            - AirbyteTraceMessage with type ERROR: Extracts error details, creates a LogMessage at the "ERROR" level, and appends it.
264            - AirbyteControlMessage: Updates the latest_config_update if the current message is more recent.
265            - AuxiliaryRequest: Appends the message to the auxiliary_requests list.
266            - StreamReadSlices: Appends the message to the slices list.
267            - Any other type: Raises a ValueError indicating an unknown message group type.
268
269        Parameters:
270            message_groups (MESSAGE_GROUPS): A collection of message groups to be processed.
271
272        Returns:
273            GROUPED_MESSAGES: A tuple containing four elements:
274                - slices: A list of StreamReadSlices objects.
275                - log_messages: A list of LogMessage objects.
276                - auxiliary_requests: A list of AuxiliaryRequest objects.
277                - latest_config_update: The most recent AirbyteControlMessage, or None if none was processed.
278
279        Raises:
280            ValueError: If a message group of an unknown type is encountered.
281        """
282
283        slices = []
284        log_messages = []
285        auxiliary_requests = []
286        latest_config_update: Optional[AirbyteControlMessage] = None
287
288        # process the message groups first
289        for message_group in message_groups:
290            match message_group:
291                case AirbyteLogMessage():
292                    log_messages.append(
293                        LogMessage(message=message_group.message, level=message_group.level.value)
294                    )
295                case AirbyteTraceMessage():
296                    if message_group.type == TraceType.ERROR:
297                        log_messages.append(
298                            LogMessage(
299                                message=message_group.error.message,  # type: ignore
300                                level="ERROR",
301                                internal_message=message_group.error.internal_message,  # type: ignore
302                                stacktrace=message_group.error.stack_trace,  # type: ignore
303                            )
304                        )
305                case AirbyteControlMessage():
306                    if (
307                        not latest_config_update
308                        or latest_config_update.emitted_at <= message_group.emitted_at
309                    ):
310                        latest_config_update = message_group
311                case AuxiliaryRequest():
312                    auxiliary_requests.append(message_group)
313                case StreamReadSlices():
314                    slices.append(message_group)
315                case _:
316                    raise ValueError(f"Unknown message group type: {type(message_group)}")
317
318        return slices, log_messages, auxiliary_requests, latest_config_update
319
320    def _get_infered_schema(
321        self,
322        configured_catalog: ConfiguredAirbyteCatalog,
323        schema_inferrer: SchemaInferrer,
324        log_messages: List[LogMessage],
325    ) -> INFERRED_SCHEMA_OUTPUT_TYPE:
326        """
327        Retrieves the inferred schema from the given configured catalog using the provided schema inferrer.
328
329        This function processes a single stream from the configured catalog. It attempts to obtain the stream's
330        schema via the schema inferrer. If a SchemaValidationException occurs, each validation error is logged in the
331        provided log_messages list and the partially inferred schema (from the exception) is returned.
332
333        Parameters:
334            configured_catalog (ConfiguredAirbyteCatalog): The configured catalog that contains the stream metadata.
335                It is assumed that only one stream is defined.
336            schema_inferrer (SchemaInferrer): An instance responsible for inferring the schema for a given stream.
337            log_messages (List[LogMessage]): A list that will be appended with log messages, especially error messages
338                if schema validation issues arise.
339
340        Returns:
341            INFERRED_SCHEMA_OUTPUT_TYPE: A tuple consisting of the inferred schema and the updated list of log messages.
342        """
343
344        try:
345            # The connector builder currently only supports reading from a single stream at a time
346            configured_stream = configured_catalog.streams[0]
347            schema = schema_inferrer.get_stream_schema(configured_stream.stream.name)
348        except SchemaValidationException as exception:
349            # we update the log_messages with possible errors
350            for validation_error in exception.validation_errors:
351                log_messages.append(LogMessage(validation_error, "ERROR"))
352            schema = exception.schema
353
354        return schema, log_messages
355
356    def _get_latest_config_update(
357        self,
358        latest_config_update: AirbyteControlMessage | None,
359    ) -> Dict[str, Any] | None:
360        """
361        Retrieves a cleaned configuration from the latest Airbyte control message.
362
363        This helper function extracts the configuration from the given Airbyte control message, cleans it using the internal `Parsers.clean_config` function,
364        and returns the resulting dictionary. If no control message is provided (i.e., latest_config_update is None), the function returns None.
365
366        Parameters:
367            latest_config_update (AirbyteControlMessage | None): The control message containing the connector configuration. May be None.
368
369        Returns:
370            Dict[str, Any] | None: The cleaned configuration dictionary if available; otherwise, None.
371        """
372
373        return (
374            clean_config(latest_config_update.connectorConfig.config)  # type: ignore
375            if latest_config_update
376            else None
377        )
378
379    def _read_stream(
380        self,
381        source: DeclarativeSource,
382        config: Mapping[str, Any],
383        configured_catalog: ConfiguredAirbyteCatalog,
384        state: List[AirbyteStateMessage],
385    ) -> Iterator[AirbyteMessage]:
386        """
387        Reads messages from the given DeclarativeSource using an AirbyteEntrypoint.
388
389        This method attempts to yield messages from the source's read generator. If the generator
390        raises an AirbyteTracedException, it checks whether the exception message indicates a non-actionable
391        error (e.g., a final exception from AbstractSource that should not be logged). In that case, it stops
392        processing without yielding the exception as a message. For other exceptions, the exception is caught,
393        wrapped into an AirbyteTracedException, and yielded as an AirbyteMessage.
394
395        Parameters:
396            source (DeclarativeSource): The source object that provides data reading logic.
397            config (Mapping[str, Any]): The configuration dictionary for the source.
398            configured_catalog (ConfiguredAirbyteCatalog): The catalog defining the streams and their configurations.
399            state (List[AirbyteStateMessage]): A list representing the current state for incremental sync.
400
401        Yields:
402            AirbyteMessage: Messages yielded from the source's generator. In case of exceptions,
403            an AirbyteMessage encapsulating the error is yielded instead.
404        """
405        # the generator can raise an exception
406        # iterate over the generated messages. if next raise an exception, catch it and yield it as an AirbyteLogMessage
407        try:
408            yield from AirbyteEntrypoint(source).read(
409                source.spec(self.logger), config, configured_catalog, state
410            )
411        except AirbyteTracedException as traced_exception:
412            # Look for this message which indicates that it is the "final exception" raised by AbstractSource.
413            # If it matches, don't yield this as we don't need to show this in the Builder.
414            # This is somewhat brittle as it relies on the message string, but if they drift then the worst case
415            # is that this message will be shown in the Builder.
416            if (
417                traced_exception.message is not None
418                and "During the sync, the following streams did not sync successfully"
419                in traced_exception.message
420            ):
421                return
422            yield traced_exception.as_airbyte_message()
423        except Exception as e:
424            error_message = f"{e.args[0] if len(e.args) > 0 else str(e)}"
425            yield AirbyteTracedException.from_exception(
426                e, message=error_message
427            ).as_airbyte_message()
428
429    def _has_reached_limit(self, slices: List[StreamReadSlices]) -> bool:
430        """
431        Determines whether the provided collection of slices has reached any defined limits.
432
433        This function checks for three types of limits:
434        1. If the number of slices is greater than or equal to a maximum slice limit.
435        2. If any individual slice has a number of pages that meets or exceeds a maximum number of pages per slice.
436        3. If the cumulative number of records across all pages in all slices reaches or exceeds a maximum record limit.
437
438        Parameters:
439            slices (List[StreamReadSlices]): A list where each element represents a slice containing one or more pages, and each page has a collection of records.
440
441        Returns:
442            bool: True if any of the following conditions is met:
443                - The number of slices is at or above the maximum allowed slices.
444                - Any slice contains pages at or above the maximum allowed per slice.
445                - The total count of records reaches or exceeds the maximum record limit.
446            False otherwise.
447        """
448        if len(slices) >= self._max_slices:
449            return True
450
451        record_count = 0
452
453        for _slice in slices:
454            if len(_slice.pages) >= self._max_pages_per_slice:
455                return True
456            for page in _slice.pages:
457                record_count += len(page.records)
458                if record_count >= self._max_record_limit:
459                    return True
460        return False

A utility class for performing test reads from a declarative data source, primarily used to validate connector configurations by performing partial stream reads.

Initialization:

TestReader(max_pages_per_slice: int, max_slices: int, max_record_limit: int = 1000) Initializes a new instance of the TestReader class with limits on pages per slice, slices, and records per read operation.

Public Methods:

run_test_read(source, config, configured_catalog, stream_name, state, record_limit=None) -> StreamRead:

Executes a test read operation from the given declarative source. It configures and infers the schema,
processes the read messages (including logging and error handling), and returns a StreamRead object
that contains slices of data, log messages, auxiliary requests, and any inferred schema or datetime formats.

Parameters:
    source (DeclarativeSource): The data source to read from.
    config (Mapping[str, Any]): Configuration parameters for the source.
    configured_catalog (ConfiguredAirbyteCatalog): Catalog containing stream configuration.
    stream_name (str): Name of the stream the test read targets.
    state (List[AirbyteStateMessage]): Current state information for the read.
    record_limit (Optional[int]): Optional override for the maximum number of records to read.

Returns:
    StreamRead: An object encapsulating logs, data slices, auxiliary requests, and inferred metadata,
    along with indicators if any configured limit was reached.
TestReader( max_pages_per_slice: int, max_slices: int, max_record_limit: int = 1000)
75    def __init__(
76        self,
77        max_pages_per_slice: int,
78        max_slices: int,
79        max_record_limit: int = 1000,
80    ) -> None:
81        self._max_pages_per_slice = max_pages_per_slice
82        self._max_slices = max_slices
83        self._max_record_limit = max_record_limit
logger = <Logger airbyte.connector-builder (INFO)>
def run_test_read( self, source: airbyte_cdk.sources.declarative.declarative_source.DeclarativeSource, config: Mapping[str, Any], configured_catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog, stream_name: str, state: List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage], record_limit: Optional[int] = None) -> airbyte_cdk.connector_builder.models.StreamRead:
 85    def run_test_read(
 86        self,
 87        source: DeclarativeSource,
 88        config: Mapping[str, Any],
 89        configured_catalog: ConfiguredAirbyteCatalog,
 90        stream_name: str,
 91        state: List[AirbyteStateMessage],
 92        record_limit: Optional[int] = None,
 93    ) -> StreamRead:
 94        """
 95        Run a test read for the connector by reading from a single stream and inferring schema and datetime formats.
 96
 97        Parameters:
 98            source (DeclarativeSource): The source instance providing the streams.
 99            config (Mapping[str, Any]): The configuration settings to use for reading.
100            configured_catalog (ConfiguredAirbyteCatalog): The catalog specifying the stream configuration.
101            state (List[AirbyteStateMessage]): A list of state messages to resume the read.
102            record_limit (Optional[int], optional): Maximum number of records to read. Defaults to None.
103
104        Returns:
105            StreamRead: An object containing the following attributes:
106                - logs (List[str]): Log messages generated during the process.
107                - slices (List[Any]): The data slices read from the stream.
108                - test_read_limit_reached (bool): Indicates whether the record limit was reached.
109                - auxiliary_requests (Any): Any auxiliary requests generated during reading.
110                - inferred_schema (Any): The schema inferred from the stream data.
111                - latest_config_update (Any): The latest configuration update, if applicable.
112                - inferred_datetime_formats (Dict[str, str]): Mapping of fields to their inferred datetime formats.
113        """
114
115        record_limit = self._check_record_limit(record_limit)
116        # The connector builder currently only supports reading from a single stream at a time
117        streams = source.streams(config)
118        stream = next((stream for stream in streams if stream.name == stream_name), None)
119
120        # get any deprecation warnings during the component creation
121        deprecation_warnings: List[LogMessage] = source.deprecation_warnings()
122
123        schema_inferrer = SchemaInferrer(
124            self._pk_to_nested_and_composite_field(stream.primary_key) if stream else None,
125            self._cursor_field_to_nested_and_composite_field(stream.cursor_field)
126            if stream
127            else None,
128        )
129        datetime_format_inferrer = DatetimeFormatInferrer()
130
131        message_group = get_message_groups(
132            self._read_stream(source, config, configured_catalog, state),
133            schema_inferrer,
134            datetime_format_inferrer,
135            record_limit,
136            stream_name,
137        )
138
139        slices, log_messages, auxiliary_requests, latest_config_update = self._categorise_groups(
140            message_group
141        )
142
143        # extend log messages with deprecation warnings
144        log_messages.extend(deprecation_warnings)
145
146        schema, log_messages = self._get_infered_schema(
147            configured_catalog, schema_inferrer, log_messages
148        )
149
150        return StreamRead(
151            logs=log_messages,
152            slices=slices,
153            test_read_limit_reached=self._has_reached_limit(slices),
154            auxiliary_requests=auxiliary_requests,
155            inferred_schema=schema,
156            latest_config_update=self._get_latest_config_update(latest_config_update),
157            inferred_datetime_formats=datetime_format_inferrer.get_inferred_datetime_formats(),
158        )

Run a test read for the connector by reading from a single stream and inferring schema and datetime formats.

Arguments:
  • source (DeclarativeSource): The source instance providing the streams.
  • config (Mapping[str, Any]): The configuration settings to use for reading.
  • configured_catalog (ConfiguredAirbyteCatalog): The catalog specifying the stream configuration.
  • stream_name (str): The name of the stream to read from.
  • state (List[AirbyteStateMessage]): A list of state messages to resume the read.
  • record_limit (Optional[int], optional): Maximum number of records to read. Defaults to None.
Returns:

StreamRead: An object containing the following attributes:
  • logs (List[LogMessage]): Log messages generated during the process.
  • slices (List[StreamReadSlices]): The data slices read from the stream.
  • test_read_limit_reached (bool): Indicates whether the slice, page-per-slice or record limit was reached.
  • auxiliary_requests (Any): Any auxiliary requests generated during reading.
  • inferred_schema (Any): The schema inferred from the stream data.
  • latest_config_update (Any): The latest configuration update, if applicable.
  • inferred_datetime_formats (Dict[str, str]): Mapping of fields to their inferred datetime formats.