airbyte_cdk.test.entrypoint_wrapper

The AirbyteEntrypoint is important because it is a service layer that orchestrates how we execute commands from the common interface through the source Python implementation. There is some logic about which messages we send to the platform, and when, which is relevant for integration testing. Other than that, there are integration points that are annoying to integrate with using Python code:

  • Sources communicate with the platform using stdout. The implication is that the source could just print every message instead of returning things to source.<method> or using the message repository. WARNING: As part of integration testing, we will not support messages that are simply printed. The reason is that capturing stdout relies on overriding sys.stdout (see https://docs.python.org/3/library/contextlib.html#contextlib.redirect_stdout) which clashes with how pytest captures logs and brings considerations for multithreaded applications. If code you work with uses print statements, please migrate to source.message_repository to emit those messages.
  • The entrypoint interface relies on files being written to the file system.
  1# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  2
  3"""
  4The AirbyteEntrypoint is important because it is a service layer that orchestrate how we execute commands from the
  5[common interface](https://docs.airbyte.com/understanding-airbyte/airbyte-protocol#common-interface) through the source Python
  6implementation. There is some logic about which message we send to the platform and when which is relevant for integration testing. Other
  7than that, there are integrations point that are annoying to integrate with using Python code:
  8* Sources communicate with the platform using stdout. The implication is that the source could just print every message instead of
  9    returning things to source.<method> or to using the message repository. WARNING: As part of integration testing, we will not support
 10    messages that are simply printed. The reason is that capturing stdout relies on overriding sys.stdout (see
 11    https://docs.python.org/3/library/contextlib.html#contextlib.redirect_stdout) which clashes with how pytest captures logs and brings
 12    considerations for multithreaded applications. If code you work with uses `print` statements, please migrate to
 13    source.message_repository to emit those messages
 14* The entrypoint interface relies on file being written on the file system
 15"""
 16
 17import json
 18import logging
 19import re
 20import tempfile
 21import traceback
 22from collections import deque
 23from collections.abc import Generator, Mapping
 24from dataclasses import dataclass
 25from io import StringIO
 26from pathlib import Path
 27from typing import Any, List, Literal, Optional, Union, final, overload
 28
 29import orjson
 30from pydantic import ValidationError as V2ValidationError
 31from serpyco_rs import SchemaValidationError
 32
 33from airbyte_cdk.entrypoint import AirbyteEntrypoint
 34from airbyte_cdk.exception_handler import assemble_uncaught_exception
 35from airbyte_cdk.logger import AirbyteLogFormatter
 36from airbyte_cdk.models import (
 37    AirbyteLogMessage,
 38    AirbyteMessage,
 39    AirbyteMessageSerializer,
 40    AirbyteStateMessage,
 41    AirbyteStateMessageSerializer,
 42    AirbyteStreamState,
 43    AirbyteStreamStatus,
 44    ConfiguredAirbyteCatalog,
 45    ConfiguredAirbyteCatalogSerializer,
 46    Level,
 47    TraceType,
 48    Type,
 49)
 50from airbyte_cdk.sources import Source
 51from airbyte_cdk.test.models.scenario import ExpectedOutcome
 52
 53
 54class AirbyteEntrypointException(Exception):
 55    """Exception raised for errors in the AirbyteEntrypoint execution.
 56
 57    Used to provide details of an Airbyte connector execution failure in the output
 58    captured in an `EntrypointOutput` object. Use `EntrypointOutput.as_exception()` to
 59    convert it to an exception.
 60
 61    Example Usage:
 62        output = EntrypointOutput(...)
 63        if output.errors:
 64            raise output.as_exception()
 65    """
 66
 67    message: str = ""
 68
 69    def __post_init__(self) -> None:
 70        super().__init__(self.message)
 71
 72
 73class EntrypointOutput:
 74    """A class to encapsulate the output of an Airbyte connector's execution.
 75
 76    This class can be initialized with a list of messages or a file containing messages.
 77    It provides methods to access different types of messages produced during the execution
 78    of an Airbyte connector, including both successful messages and error messages.
 79
 80    When working with records and state messages, it provides both a list and an iterator
 81    implementation. Lists are easier to work with, but generators are better suited to handle
 82    large volumes of messages without overflowing the available memory.
 83    """
 84
 85    def __init__(
 86        self,
 87        messages: list[str] | None = None,
 88        uncaught_exception: Optional[BaseException] = None,
 89        *,
 90        command: list[str] | None = None,
 91        message_file: Path | None = None,
 92    ) -> None:
 93        if messages is None and message_file is None:
 94            raise ValueError("Either messages or message_file must be provided")
 95        if messages is not None and message_file is not None:
 96            raise ValueError("Only one of messages or message_file can be provided")
 97
 98        self._command = command
 99        self._messages: list[AirbyteMessage] | None = None
100        self._message_file: Path | None = message_file
101        if messages:
102            try:
103                self._messages = [self._parse_message(message) for message in messages]
104            except V2ValidationError as exception:
105                raise ValueError("All messages are expected to be AirbyteMessage") from exception
106
107        if uncaught_exception:
108            if self._messages is None:
109                self._messages = []
110
111            self._messages.append(
112                assemble_uncaught_exception(
113                    type(uncaught_exception), uncaught_exception
114                ).as_airbyte_message()
115            )
116
117    @staticmethod
118    def _parse_message(message: str) -> AirbyteMessage:
119        try:
120            return AirbyteMessageSerializer.load(orjson.loads(message))
121        except (orjson.JSONDecodeError, SchemaValidationError):
122            # The platform assumes that logs that are not of AirbyteMessage format are log messages
123            return AirbyteMessage(
124                type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message=message)
125            )
126
127    @property
128    def records_and_state_messages(
129        self,
130    ) -> list[AirbyteMessage]:
131        return self.get_message_by_types(
132            message_types=[Type.RECORD, Type.STATE],
133            safe_iterator=False,
134        )
135
136    def records_and_state_messages_iterator(
137        self,
138    ) -> Generator[AirbyteMessage, None, None]:
139        """Returns a generator that yields record and state messages one by one.
140
141        Use this instead of `records_and_state_messages` when the volume of messages could be large
142        enough to overload available memory.
143        """
144        return self.get_message_by_types(
145            message_types=[Type.RECORD, Type.STATE],
146            safe_iterator=True,
147        )
148
149    @property
150    def records(self) -> List[AirbyteMessage]:
151        return self.get_message_by_types([Type.RECORD])
152
153    @property
154    def records_iterator(self) -> Generator[AirbyteMessage, None, None]:
155        """Returns a generator that yields record messages one by one.
156
157        Use this instead of `records` when the volume of records could be large
158        enough to overload available memory.
159        """
160        return self.get_message_by_types([Type.RECORD], safe_iterator=True)
161
162    @property
163    def state_messages(self) -> List[AirbyteMessage]:
164        return self.get_message_by_types([Type.STATE])
165
166    @property
167    def spec_messages(self) -> List[AirbyteMessage]:
168        return self.get_message_by_types([Type.SPEC])
169
170    @property
171    def connection_status_messages(self) -> List[AirbyteMessage]:
172        return self.get_message_by_types([Type.CONNECTION_STATUS])
173
174    @property
175    def most_recent_state(self) -> AirbyteStreamState | None:
176        state_message_iterator = self.get_message_by_types(
177            [Type.STATE],
178            safe_iterator=True,
179        )
180        # Use a deque with maxlen=1 to efficiently get the last state message
181        double_ended_queue = deque(state_message_iterator, maxlen=1)
182        try:
183            final_state_message: AirbyteMessage = double_ended_queue.pop()
184        except IndexError:
185            raise ValueError(
186                "Can't provide most recent state as there are no state messages."
187            ) from None
188
189        return final_state_message.state.stream  # type: ignore[union-attr] # state has `stream`
190
191    @property
192    def logs(self) -> List[AirbyteMessage]:
193        return self.get_message_by_types([Type.LOG])
194
195    @property
196    def trace_messages(self) -> List[AirbyteMessage]:
197        return self.get_message_by_types([Type.TRACE])
198
199    @property
200    def analytics_messages(self) -> List[AirbyteMessage]:
201        return self._get_trace_message_by_trace_type(TraceType.ANALYTICS)
202
203    @property
204    def errors(self) -> List[AirbyteMessage]:
205        return self._get_trace_message_by_trace_type(TraceType.ERROR)
206
207    def get_formatted_error_message(self) -> str:
208        """Returns a human-readable error message with the contents.
209
210        If there are no errors, returns an empty string.
211        """
212        errors = self.errors
213        if not errors:
214            # If there are no errors, return an empty string.
215            return ""
216
217        result = "Failed to run airbyte command"
218        result += ": " + " ".join(self._command) if self._command else "."
219        result += "\n" + "\n".join(
220            [str(error.trace.error).replace("\\n", "\n") for error in errors if error.trace],
221        )
222        return result
223
224    def as_exception(self) -> AirbyteEntrypointException:
225        """Convert the output to an exception."""
226        return AirbyteEntrypointException(self.get_formatted_error_message())
227
228    def raise_if_errors(
229        self,
230    ) -> None:
231        """Raise an exception if there are errors in the output.
232
233        Otherwise, do nothing.
234        """
235        if not self.errors:
236            return None
237
238        raise self.as_exception()
239
240    @property
241    def catalog(self) -> AirbyteMessage:
242        catalog = self.get_message_by_types([Type.CATALOG])
243        if len(catalog) != 1:
244            raise ValueError(f"Expected exactly one catalog but got {len(catalog)}")
245        return catalog[0]
246
247    def get_stream_statuses(self, stream_name: str) -> List[AirbyteStreamStatus]:
248        status_messages = map(
249            lambda message: message.trace.stream_status.status,  # type: ignore
250            filter(
251                lambda message: message.trace.stream_status.stream_descriptor.name == stream_name,  # type: ignore # callable; trace has `stream_status`
252                self._get_trace_message_by_trace_type(TraceType.STREAM_STATUS),
253            ),
254        )
255        return list(status_messages)
256
257    def get_message_iterator(self) -> Generator[AirbyteMessage, None, None]:
258        """Creates a generator which yields messages one by one.
259
260        This will iterate over all messages in the output file (if provided) or the messages
261        provided during initialization. File results are provided first, followed by any
262        messages that were passed in directly.
263        """
264        if self._message_file:
265            try:
266                with open(self._message_file, "r", encoding="utf-8") as file:
267                    for line in file:
268                        if not line.strip():
269                            # Skip empty lines
270                            continue
271
272                        yield self._parse_message(line.strip())
273            except FileNotFoundError:
274                raise ValueError(f"Message file {self._message_file} not found")
275
276        if self._messages is not None:
277            yield from self._messages
278
279    # Overloads to provide proper type hints for different usages of `get_message_by_types`.
280
281    @overload
282    def get_message_by_types(
283        self,
284        message_types: list[Type],
285    ) -> list[AirbyteMessage]: ...
286
287    @overload
288    def get_message_by_types(
289        self,
290        message_types: list[Type],
291        *,
292        safe_iterator: Literal[False],
293    ) -> list[AirbyteMessage]: ...
294
295    @overload
296    def get_message_by_types(
297        self,
298        message_types: list[Type],
299        *,
300        safe_iterator: Literal[True],
301    ) -> Generator[AirbyteMessage, None, None]: ...
302
303    def get_message_by_types(
304        self,
305        message_types: list[Type],
306        *,
307        safe_iterator: bool = False,
308    ) -> list[AirbyteMessage] | Generator[AirbyteMessage, None, None]:
309        """Get messages of specific types.
310
311        If `safe_iterator` is True, returns a generator that yields messages one by one.
312        If `safe_iterator` is False, returns a list of messages.
313
314        Use `safe_iterator=True` when the volume of messages could overload the available
315        memory.
316        """
317        message_generator = self.get_message_iterator()
318
319        if safe_iterator:
320            return (message for message in message_generator if message.type in message_types)
321
322        return [message for message in message_generator if message.type in message_types]
323
324    def _get_trace_message_by_trace_type(self, trace_type: TraceType) -> List[AirbyteMessage]:
325        return [
326            message
327            for message in self.get_message_by_types(
328                [Type.TRACE],
329                safe_iterator=True,
330            )
331            if message.trace.type == trace_type  # type: ignore[union-attr] # trace has `type`
332        ]
333
334    def is_in_logs(self, pattern: str) -> bool:
335        """Check if any log message case-insensitive matches the pattern."""
336        return any(
337            re.search(
338                pattern,
339                entry.log.message,  # type: ignore[union-attr] # log has `message`
340                flags=re.IGNORECASE,
341            )
342            for entry in self.logs
343        )
344
345    def is_not_in_logs(self, pattern: str) -> bool:
346        """Check if no log message matches the case-insensitive pattern."""
347        return not self.is_in_logs(pattern)
348
349
def _run_command(
    source: Source,
    args: List[str],
    expecting_exception: bool | None = None,  # Deprecated, use `expected_outcome` instead.
    *,
    expected_outcome: ExpectedOutcome | None = None,
) -> EntrypointOutput:
    """Internal function to run a command with the AirbyteEntrypoint.

    Messages returned by the entrypoint and log lines written to the root logger while it
    runs are collected into the returned `EntrypointOutput`. An exception raised by the
    run is captured and passed along as `uncaught_exception` instead of propagating; it is
    also printed when the expected outcome is success.

    Note: Even though this function is private, some connectors do call it directly.

    Note: The `expecting_exception` arg is now deprecated in favor of the tri-state
    `expected_outcome` arg. The old argument is supported (for now) for backwards compatibility.
    """
    expected_outcome = expected_outcome or ExpectedOutcome.from_expecting_exception_bool(
        expecting_exception,
    )
    log_capture_buffer = StringIO()
    stream_handler = logging.StreamHandler(log_capture_buffer)
    stream_handler.setLevel(logging.INFO)
    stream_handler.setFormatter(AirbyteLogFormatter())
    parent_logger = logging.getLogger("")
    parent_logger.addHandler(stream_handler)

    messages: list[str] = []
    uncaught_exception = None
    try:
        parsed_args = AirbyteEntrypoint.parse_args(args)
        source_entrypoint = AirbyteEntrypoint(source)
        try:
            for message in source_entrypoint.run(parsed_args):
                messages.append(message)
        except Exception as exception:
            if expected_outcome.expect_success():
                print("Printing unexpected error from entrypoint_wrapper")
                print("".join(traceback.format_exception(None, exception, exception.__traceback__)))

            uncaught_exception = exception
    finally:
        # Always detach the capture handler, including when argument parsing fails or a
        # non-`Exception` error escapes; otherwise it stays on the root logger and keeps
        # receiving every later log record.
        parent_logger.removeHandler(stream_handler)

    # The handler is detached but the buffer still holds what was captured. The text ends
    # with a newline, so the last split element is empty and is dropped.
    captured_logs = log_capture_buffer.getvalue().split("\n")[:-1]

    return EntrypointOutput(
        messages=messages + captured_logs,
        uncaught_exception=uncaught_exception,
    )
396
397
def discover(
    source: Source,
    config: Mapping[str, Any],
    expecting_exception: bool | None = None,  # Deprecated, use `expected_outcome` instead.
    *,
    expected_outcome: ExpectedOutcome | None = None,
) -> EntrypointOutput:
    """
    Run the `discover` command against `source` in debug mode and return its output.

    config must be json serializable; it is written to a temporary `config.json`.
    :param expected_outcome: By default if there is an uncaught exception, the exception will be printed out. If this is expected, please
        provide `expected_outcome=ExpectedOutcome.EXPECT_FAILURE` so that the test output logs are cleaner
    """
    with tempfile.TemporaryDirectory() as workdir:
        config_path = make_file(Path(workdir) / "config.json", config)
        discover_args = ["discover", "--config", config_path, "--debug"]

        return _run_command(
            source,
            discover_args,
            expecting_exception=expecting_exception,  # Deprecated, but still supported.
            expected_outcome=expected_outcome,
        )
421
422
def read(
    source: Source,
    config: Mapping[str, Any],
    catalog: ConfiguredAirbyteCatalog,
    state: Optional[List[AirbyteStateMessage]] = None,
    expecting_exception: bool | None = None,  # Deprecated, use `expected_outcome` instead.
    *,
    expected_outcome: ExpectedOutcome | None = None,
    debug: bool = False,
) -> EntrypointOutput:
    """
    Run the `read` command against `source` and return its output.

    config and state must be json serializable. The config, the catalog and (when given)
    the state are written to temporary files whose paths are passed to the command;
    `--debug` is added when `debug` is true.

    :param expected_outcome: By default if there is an uncaught exception, the exception will be printed out. If this is expected, please
        provide `expected_outcome=ExpectedOutcome.EXPECT_FAILURE` so that the test output logs are cleaner.
    """
    with tempfile.TemporaryDirectory() as workdir:
        base_path = Path(workdir)
        config_path = make_file(base_path / "config.json", config)

        catalog_json = orjson.dumps(ConfiguredAirbyteCatalogSerializer.dump(catalog)).decode()
        catalog_path = make_file(base_path / "catalog.json", catalog_json)

        read_args = ["read", "--config", config_path, "--catalog", catalog_path]
        if debug:
            read_args.append("--debug")

        if state is not None:
            # Each state message is serialized on its own, then joined into a JSON array.
            state_items = [
                orjson.dumps(AirbyteStateMessageSerializer.dump(stream_state)).decode()
                for stream_state in state
            ]
            state_path = make_file(
                base_path / "state.json",
                f"[{','.join(state_items)}]",
            )
            read_args.extend(["--state", state_path])

        return _run_command(
            source,
            read_args,
            expecting_exception=expecting_exception,  # Deprecated, but still supported.
            expected_outcome=expected_outcome,
        )
472
473
def make_file(
    path: Path, file_contents: Optional[Union[str, Mapping[str, Any], List[Mapping[str, Any]]]]
) -> str:
    """Write `file_contents` to `path` and return the path as a string.

    A string is written unchanged; any other value is serialized with `json.dumps` first.
    """
    text = file_contents if isinstance(file_contents, str) else json.dumps(file_contents)
    path.write_text(text)
    return str(path)
class AirbyteEntrypointException(builtins.Exception):
class AirbyteEntrypointException(Exception):
    """Exception raised for errors in the AirbyteEntrypoint execution.

    Used to provide details of an Airbyte connector execution failure in the output
    captured in an `EntrypointOutput` object. Use `EntrypointOutput.as_exception()` to
    convert it to an exception.

    The first positional argument, when given, is exposed as `message`.

    Example Usage:
        output = EntrypointOutput(...)
        if output.errors:
            raise output.as_exception()
    """

    # Description of the failure; stays empty when the exception is built without arguments.
    message: str = ""

    def __init__(self, *args: object) -> None:
        # This class is not a dataclass, so `__post_init__` is never invoked automatically.
        # Capture the message here so `.message` reflects what the exception was raised with,
        # while leaving `args` exactly as `Exception` would have set it.
        super().__init__(*args)
        if args:
            self.message = str(args[0])

    def __post_init__(self) -> None:
        # Retained so that explicit callers and dataclass subclasses keep working.
        super().__init__(self.message)

Exception raised for errors in the AirbyteEntrypoint execution.

Used to provide details of an Airbyte connector execution failure in the output captured in an EntrypointOutput object. Use EntrypointOutput.as_exception() to convert it to an exception.

Example Usage:

    output = EntrypointOutput(...)
    if output.errors:
        raise output.as_exception()

message: str = ''
class EntrypointOutput:
 74class EntrypointOutput:
 75    """A class to encapsulate the output of an Airbyte connector's execution.
 76
 77    This class can be initialized with a list of messages or a file containing messages.
 78    It provides methods to access different types of messages produced during the execution
 79    of an Airbyte connector, including both successful messages and error messages.
 80
 81    When working with records and state messages, it provides both a list and an iterator
 82    implementation. Lists are easier to work with, but generators are better suited to handle
 83    large volumes of messages without overflowing the available memory.
 84    """
 85
 86    def __init__(
 87        self,
 88        messages: list[str] | None = None,
 89        uncaught_exception: Optional[BaseException] = None,
 90        *,
 91        command: list[str] | None = None,
 92        message_file: Path | None = None,
 93    ) -> None:
 94        if messages is None and message_file is None:
 95            raise ValueError("Either messages or message_file must be provided")
 96        if messages is not None and message_file is not None:
 97            raise ValueError("Only one of messages or message_file can be provided")
 98
 99        self._command = command
100        self._messages: list[AirbyteMessage] | None = None
101        self._message_file: Path | None = message_file
102        if messages:
103            try:
104                self._messages = [self._parse_message(message) for message in messages]
105            except V2ValidationError as exception:
106                raise ValueError("All messages are expected to be AirbyteMessage") from exception
107
108        if uncaught_exception:
109            if self._messages is None:
110                self._messages = []
111
112            self._messages.append(
113                assemble_uncaught_exception(
114                    type(uncaught_exception), uncaught_exception
115                ).as_airbyte_message()
116            )
117
118    @staticmethod
119    def _parse_message(message: str) -> AirbyteMessage:
120        try:
121            return AirbyteMessageSerializer.load(orjson.loads(message))
122        except (orjson.JSONDecodeError, SchemaValidationError):
123            # The platform assumes that logs that are not of AirbyteMessage format are log messages
124            return AirbyteMessage(
125                type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message=message)
126            )
127
128    @property
129    def records_and_state_messages(
130        self,
131    ) -> list[AirbyteMessage]:
132        return self.get_message_by_types(
133            message_types=[Type.RECORD, Type.STATE],
134            safe_iterator=False,
135        )
136
137    def records_and_state_messages_iterator(
138        self,
139    ) -> Generator[AirbyteMessage, None, None]:
140        """Returns a generator that yields record and state messages one by one.
141
142        Use this instead of `records_and_state_messages` when the volume of messages could be large
143        enough to overload available memory.
144        """
145        return self.get_message_by_types(
146            message_types=[Type.RECORD, Type.STATE],
147            safe_iterator=True,
148        )
149
150    @property
151    def records(self) -> List[AirbyteMessage]:
152        return self.get_message_by_types([Type.RECORD])
153
154    @property
155    def records_iterator(self) -> Generator[AirbyteMessage, None, None]:
156        """Returns a generator that yields record messages one by one.
157
158        Use this instead of `records` when the volume of records could be large
159        enough to overload available memory.
160        """
161        return self.get_message_by_types([Type.RECORD], safe_iterator=True)
162
163    @property
164    def state_messages(self) -> List[AirbyteMessage]:
165        return self.get_message_by_types([Type.STATE])
166
167    @property
168    def spec_messages(self) -> List[AirbyteMessage]:
169        return self.get_message_by_types([Type.SPEC])
170
171    @property
172    def connection_status_messages(self) -> List[AirbyteMessage]:
173        return self.get_message_by_types([Type.CONNECTION_STATUS])
174
175    @property
176    def most_recent_state(self) -> AirbyteStreamState | None:
177        state_message_iterator = self.get_message_by_types(
178            [Type.STATE],
179            safe_iterator=True,
180        )
181        # Use a deque with maxlen=1 to efficiently get the last state message
182        double_ended_queue = deque(state_message_iterator, maxlen=1)
183        try:
184            final_state_message: AirbyteMessage = double_ended_queue.pop()
185        except IndexError:
186            raise ValueError(
187                "Can't provide most recent state as there are no state messages."
188            ) from None
189
190        return final_state_message.state.stream  # type: ignore[union-attr] # state has `stream`
191
192    @property
193    def logs(self) -> List[AirbyteMessage]:
194        return self.get_message_by_types([Type.LOG])
195
196    @property
197    def trace_messages(self) -> List[AirbyteMessage]:
198        return self.get_message_by_types([Type.TRACE])
199
200    @property
201    def analytics_messages(self) -> List[AirbyteMessage]:
202        return self._get_trace_message_by_trace_type(TraceType.ANALYTICS)
203
204    @property
205    def errors(self) -> List[AirbyteMessage]:
206        return self._get_trace_message_by_trace_type(TraceType.ERROR)
207
208    def get_formatted_error_message(self) -> str:
209        """Returns a human-readable error message with the contents.
210
211        If there are no errors, returns an empty string.
212        """
213        errors = self.errors
214        if not errors:
215            # If there are no errors, return an empty string.
216            return ""
217
218        result = "Failed to run airbyte command"
219        result += ": " + " ".join(self._command) if self._command else "."
220        result += "\n" + "\n".join(
221            [str(error.trace.error).replace("\\n", "\n") for error in errors if error.trace],
222        )
223        return result
224
225    def as_exception(self) -> AirbyteEntrypointException:
226        """Convert the output to an exception."""
227        return AirbyteEntrypointException(self.get_formatted_error_message())
228
229    def raise_if_errors(
230        self,
231    ) -> None:
232        """Raise an exception if there are errors in the output.
233
234        Otherwise, do nothing.
235        """
236        if not self.errors:
237            return None
238
239        raise self.as_exception()
240
241    @property
242    def catalog(self) -> AirbyteMessage:
243        catalog = self.get_message_by_types([Type.CATALOG])
244        if len(catalog) != 1:
245            raise ValueError(f"Expected exactly one catalog but got {len(catalog)}")
246        return catalog[0]
247
248    def get_stream_statuses(self, stream_name: str) -> List[AirbyteStreamStatus]:
249        status_messages = map(
250            lambda message: message.trace.stream_status.status,  # type: ignore
251            filter(
252                lambda message: message.trace.stream_status.stream_descriptor.name == stream_name,  # type: ignore # callable; trace has `stream_status`
253                self._get_trace_message_by_trace_type(TraceType.STREAM_STATUS),
254            ),
255        )
256        return list(status_messages)
257
258    def get_message_iterator(self) -> Generator[AirbyteMessage, None, None]:
259        """Creates a generator which yields messages one by one.
260
261        This will iterate over all messages in the output file (if provided) or the messages
262        provided during initialization. File results are provided first, followed by any
263        messages that were passed in directly.
264        """
265        if self._message_file:
266            try:
267                with open(self._message_file, "r", encoding="utf-8") as file:
268                    for line in file:
269                        if not line.strip():
270                            # Skip empty lines
271                            continue
272
273                        yield self._parse_message(line.strip())
274            except FileNotFoundError:
275                raise ValueError(f"Message file {self._message_file} not found")
276
277        if self._messages is not None:
278            yield from self._messages
279
280    # Overloads to provide proper type hints for different usages of `get_message_by_types`.
281
282    @overload
283    def get_message_by_types(
284        self,
285        message_types: list[Type],
286    ) -> list[AirbyteMessage]: ...
287
288    @overload
289    def get_message_by_types(
290        self,
291        message_types: list[Type],
292        *,
293        safe_iterator: Literal[False],
294    ) -> list[AirbyteMessage]: ...
295
296    @overload
297    def get_message_by_types(
298        self,
299        message_types: list[Type],
300        *,
301        safe_iterator: Literal[True],
302    ) -> Generator[AirbyteMessage, None, None]: ...
303
304    def get_message_by_types(
305        self,
306        message_types: list[Type],
307        *,
308        safe_iterator: bool = False,
309    ) -> list[AirbyteMessage] | Generator[AirbyteMessage, None, None]:
310        """Get messages of specific types.
311
312        If `safe_iterator` is True, returns a generator that yields messages one by one.
313        If `safe_iterator` is False, returns a list of messages.
314
315        Use `safe_iterator=True` when the volume of messages could overload the available
316        memory.
317        """
318        message_generator = self.get_message_iterator()
319
320        if safe_iterator:
321            return (message for message in message_generator if message.type in message_types)
322
323        return [message for message in message_generator if message.type in message_types]
324
325    def _get_trace_message_by_trace_type(self, trace_type: TraceType) -> List[AirbyteMessage]:
326        return [
327            message
328            for message in self.get_message_by_types(
329                [Type.TRACE],
330                safe_iterator=True,
331            )
332            if message.trace.type == trace_type  # type: ignore[union-attr] # trace has `type`
333        ]
334
335    def is_in_logs(self, pattern: str) -> bool:
336        """Check if any log message case-insensitive matches the pattern."""
337        return any(
338            re.search(
339                pattern,
340                entry.log.message,  # type: ignore[union-attr] # log has `message`
341                flags=re.IGNORECASE,
342            )
343            for entry in self.logs
344        )
345
346    def is_not_in_logs(self, pattern: str) -> bool:
347        """Check if no log message matches the case-insensitive pattern."""
348        return not self.is_in_logs(pattern)

A class to encapsulate the output of an Airbyte connector's execution.

This class can be initialized with a list of messages or a file containing messages. It provides methods to access different types of messages produced during the execution of an Airbyte connector, including both successful messages and error messages.

When working with records and state messages, it provides both a list and an iterator implementation. Lists are easier to work with, but generators are better suited to handle large volumes of messages without overflowing the available memory.

EntrypointOutput( messages: list[str] | None = None, uncaught_exception: Optional[BaseException] = None, *, command: list[str] | None = None, message_file: pathlib.Path | None = None)
 86    def __init__(
 87        self,
 88        messages: list[str] | None = None,
 89        uncaught_exception: Optional[BaseException] = None,
 90        *,
 91        command: list[str] | None = None,
 92        message_file: Path | None = None,
 93    ) -> None:
 94        if messages is None and message_file is None:
 95            raise ValueError("Either messages or message_file must be provided")
 96        if messages is not None and message_file is not None:
 97            raise ValueError("Only one of messages or message_file can be provided")
 98
 99        self._command = command
100        self._messages: list[AirbyteMessage] | None = None
101        self._message_file: Path | None = message_file
102        if messages:
103            try:
104                self._messages = [self._parse_message(message) for message in messages]
105            except V2ValidationError as exception:
106                raise ValueError("All messages are expected to be AirbyteMessage") from exception
107
108        if uncaught_exception:
109            if self._messages is None:
110                self._messages = []
111
112            self._messages.append(
113                assemble_uncaught_exception(
114                    type(uncaught_exception), uncaught_exception
115                ).as_airbyte_message()
116            )
records_and_state_messages: list[airbyte_cdk.AirbyteMessage]
128    @property
129    def records_and_state_messages(
130        self,
131    ) -> list[AirbyteMessage]:
132        return self.get_message_by_types(
133            message_types=[Type.RECORD, Type.STATE],
134            safe_iterator=False,
135        )
def records_and_state_messages_iterator( self) -> Generator[airbyte_cdk.AirbyteMessage, None, None]:
137    def records_and_state_messages_iterator(
138        self,
139    ) -> Generator[AirbyteMessage, None, None]:
140        """Returns a generator that yields record and state messages one by one.
141
142        Use this instead of `records_and_state_messages` when the volume of messages could be large
143        enough to overload available memory.
144        """
145        return self.get_message_by_types(
146            message_types=[Type.RECORD, Type.STATE],
147            safe_iterator=True,
148        )

Returns a generator that yields record and state messages one by one.

Use this instead of records_and_state_messages when the volume of messages could be large enough to overload available memory.

records: List[airbyte_cdk.AirbyteMessage]
150    @property
151    def records(self) -> List[AirbyteMessage]:
152        return self.get_message_by_types([Type.RECORD])
records_iterator: Generator[airbyte_cdk.AirbyteMessage, None, None]
154    @property
155    def records_iterator(self) -> Generator[AirbyteMessage, None, None]:
156        """Returns a generator that yields record messages one by one.
157
158        Use this instead of `records` when the volume of records could be large
159        enough to overload available memory.
160        """
161        return self.get_message_by_types([Type.RECORD], safe_iterator=True)

Returns a generator that yields record messages one by one.

Use this instead of records when the volume of records could be large enough to overload available memory.

state_messages: List[airbyte_cdk.AirbyteMessage]
163    @property
164    def state_messages(self) -> List[AirbyteMessage]:
165        return self.get_message_by_types([Type.STATE])
spec_messages: List[airbyte_cdk.AirbyteMessage]
167    @property
168    def spec_messages(self) -> List[AirbyteMessage]:
169        return self.get_message_by_types([Type.SPEC])
connection_status_messages: List[airbyte_cdk.AirbyteMessage]
171    @property
172    def connection_status_messages(self) -> List[AirbyteMessage]:
173        return self.get_message_by_types([Type.CONNECTION_STATUS])
most_recent_state: airbyte_cdk.models.airbyte_protocol.AirbyteStreamState | None
175    @property
176    def most_recent_state(self) -> AirbyteStreamState | None:
177        state_message_iterator = self.get_message_by_types(
178            [Type.STATE],
179            safe_iterator=True,
180        )
181        # Use a deque with maxlen=1 to efficiently get the last state message
182        double_ended_queue = deque(state_message_iterator, maxlen=1)
183        try:
184            final_state_message: AirbyteMessage = double_ended_queue.pop()
185        except IndexError:
186            raise ValueError(
187                "Can't provide most recent state as there are no state messages."
188            ) from None
189
190        return final_state_message.state.stream  # type: ignore[union-attr] # state has `stream`
logs: List[airbyte_cdk.AirbyteMessage]
192    @property
193    def logs(self) -> List[AirbyteMessage]:
194        return self.get_message_by_types([Type.LOG])
trace_messages: List[airbyte_cdk.AirbyteMessage]
196    @property
197    def trace_messages(self) -> List[AirbyteMessage]:
198        return self.get_message_by_types([Type.TRACE])
analytics_messages: List[airbyte_cdk.AirbyteMessage]
200    @property
201    def analytics_messages(self) -> List[AirbyteMessage]:
202        return self._get_trace_message_by_trace_type(TraceType.ANALYTICS)
errors: List[airbyte_cdk.AirbyteMessage]
204    @property
205    def errors(self) -> List[AirbyteMessage]:
206        return self._get_trace_message_by_trace_type(TraceType.ERROR)
def get_formatted_error_message(self) -> str:
208    def get_formatted_error_message(self) -> str:
209        """Returns a human-readable error message with the contents.
210
211        If there are no errors, returns an empty string.
212        """
213        errors = self.errors
214        if not errors:
215            # If there are no errors, return an empty string.
216            return ""
217
218        result = "Failed to run airbyte command"
219        result += ": " + " ".join(self._command) if self._command else "."
220        result += "\n" + "\n".join(
221            [str(error.trace.error).replace("\\n", "\n") for error in errors if error.trace],
222        )
223        return result

Returns a human-readable error message with the contents.

If there are no errors, returns an empty string.

def as_exception(self) -> AirbyteEntrypointException:
225    def as_exception(self) -> AirbyteEntrypointException:
226        """Convert the output to an exception."""
227        return AirbyteEntrypointException(self.get_formatted_error_message())

Convert the output to an exception.

def raise_if_errors(self) -> None:
229    def raise_if_errors(
230        self,
231    ) -> None:
232        """Raise an exception if there are errors in the output.
233
234        Otherwise, do nothing.
235        """
236        if not self.errors:
237            return None
238
239        raise self.as_exception()

Raise an exception if there are errors in the output.

Otherwise, do nothing.

catalog: airbyte_cdk.AirbyteMessage
241    @property
242    def catalog(self) -> AirbyteMessage:
243        catalog = self.get_message_by_types([Type.CATALOG])
244        if len(catalog) != 1:
245            raise ValueError(f"Expected exactly one catalog but got {len(catalog)}")
246        return catalog[0]
def get_stream_statuses( self, stream_name: str) -> List[airbyte_protocol_dataclasses.models.airbyte_protocol.AirbyteStreamStatus]:
248    def get_stream_statuses(self, stream_name: str) -> List[AirbyteStreamStatus]:
249        status_messages = map(
250            lambda message: message.trace.stream_status.status,  # type: ignore
251            filter(
252                lambda message: message.trace.stream_status.stream_descriptor.name == stream_name,  # type: ignore # callable; trace has `stream_status`
253                self._get_trace_message_by_trace_type(TraceType.STREAM_STATUS),
254            ),
255        )
256        return list(status_messages)
def get_message_iterator( self) -> Generator[airbyte_cdk.AirbyteMessage, None, None]:
258    def get_message_iterator(self) -> Generator[AirbyteMessage, None, None]:
259        """Creates a generator which yields messages one by one.
260
261        This will iterate over all messages in the output file (if provided) or the messages
262        provided during initialization. File results are provided first, followed by any
263        messages that were passed in directly.
264        """
265        if self._message_file:
266            try:
267                with open(self._message_file, "r", encoding="utf-8") as file:
268                    for line in file:
269                        if not line.strip():
270                            # Skip empty lines
271                            continue
272
273                        yield self._parse_message(line.strip())
274            except FileNotFoundError:
275                raise ValueError(f"Message file {self._message_file} not found")
276
277        if self._messages is not None:
278            yield from self._messages

Creates a generator which yields messages one by one.

This will iterate over all messages in the output file (if provided) or the messages provided during initialization. File results are provided first, followed by any messages that were passed in directly.

def get_message_by_types( self, message_types: list[airbyte_protocol_dataclasses.models.airbyte_protocol.Type], *, safe_iterator: bool = False) -> list[airbyte_cdk.AirbyteMessage] | Generator[airbyte_cdk.AirbyteMessage, None, None]:
304    def get_message_by_types(
305        self,
306        message_types: list[Type],
307        *,
308        safe_iterator: bool = False,
309    ) -> list[AirbyteMessage] | Generator[AirbyteMessage, None, None]:
310        """Get messages of specific types.
311
312        If `safe_iterator` is True, returns a generator that yields messages one by one.
313        If `safe_iterator` is False, returns a list of messages.
314
315        Use `safe_iterator=True` when the volume of messages could overload the available
316        memory.
317        """
318        message_generator = self.get_message_iterator()
319
320        if safe_iterator:
321            return (message for message in message_generator if message.type in message_types)
322
323        return [message for message in message_generator if message.type in message_types]

Get messages of specific types.

If safe_iterator is True, returns a generator that yields messages one by one. If safe_iterator is False, returns a list of messages.

Use safe_iterator=True when the volume of messages could overload the available memory.

def is_in_logs(self, pattern: str) -> bool:
335    def is_in_logs(self, pattern: str) -> bool:
336        """Check if any log message case-insensitive matches the pattern."""
337        return any(
338            re.search(
339                pattern,
340                entry.log.message,  # type: ignore[union-attr] # log has `message`
341                flags=re.IGNORECASE,
342            )
343            for entry in self.logs
344        )

Check if any log message matches the pattern, ignoring case.

def is_not_in_logs(self, pattern: str) -> bool:
346    def is_not_in_logs(self, pattern: str) -> bool:
347        """Check if no log message matches the case-insensitive pattern."""
348        return not self.is_in_logs(pattern)

Check if no log message matches the case-insensitive pattern.

def discover( source: airbyte_cdk.Source, config: Mapping[str, typing.Any], expecting_exception: bool | None = None, *, expected_outcome: airbyte_cdk.test.models.ExpectedOutcome | None = None) -> EntrypointOutput:
def discover(
    source: Source,
    config: Mapping[str, Any],
    expecting_exception: bool | None = None,  # Deprecated, use `expected_outcome` instead.
    *,
    expected_outcome: ExpectedOutcome | None = None,
) -> EntrypointOutput:
    """
    Run the `discover` command (with `--debug`) against `source`.

    `config` must be JSON-serializable; it is written to a temporary file that is passed to the command.

    :param expected_outcome: By default if there is an uncaught exception, the exception will be printed out. If this is expected, please
        provide `expected_outcome=ExpectedOutcome.EXPECT_FAILURE` so that the test output logs are cleaner
    """
    with tempfile.TemporaryDirectory() as workdir:
        config_path = make_file(Path(workdir) / "config.json", config)
        command = ["discover", "--config", config_path, "--debug"]

        # The command runs inside the `with` so the config file still exists.
        return _run_command(
            source,
            command,
            expecting_exception=expecting_exception,  # Deprecated, but still supported.
            expected_outcome=expected_outcome,
        )

config must be JSON-serializable.

Parameters
  • expected_outcome: By default if there is an uncaught exception, the exception will be printed out. If this is expected, please provide expected_outcome=ExpectedOutcome.EXPECT_FAILURE so that the test output logs are cleaner
def read( source: airbyte_cdk.Source, config: Mapping[str, typing.Any], catalog: airbyte_protocol_dataclasses.models.airbyte_protocol.ConfiguredAirbyteCatalog, state: Optional[List[airbyte_cdk.models.airbyte_protocol.AirbyteStateMessage]] = None, expecting_exception: bool | None = None, *, expected_outcome: airbyte_cdk.test.models.ExpectedOutcome | None = None, debug: bool = False) -> EntrypointOutput:
def read(
    source: Source,
    config: Mapping[str, Any],
    catalog: ConfiguredAirbyteCatalog,
    state: Optional[List[AirbyteStateMessage]] = None,
    expecting_exception: bool | None = None,  # Deprecated, use `expected_outcome` instead.
    *,
    expected_outcome: ExpectedOutcome | None = None,
    debug: bool = False,
) -> EntrypointOutput:
    """
    Run the `read` command against `source`.

    `config` and `state` must be JSON-serializable. The config, the catalog and (when given) the state are written to
    temporary files that are passed to the command.

    :param expected_outcome: By default if there is an uncaught exception, the exception will be printed out. If this is expected, please
        provide `expected_outcome=ExpectedOutcome.EXPECT_FAILURE` so that the test output logs are cleaner.
    :param debug: when True, `--debug` is added to the command.
    """
    with tempfile.TemporaryDirectory() as workdir:
        workdir_path = Path(workdir)

        config_path = make_file(workdir_path / "config.json", config)
        serialized_catalog = orjson.dumps(ConfiguredAirbyteCatalogSerializer.dump(catalog)).decode()
        catalog_path = make_file(workdir_path / "catalog.json", serialized_catalog)

        command = ["read", "--config", config_path, "--catalog", catalog_path]
        if debug:
            command.append("--debug")

        if state is not None:
            # Each state message is serialized on its own, then joined into a JSON array.
            serialized_states = [
                orjson.dumps(AirbyteStateMessageSerializer.dump(stream_state)).decode()
                for stream_state in state
            ]
            state_path = make_file(
                workdir_path / "state.json",
                "[" + ",".join(serialized_states) + "]",
            )
            command.extend(["--state", state_path])

        return _run_command(
            source,
            command,
            expecting_exception=expecting_exception,  # Deprecated, but still supported.
            expected_outcome=expected_outcome,
        )

config and state must be JSON-serializable.

Parameters
  • expected_outcome: By default if there is an uncaught exception, the exception will be printed out. If this is expected, please provide expected_outcome=ExpectedOutcome.EXPECT_FAILURE so that the test output logs are cleaner.
def make_file( path: pathlib.Path, file_contents: Union[str, Mapping[str, Any], List[Mapping[str, Any]], NoneType]) -> str:
def make_file(
    path: Path, file_contents: Optional[Union[str, Mapping[str, Any], List[Mapping[str, Any]]]]
) -> str:
    """Write `file_contents` to `path` and return the path as a string.

    A string is written verbatim; anything else is serialized with `json.dumps` first.
    """
    text = file_contents if isinstance(file_contents, str) else json.dumps(file_contents)
    path.write_text(text)
    return str(path)