airbyte_cdk.connector_builder.test_reader

#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from .reader import TestReader

__all__ = ("TestReader",)
class TestReader:
 43    """
 44    A utility class for performing test reads from a declarative data source, primarily used to validate
 45    connector configurations by performing partial stream reads.
 46
 47    Initialization:
 48
 49        TestReader(max_pages_per_slice: int, max_slices: int, max_record_limit: int = 1000)
 50            Initializes a new instance of the TestReader class with limits on pages per slice, slices, and records
 51            per read operation.
 52
 53    Public Methods:
 54        run_test_read(source, config, configured_catalog, state, record_limit=None) -> StreamRead:
 55
 56            Executes a test read operation from the given declarative source. It configures and infers the schema,
 57            processes the read messages (including logging and error handling), and returns a StreamRead object
 58            that contains slices of data, log messages, auxiliary requests, and any inferred schema or datetime formats.
 59
 60            Parameters:
 61                source (ConcurrentDeclarativeSource): The data source to read from.
 62                config (Mapping[str, Any]): Configuration parameters for the source.
 63                configured_catalog (ConfiguredAirbyteCatalog): Catalog containing stream configuration.
 64                state (List[AirbyteStateMessage]): Current state information for the read.
 65                record_limit (Optional[int]): Optional override for the maximum number of records to read.
 66
 67            Returns:
 68                StreamRead: An object encapsulating logs, data slices, auxiliary requests, and inferred metadata,
 69                along with indicators if any configured limit was reached.
 70
 71    """

    __test__: ClassVar[bool] = False  # Tell Pytest this is not a Pytest class, despite its name

    logger = logging.getLogger("airbyte.connector-builder")

    def __init__(
        self,
        max_pages_per_slice: int,
        max_slices: int,
        max_record_limit: int = 1000,
    ) -> None:
        self._max_pages_per_slice = max_pages_per_slice
        self._max_slices = max_slices
        self._max_record_limit = max_record_limit

    def run_test_read(
        self,
        source: ConcurrentDeclarativeSource,
        config: Mapping[str, Any],
        configured_catalog: ConfiguredAirbyteCatalog,
        stream_name: str,
        state: List[AirbyteStateMessage],
        record_limit: Optional[int] = None,
    ) -> StreamRead:
        """
        Run a test read for the connector by reading from a single stream and inferring schema and datetime formats.

        Parameters:
            source (ConcurrentDeclarativeSource): The source instance providing the streams.
            config (Mapping[str, Any]): The configuration settings to use for reading.
            configured_catalog (ConfiguredAirbyteCatalog): The catalog specifying the stream configuration.
            stream_name (str): The name of the stream to read from.
            state (List[AirbyteStateMessage]): A list of state messages to resume the read.
            record_limit (Optional[int], optional): Maximum number of records to read. Defaults to None.

        Returns:
            StreamRead: An object containing the following attributes:
                - logs (List[str]): Log messages generated during the process.
                - slices (List[Any]): The data slices read from the stream.
                - test_read_limit_reached (bool): Indicates whether any configured read limit was reached.
                - auxiliary_requests (Any): Any auxiliary requests generated during reading.
                - inferred_schema (Any): The schema inferred from the stream data.
                - latest_config_update (Any): The latest configuration update, if applicable.
                - inferred_datetime_formats (Dict[str, str]): Mapping of fields to their inferred datetime formats.
        """

        record_limit = self._check_record_limit(record_limit)
        # The connector builder currently only supports reading from a single stream at a time
        streams = source.streams(config)
        stream = next((stream for stream in streams if stream.name == stream_name), None)

        # get any deprecation warnings during the component creation
        deprecation_warnings: List[LogMessage] = source.deprecation_warnings()

        schema_inferrer = SchemaInferrer(
            self._pk_to_nested_and_composite_field(
                stream.primary_key if hasattr(stream, "primary_key") else stream._primary_key  # type: ignore  # We are accessing the private property here as the primary key is not exposed. We should either expose it or use `as_airbyte_stream` to retrieve it as this is the "official" way where it is exposed in the Airbyte protocol
            )
            if stream
            else None,
            self._cursor_field_to_nested_and_composite_field(stream.cursor_field)
            if stream and stream.cursor_field
            else None,
        )
        datetime_format_inferrer = DatetimeFormatInferrer()

        message_group = get_message_groups(
            self._read_stream(source, config, configured_catalog, state),
            schema_inferrer,
            datetime_format_inferrer,
            record_limit,
            stream_name,
        )

        slices, log_messages, auxiliary_requests, latest_config_update = self._categorise_groups(
            message_group
        )

        # extend log messages with deprecation warnings
        log_messages.extend(deprecation_warnings)

        schema, log_messages = self._get_infered_schema(
            configured_catalog, schema_inferrer, log_messages
        )

        return StreamRead(
            logs=log_messages,
            slices=slices,
            test_read_limit_reached=self._has_reached_limit(slices),
            auxiliary_requests=auxiliary_requests,
            inferred_schema=schema,
            latest_config_update=self._get_latest_config_update(latest_config_update),
            inferred_datetime_formats=datetime_format_inferrer.get_inferred_datetime_formats(),
        )

    def _pk_to_nested_and_composite_field(
        self, field: Optional[Union[str, List[str], List[List[str]]]]
    ) -> List[List[str]]:
        """
        Converts a primary key definition into a nested list representation.

        The function accepts a primary key that can be a single string, a list of strings, or a list of lists of strings.
        It ensures that the return value is always a list of lists of strings.

        Parameters:
            field (Optional[Union[str, List[str], List[List[str]]]]):
                The primary key definition. This can be:
                  - None or an empty value: returns a list containing an empty list.
                  - A single string: returns a list containing one list with the string.
                  - A list of strings (composite key): returns a list where each key is encapsulated in its own list.
                  - A list of lists of strings (nested field structure): returns as is.

        Returns:
            List[List[str]]:
                A nested list representation of the primary key.
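
        Example (illustrative doctest; `reader` is assumed to be a TestReader instance):
            >>> reader._pk_to_nested_and_composite_field(None)
            [[]]
            >>> reader._pk_to_nested_and_composite_field("id")
            [['id']]
            >>> reader._pk_to_nested_and_composite_field(["tenant_id", "id"])
            [['tenant_id'], ['id']]
            >>> reader._pk_to_nested_and_composite_field([["data", "id"]])
            [['data', 'id']]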
        """
        if not field:
            return [[]]

        if isinstance(field, str):
            return [[field]]

        is_composite_key = isinstance(field[0], str)
        if is_composite_key:
            return [[i] for i in field]  # type: ignore  # the type of field is expected to be List[str] here

        return field  # type: ignore  # the type of field is expected to be List[List[str]] here

    def _cursor_field_to_nested_and_composite_field(
        self, field: Union[str, List[str]]
    ) -> List[List[str]]:
        """
        Transforms the cursor field input into a nested list format suitable for further processing.

        This function accepts a cursor field specification, which can be either:
            - A falsy value (e.g., None or an empty string), in which case it returns a list containing an empty list.
            - A string representing a simple cursor field. The string is wrapped in a nested list.
            - A list of strings representing a composite or nested cursor field. The list is returned wrapped in an outer list.

        Parameters:
            field (Union[str, List[str]]): The cursor field specification. It can be:
                - An empty or falsy value: returns [[]].
                - A string: returns [[field]].
                - A list of strings: returns [field] if the first element is a string.

        Returns:
            List[List[str]]: A nested list representation of the cursor field.

        Raises:
            ValueError: If the input is a list but its first element is not a string,
                        indicating an unsupported type for a cursor field.
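
        Example (illustrative doctest; `reader` is assumed to be a TestReader instance):
            >>> reader._cursor_field_to_nested_and_composite_field("updated_at")
            [['updated_at']]
            >>> reader._cursor_field_to_nested_and_composite_field(["data", "updated_at"])
            [['data', 'updated_at']]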
        """
        if not field:
            return [[]]

        if isinstance(field, str):
            return [[field]]

        is_nested_key = isinstance(field[0], str)
        if is_nested_key:
            return [field]

        raise ValueError(f"Unknown type for cursor field `{field}`")

    def _check_record_limit(self, record_limit: Optional[int] = None) -> int:
        """
        Validates the provided record limit, ensuring it falls within the valid range.

        If record_limit is provided, it must be between 1 and self._max_record_limit inclusive.
        If record_limit is None, it defaults to self._max_record_limit.

        Args:
            record_limit (Optional[int]): The requested record limit to validate.

        Returns:
            int: The validated record limit: the provided record_limit if given, otherwise self._max_record_limit.

        Raises:
            ValueError: If record_limit is provided and is not between 1 and self._max_record_limit.
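
        Example (illustrative doctest; assumes a reader constructed with max_record_limit=1000):
            >>> reader._check_record_limit(None)
            1000
            >>> reader._check_record_limit(50)
            50
            >>> reader._check_record_limit(5000)
            Traceback (most recent call last):
              ...
            ValueError: Record limit must be between 1 and 1000. Got 5000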
        """
        if record_limit is not None and not (1 <= record_limit <= self._max_record_limit):
            raise ValueError(
                f"Record limit must be between 1 and {self._max_record_limit}. Got {record_limit}"
            )

        if record_limit is None:
            record_limit = self._max_record_limit
        else:
            record_limit = min(record_limit, self._max_record_limit)

        return record_limit

    def _categorise_groups(self, message_groups: MESSAGE_GROUPS) -> GROUPED_MESSAGES:
        """
        Categorizes a sequence of message groups into slices, log messages, auxiliary requests, and the latest configuration update.

        This function iterates over each message group in the provided collection and processes it based on its type:
            - AirbyteLogMessage: Converts the log message into a LogMessage object and appends it to the log_messages list.
            - AirbyteTraceMessage with type ERROR: Extracts error details, creates a LogMessage at the "ERROR" level, and appends it.
            - AirbyteControlMessage: Updates the latest_config_update if the current message is more recent.
            - AuxiliaryRequest: Appends the message to the auxiliary_requests list.
            - StreamReadSlices: Appends the message to the slices list.
            - Any other type: Raises a ValueError indicating an unknown message group type.

        Parameters:
            message_groups (MESSAGE_GROUPS): A collection of message groups to be processed.

        Returns:
            GROUPED_MESSAGES: A tuple containing four elements:
                - slices: A list of StreamReadSlices objects.
                - log_messages: A list of LogMessage objects.
                - auxiliary_requests: A list of AuxiliaryRequest objects.
                - latest_config_update: The most recent AirbyteControlMessage, or None if none was processed.

        Raises:
            ValueError: If a message group of an unknown type is encountered.
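
        Example (illustrative; assumes AirbyteLogMessage and Level are imported from airbyte_cdk.models):
            groups = [AirbyteLogMessage(level=Level.INFO, message="fetched page 1")]
            slices, logs, aux, cfg = reader._categorise_groups(groups)
            # logs now holds one LogMessage with level "INFO"; slices and aux
            # are empty lists, and cfg (latest_config_update) is None.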
        """

        slices = []
        log_messages = []
        auxiliary_requests = []
        latest_config_update: Optional[AirbyteControlMessage] = None

        # process the message groups first
        for message_group in message_groups:
            match message_group:
                case AirbyteLogMessage():
                    log_messages.append(
                        LogMessage(message=message_group.message, level=message_group.level.value)
                    )
                case AirbyteTraceMessage():
                    if message_group.type == TraceType.ERROR:
                        log_messages.append(
                            LogMessage(
                                message=message_group.error.message,  # type: ignore
                                level="ERROR",
                                internal_message=message_group.error.internal_message,  # type: ignore
                                stacktrace=message_group.error.stack_trace,  # type: ignore
                            )
                        )
                case AirbyteControlMessage():
                    if (
                        not latest_config_update
                        or latest_config_update.emitted_at <= message_group.emitted_at
                    ):
                        latest_config_update = message_group
                case AuxiliaryRequest():
                    auxiliary_requests.append(message_group)
                case StreamReadSlices():
                    slices.append(message_group)
                case _:
                    raise ValueError(f"Unknown message group type: {type(message_group)}")

        return slices, log_messages, auxiliary_requests, latest_config_update

    def _get_infered_schema(
        self,
        configured_catalog: ConfiguredAirbyteCatalog,
        schema_inferrer: SchemaInferrer,
        log_messages: List[LogMessage],
    ) -> INFERRED_SCHEMA_OUTPUT_TYPE:
        """
        Retrieves the inferred schema from the given configured catalog using the provided schema inferrer.

        This function processes a single stream from the configured catalog. It attempts to obtain the stream's
        schema via the schema inferrer. If a SchemaValidationException occurs, each validation error is logged in the
        provided log_messages list and the partially inferred schema (from the exception) is returned.

        Parameters:
            configured_catalog (ConfiguredAirbyteCatalog): The configured catalog that contains the stream metadata.
                It is assumed that only one stream is defined.
            schema_inferrer (SchemaInferrer): An instance responsible for inferring the schema for a given stream.
            log_messages (List[LogMessage]): A list that will be appended with log messages, especially error messages
                if schema validation issues arise.

        Returns:
            INFERRED_SCHEMA_OUTPUT_TYPE: A tuple consisting of the inferred schema and the updated list of log messages.
        """

        try:
            # The connector builder currently only supports reading from a single stream at a time
            configured_stream = configured_catalog.streams[0]
            schema = schema_inferrer.get_stream_schema(configured_stream.stream.name)
        except SchemaValidationException as exception:
            # we update the log_messages with possible errors
            for validation_error in exception.validation_errors:
                log_messages.append(LogMessage(validation_error, "ERROR"))
            schema = exception.schema

        return schema, log_messages

    def _get_latest_config_update(
        self,
        latest_config_update: AirbyteControlMessage | None,
    ) -> Dict[str, Any] | None:
        """
        Retrieves a cleaned configuration from the latest Airbyte control message.

        This helper function extracts the configuration from the given Airbyte control message, cleans it using the internal `Parsers.clean_config` function,
        and returns the resulting dictionary. If no control message is provided (i.e., latest_config_update is None), the function returns None.

        Parameters:
            latest_config_update (AirbyteControlMessage | None): The control message containing the connector configuration. May be None.

        Returns:
            Dict[str, Any] | None: The cleaned configuration dictionary if available; otherwise, None.
        """

        return (
            clean_config(latest_config_update.connectorConfig.config)  # type: ignore
            if latest_config_update
            else None
        )

    def _read_stream(
        self,
        source: ConcurrentDeclarativeSource,
        config: Mapping[str, Any],
        configured_catalog: ConfiguredAirbyteCatalog,
        state: List[AirbyteStateMessage],
    ) -> Iterator[AirbyteMessage]:
        """
        Reads messages from the given ConcurrentDeclarativeSource using an AirbyteEntrypoint.

        This method attempts to yield messages from the source's read generator. If the generator
        raises an AirbyteTracedException, it checks whether the exception message indicates a non-actionable
        error (e.g., a final exception from AbstractSource that should not be logged). In that case, it stops
        processing without yielding the exception as a message. For other exceptions, the exception is caught,
        wrapped into an AirbyteTracedException, and yielded as an AirbyteMessage.

        Parameters:
            source (ConcurrentDeclarativeSource): The source object that provides data reading logic.
            config (Mapping[str, Any]): The configuration dictionary for the source.
            configured_catalog (ConfiguredAirbyteCatalog): The catalog defining the streams and their configurations.
            state (List[AirbyteStateMessage]): A list representing the current state for incremental sync.

        Yields:
            AirbyteMessage: Messages yielded from the source's generator. In case of exceptions,
            an AirbyteMessage encapsulating the error is yielded instead.
        """
        # the generator can raise an exception
        # iterate over the generated messages. if next raises an exception, catch it and yield it as an AirbyteLogMessage
        try:
            yield from AirbyteEntrypoint(source).read(
                source.spec(self.logger), config, configured_catalog, state
            )
        except AirbyteTracedException as traced_exception:
            # Look for this message which indicates that it is the "final exception" raised by AbstractSource.
            # If it matches, don't yield this as we don't need to show this in the Builder.
            # This is somewhat brittle as it relies on the message string, but if they drift then the worst case
            # is that this message will be shown in the Builder.
            if (
                traced_exception.message is not None
                and "During the sync, the following streams did not sync successfully"
                in traced_exception.message
            ):
                return
            yield traced_exception.as_airbyte_message()
        except Exception as e:
            error_message = f"{e.args[0] if len(e.args) > 0 else str(e)}"
            yield AirbyteTracedException.from_exception(
                e, message=error_message
            ).as_airbyte_message()

    def _has_reached_limit(self, slices: List[StreamReadSlices]) -> bool:
        """
        Determines whether the provided collection of slices has reached any defined limits.

        This function checks for three types of limits:
        1. If the number of slices is greater than or equal to the maximum slice limit.
        2. If any individual slice has a number of pages that meets or exceeds the maximum number of pages per slice.
        3. If the cumulative number of records across all pages in all slices reaches or exceeds the maximum record limit.

        Parameters:
            slices (List[StreamReadSlices]): A list where each element represents a slice containing one or more pages, and each page has a collection of records.

        Returns:
            bool: True if any of the following conditions is met:
                - The number of slices is at or above the maximum allowed slices.
                - Any slice contains pages at or above the maximum allowed per slice.
                - The total count of records reaches or exceeds the maximum record limit.
            False otherwise.
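
        Example (illustrative): with max_slices=2, a read that produced two
        slices returns True even if no page or record limit was hit.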
        """
        if len(slices) >= self._max_slices:
            return True

        record_count = 0

        for _slice in slices:
            if len(_slice.pages) >= self._max_pages_per_slice:
                return True
            for page in _slice.pages:
                record_count += len(page.records)
                if record_count >= self._max_record_limit:
                    return True
        return False

A utility class for performing test reads from a declarative data source, primarily used to validate connector configurations via partial stream reads.

Initialization:

TestReader(max_pages_per_slice: int, max_slices: int, max_record_limit: int = 1000) Initializes a new instance of the TestReader class with limits on pages per slice, slices, and records per read operation.
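
A minimal construction sketch (the limit values below are arbitrary, chosen only for illustration):

    reader = TestReader(max_pages_per_slice=5, max_slices=5, max_record_limit=1000)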

Public Methods:

run_test_read(source, config, configured_catalog, stream_name, state, record_limit=None) -> StreamRead:

Executes a test read operation from the given declarative source. It configures and infers the schema,
processes the read messages (including logging and error handling), and returns a StreamRead object
that contains slices of data, log messages, auxiliary requests, and any inferred schema or datetime formats.

Parameters:
    source (ConcurrentDeclarativeSource): The data source to read from.
    config (Mapping[str, Any]): Configuration parameters for the source.
    configured_catalog (ConfiguredAirbyteCatalog): Catalog containing stream configuration.
    stream_name (str): The name of the stream to read.
    state (List[AirbyteStateMessage]): Current state information for the read.
    record_limit (Optional[int]): Optional override for the maximum number of records to read.

Returns:
    StreamRead: An object encapsulating logs, data slices, auxiliary requests, and inferred metadata,
    along with an indicator of whether any configured limit was reached.
TestReader(max_pages_per_slice: int, max_slices: int, max_record_limit: int = 1000)
logger = <Logger airbyte.connector-builder (INFO)>
def run_test_read(
    self,
    source: airbyte_cdk.sources.declarative.concurrent_declarative_source.ConcurrentDeclarativeSource,
    config: Mapping[str, Any],
    configured_catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog,
    stream_name: str,
    state: List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage],
    record_limit: Optional[int] = None,
) -> airbyte_cdk.connector_builder.models.StreamRead:

Run a test read for the connector by reading from a single stream and inferring schema and datetime formats.

Arguments:
  • source (ConcurrentDeclarativeSource): The source instance providing the streams.
  • config (Mapping[str, Any]): The configuration settings to use for reading.
  • configured_catalog (ConfiguredAirbyteCatalog): The catalog specifying the stream configuration.
  • stream_name (str): The name of the stream to read from.
  • state (List[AirbyteStateMessage]): A list of state messages to resume the read.
  • record_limit (Optional[int], optional): Maximum number of records to read. Defaults to None.
Returns:

  StreamRead: An object containing the following attributes:
    • logs (List[str]): Log messages generated during the process.
    • slices (List[Any]): The data slices read from the stream.
    • test_read_limit_reached (bool): Indicates whether any configured read limit was reached.
    • auxiliary_requests (Any): Any auxiliary requests generated during reading.
    • inferred_schema (Any): The schema inferred from the stream data.
    • latest_config_update (Any): The latest configuration update, if applicable.
    • inferred_datetime_formats (Dict[str, str]): Mapping of fields to their inferred datetime formats.
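
A minimal end-to-end sketch, assuming you already hold a ConcurrentDeclarativeSource built from a manifest, its config mapping, and a single-stream ConfiguredAirbyteCatalog (the variable names source, config, and configured_catalog below are illustrative placeholders, not part of this module):

    from airbyte_cdk.connector_builder.test_reader import TestReader

    reader = TestReader(max_pages_per_slice=5, max_slices=5, max_record_limit=1000)
    stream_read = reader.run_test_read(
        source=source,                            # ConcurrentDeclarativeSource
        config=config,                            # Mapping[str, Any]
        configured_catalog=configured_catalog,    # catalog with exactly one stream
        stream_name="my_stream",                  # must match the configured stream
        state=[],                                 # empty state: read from scratch
        record_limit=100,                         # optional; validated against max_record_limit
    )
    if stream_read.test_read_limit_reached:
        print("a slice, page, or record limit stopped the read early")
    for log in stream_read.logs:
        print(log)

Because run_test_read validates record_limit first, passing a value outside 1..max_record_limit raises ValueError before any read happens.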