airbyte_cdk.test.entrypoint_wrapper
The AirbyteEntrypoint is important because it is a service layer that orchestrates how we execute commands from the common interface through the source Python implementation. There is some logic about which message we send to the platform and when, which is relevant for integration testing. Other than that, there are integration points that are awkward to handle from Python code:
- Sources communicate with the platform using stdout. The implication is that the source could just print every message instead of returning things to `source.<method>` or using the message repository. WARNING: As part of integration testing, we will not support messages that are simply printed. The reason is that capturing stdout relies on overriding sys.stdout (see https://docs.python.org/3/library/contextlib.html#contextlib.redirect_stdout), which clashes with how pytest captures logs and brings considerations for multithreaded applications. If code you work with uses `print` statements, please migrate to `source.message_repository` to emit those messages.
- The entrypoint interface relies on files being written to the file system.
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

"""
The AirbyteEntrypoint is important because it is a service layer that orchestrates how we execute commands from the
[common interface](https://docs.airbyte.com/understanding-airbyte/airbyte-protocol#common-interface) through the source Python
implementation. There is some logic about which message we send to the platform and when, which is relevant for integration testing.
Other than that, there are integration points that are awkward to handle from Python code:
* Sources communicate with the platform using stdout. The implication is that the source could just print every message instead of
  returning things to source.<method> or using the message repository. WARNING: As part of integration testing, we will not support
  messages that are simply printed. The reason is that capturing stdout relies on overriding sys.stdout (see
  https://docs.python.org/3/library/contextlib.html#contextlib.redirect_stdout) which clashes with how pytest captures logs and brings
  considerations for multithreaded applications. If code you work with uses `print` statements, please migrate to
  source.message_repository to emit those messages.
* The entrypoint interface relies on files being written to the file system.
"""

import json
import logging
import re
import tempfile
import traceback
from collections import deque
from collections.abc import Generator, Mapping
from dataclasses import dataclass
from io import StringIO
from pathlib import Path
from typing import Any, List, Literal, Optional, Union, final, overload

import orjson
from pydantic import ValidationError as V2ValidationError
from serpyco_rs import SchemaValidationError

from airbyte_cdk.entrypoint import AirbyteEntrypoint
from airbyte_cdk.exception_handler import assemble_uncaught_exception
from airbyte_cdk.logger import AirbyteLogFormatter
from airbyte_cdk.models import (
    AirbyteLogMessage,
    AirbyteMessage,
    AirbyteMessageSerializer,
    AirbyteStateMessage,
    AirbyteStateMessageSerializer,
    AirbyteStreamState,
    AirbyteStreamStatus,
    ConfiguredAirbyteCatalog,
    ConfiguredAirbyteCatalogSerializer,
    Level,
    TraceType,
    Type,
)
from airbyte_cdk.sources import Source
from airbyte_cdk.test.models.scenario import ExpectedOutcome


class AirbyteEntrypointException(Exception):
    """Exception raised for errors in the AirbyteEntrypoint execution.

    Used to provide details of an Airbyte connector execution failure in the output
    captured in an `EntrypointOutput` object. Use `EntrypointOutput.as_exception()` to
    convert it to an exception.

    Example Usage:
        output = EntrypointOutput(...)
        if output.errors:
            raise output.as_exception()
    """

    # Class-level default, kept so `AirbyteEntrypointException.message` still resolves on the class.
    message: str = ""

    def __init__(self, message: str = "", *args: object) -> None:
        """Create the exception.

        Args:
            message: Human-readable failure description; also becomes `str(exc)`.
            *args: Extra positional args forwarded to `Exception`, for backward compatibility.
        """
        # This class is not a dataclass, so `__post_init__` is never invoked automatically.
        # Set `message` here so that `exc.message` agrees with `str(exc)`.
        super().__init__(message, *args)
        self.message = message

    def __post_init__(self) -> None:
        """Re-initialize `Exception` args from `message` (kept for backward compatibility)."""
        super().__init__(self.message)


class EntrypointOutput:
    """A class to encapsulate the output of an Airbyte connector's execution.

    This class can be initialized with a list of messages or a file containing messages.
    It provides methods to access different types of messages produced during the execution
    of an Airbyte connector, including both successful messages and error messages.

    When working with records and state messages, it provides both a list and an iterator
    implementation. Lists are easier to work with, but generators are better suited to handle
    large volumes of messages without overflowing the available memory.
    """

    def __init__(
        self,
        messages: list[str] | None = None,
        uncaught_exception: Optional[BaseException] = None,
        *,
        command: list[str] | None = None,
        message_file: Path | None = None,
    ) -> None:
        """Create an output wrapper from in-memory messages or from a message file.

        Args:
            messages: Serialized messages, one per entry. Entries that cannot be parsed as an
                `AirbyteMessage` are kept as INFO log messages (see `_parse_message`).
            uncaught_exception: If provided, an error trace message assembled from this exception
                is appended after the other in-memory messages.
            command: Command-line arguments that produced this output; only used to enrich
                `get_formatted_error_message()`.
            message_file: File holding one serialized message per line. It is re-read every time
                messages are iterated.

        Raises:
            ValueError: If neither or both of `messages` and `message_file` are provided, or if
                a message fails pydantic validation.
        """
        if messages is None and message_file is None:
            raise ValueError("Either messages or message_file must be provided")
        if messages is not None and message_file is not None:
            raise ValueError("Only one of messages or message_file can be provided")

        self._command = command
        self._messages: list[AirbyteMessage] | None = None
        self._message_file: Path | None = message_file
        if messages is not None:
            try:
                self._messages = [self._parse_message(message) for message in messages]
            except V2ValidationError as exception:
                raise ValueError("All messages are expected to be AirbyteMessage") from exception

        # Compare against None explicitly: an exception's truthiness is not a reliable presence
        # signal (a subclass may define `__bool__` or `__len__`).
        if uncaught_exception is not None:
            if self._messages is None:
                self._messages = []

            self._messages.append(
                assemble_uncaught_exception(
                    type(uncaught_exception), uncaught_exception
                ).as_airbyte_message()
            )

    @staticmethod
    def _parse_message(message: str) -> AirbyteMessage:
        """Deserialize one line into an `AirbyteMessage`.

        Lines that are not valid JSON or do not match the message schema are wrapped in an
        INFO-level LOG message containing the raw text.
        """
        try:
            return AirbyteMessageSerializer.load(orjson.loads(message))
        except (orjson.JSONDecodeError, SchemaValidationError):
            # The platform assumes that logs that are not of AirbyteMessage format are log messages
            return AirbyteMessage(
                type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message=message)
            )

    @property
    def records_and_state_messages(
        self,
    ) -> list[AirbyteMessage]:
        """All RECORD and STATE messages, in the order they were produced, as a list."""
        return self.get_message_by_types(
            message_types=[Type.RECORD, Type.STATE],
            safe_iterator=False,
        )

    def records_and_state_messages_iterator(
        self,
    ) -> Generator[AirbyteMessage, None, None]:
        """Returns a generator that yields record and state messages one by one.

        Use this instead of `records_and_state_messages` when the volume of messages could be large
        enough to overload available memory.
        """
        return self.get_message_by_types(
            message_types=[Type.RECORD, Type.STATE],
            safe_iterator=True,
        )

    @property
    def records(self) -> List[AirbyteMessage]:
        """All RECORD messages as a list."""
        return self.get_message_by_types([Type.RECORD])

    @property
    def records_iterator(self) -> Generator[AirbyteMessage, None, None]:
        """Returns a generator that yields record messages one by one.

        Use this instead of `records` when the volume of records could be large
        enough to overload available memory.
        """
        return self.get_message_by_types([Type.RECORD], safe_iterator=True)

    @property
    def state_messages(self) -> List[AirbyteMessage]:
        """All STATE messages as a list."""
        return self.get_message_by_types([Type.STATE])

    @property
    def spec_messages(self) -> List[AirbyteMessage]:
        """All SPEC messages as a list."""
        return self.get_message_by_types([Type.SPEC])

    @property
    def connection_status_messages(self) -> List[AirbyteMessage]:
        """All CONNECTION_STATUS messages as a list."""
        return self.get_message_by_types([Type.CONNECTION_STATUS])

    @property
    def most_recent_state(self) -> AirbyteStreamState | None:
        """The `state.stream` of the last STATE message.

        Raises:
            ValueError: If there are no STATE messages.
        """
        state_message_iterator = self.get_message_by_types(
            [Type.STATE],
            safe_iterator=True,
        )
        # A deque with maxlen=1 keeps only the last item while consuming the generator lazily.
        double_ended_queue = deque(state_message_iterator, maxlen=1)
        try:
            final_state_message: AirbyteMessage = double_ended_queue.pop()
        except IndexError:
            raise ValueError(
                "Can't provide most recent state as there are no state messages."
            ) from None

        return final_state_message.state.stream  # type: ignore[union-attr] # state has `stream`

    @property
    def logs(self) -> List[AirbyteMessage]:
        """All LOG messages as a list (including lines that could not be parsed as messages)."""
        return self.get_message_by_types([Type.LOG])

    @property
    def trace_messages(self) -> List[AirbyteMessage]:
        """All TRACE messages as a list."""
        return self.get_message_by_types([Type.TRACE])

    @property
    def analytics_messages(self) -> List[AirbyteMessage]:
        """TRACE messages whose trace type is ANALYTICS."""
        return self._get_trace_message_by_trace_type(TraceType.ANALYTICS)

    @property
    def errors(self) -> List[AirbyteMessage]:
        """TRACE messages whose trace type is ERROR."""
        return self._get_trace_message_by_trace_type(TraceType.ERROR)

    def get_formatted_error_message(self) -> str:
        """Returns a human-readable error message with the contents.

        If there are no errors, returns an empty string.
        """
        errors = self.errors
        if not errors:
            # If there are no errors, return an empty string.
            return ""

        result = "Failed to run airbyte command"
        # Parenthesized to make the precedence explicit: the whole suffix is conditional.
        result += (": " + " ".join(self._command)) if self._command else "."
        result += "\n" + "\n".join(
            [str(error.trace.error).replace("\\n", "\n") for error in errors if error.trace],
        )
        return result

    def as_exception(self) -> AirbyteEntrypointException:
        """Convert the output to an exception."""
        return AirbyteEntrypointException(self.get_formatted_error_message())

    def raise_if_errors(
        self,
    ) -> None:
        """Raise an exception if there are errors in the output.

        Otherwise, do nothing.
        """
        if not self.errors:
            return None

        raise self.as_exception()

    @property
    def catalog(self) -> AirbyteMessage:
        """The single CATALOG message.

        Raises:
            ValueError: If there is not exactly one CATALOG message.
        """
        catalog = self.get_message_by_types([Type.CATALOG])
        if len(catalog) != 1:
            raise ValueError(f"Expected exactly one catalog but got {len(catalog)}")
        return catalog[0]

    def get_stream_statuses(self, stream_name: str) -> List[AirbyteStreamStatus]:
        """Statuses from STREAM_STATUS trace messages for `stream_name`, in emission order."""
        status_messages = map(
            lambda message: message.trace.stream_status.status,  # type: ignore
            filter(
                lambda message: message.trace.stream_status.stream_descriptor.name == stream_name,  # type: ignore # callable; trace has `stream_status`
                self._get_trace_message_by_trace_type(TraceType.STREAM_STATUS),
            ),
        )
        return list(status_messages)

    def get_message_iterator(self) -> Generator[AirbyteMessage, None, None]:
        """Creates a generator which yields messages one by one.

        Messages from the output file (if one was provided) are yielded first, followed by any
        in-memory messages (those passed at initialization plus the uncaught-exception trace).

        Raises:
            ValueError: If the message file does not exist when iteration starts.
        """
        if self._message_file:
            try:
                with open(self._message_file, "r", encoding="utf-8") as file:
                    for line in file:
                        if not line.strip():
                            # Skip empty lines
                            continue

                        yield self._parse_message(line.strip())
            except FileNotFoundError:
                raise ValueError(f"Message file {self._message_file} not found")

        if self._messages is not None:
            yield from self._messages

    # Overloads to provide proper type hints for different usages of `get_message_by_types`.

    @overload
    def get_message_by_types(
        self,
        message_types: list[Type],
    ) -> list[AirbyteMessage]: ...

    @overload
    def get_message_by_types(
        self,
        message_types: list[Type],
        *,
        safe_iterator: Literal[False],
    ) -> list[AirbyteMessage]: ...

    @overload
    def get_message_by_types(
        self,
        message_types: list[Type],
        *,
        safe_iterator: Literal[True],
    ) -> Generator[AirbyteMessage, None, None]: ...

    def get_message_by_types(
        self,
        message_types: list[Type],
        *,
        safe_iterator: bool = False,
    ) -> list[AirbyteMessage] | Generator[AirbyteMessage, None, None]:
        """Get messages of specific types.

        If `safe_iterator` is True, returns a generator that yields messages one by one.
        If `safe_iterator` is False, returns a list of messages.

        Use `safe_iterator=True` when the volume of messages could overload the available
        memory.
        """
        message_generator = self.get_message_iterator()

        if safe_iterator:
            return (message for message in message_generator if message.type in message_types)

        return [message for message in message_generator if message.type in message_types]

    def _get_trace_message_by_trace_type(self, trace_type: TraceType) -> List[AirbyteMessage]:
        """TRACE messages whose `trace.type` equals `trace_type`, as a list."""
        return [
            message
            for message in self.get_message_by_types(
                [Type.TRACE],
                safe_iterator=True,
            )
            if message.trace.type == trace_type  # type: ignore[union-attr] # trace has `type`
        ]

    def is_in_logs(self, pattern: str) -> bool:
        """Check if any log message case-insensitive matches the pattern."""
        return any(
            re.search(
                pattern,
                entry.log.message,  # type: ignore[union-attr] # log has `message`
                flags=re.IGNORECASE,
            )
            for entry in self.logs
        )

    def is_not_in_logs(self, pattern: str) -> bool:
        """Check if no log message matches the case-insensitive pattern."""
        return not self.is_in_logs(pattern)


def _run_command(
    source: Source,
    args: List[str],
    expecting_exception: bool | None = None,  # Deprecated, use `expected_outcome` instead.
    *,
    expected_outcome: ExpectedOutcome | None = None,
) -> EntrypointOutput:
    """Internal function to run a command with the AirbyteEntrypoint.

    Note: Even though this function is private, some connectors do call it directly.

    Note: The `expecting_exception` arg is now deprecated in favor of the tri-state
    `expected_outcome` arg. The old argument is supported (for now) for backwards compatibility.

    Records emitted through the root logger while the command runs are formatted with
    `AirbyteLogFormatter` and appended after the messages yielded by the entrypoint. An
    `Exception` raised while iterating the entrypoint is captured into the returned output
    (and printed only when success was expected); errors from argument parsing or entrypoint
    construction propagate to the caller.
    """
    expected_outcome = expected_outcome or ExpectedOutcome.from_expecting_exception_bool(
        expecting_exception,
    )
    log_capture_buffer = StringIO()
    stream_handler = logging.StreamHandler(log_capture_buffer)
    stream_handler.setLevel(logging.INFO)
    stream_handler.setFormatter(AirbyteLogFormatter())
    parent_logger = logging.getLogger("")
    parent_logger.addHandler(stream_handler)

    messages: list[str] = []
    uncaught_exception: Exception | None = None
    # The try/finally guarantees the root-logger handler is detached even when something escapes
    # (e.g. a parse error or a BaseException such as KeyboardInterrupt); otherwise the handler
    # would leak into every later test in the same process.
    try:
        parsed_args = AirbyteEntrypoint.parse_args(args)
        source_entrypoint = AirbyteEntrypoint(source)
        try:
            for message in source_entrypoint.run(parsed_args):
                messages.append(message)
        except Exception as exception:
            if expected_outcome.expect_success():
                print("Printing unexpected error from entrypoint_wrapper")
                print("".join(traceback.format_exception(None, exception, exception.__traceback__)))

            uncaught_exception = exception

        # Drop the trailing empty element produced by the final newline.
        captured_logs = log_capture_buffer.getvalue().split("\n")[:-1]
    finally:
        parent_logger.removeHandler(stream_handler)

    return EntrypointOutput(
        messages=messages + captured_logs,
        uncaught_exception=uncaught_exception,
        # Lets `get_formatted_error_message()` report which command failed.
        command=args,
    )


def discover(
    source: Source,
    config: Mapping[str, Any],
    expecting_exception: bool | None = None,  # Deprecated, use `expected_outcome` instead.
    *,
    expected_outcome: ExpectedOutcome | None = None,
) -> EntrypointOutput:
    """
    Run the `discover` command against `source` with `--debug` enabled.

    config must be json serializable
    :param expected_outcome: By default if there is an uncaught exception, the exception will be printed out. If this is expected, please
        provide `expected_outcome=ExpectedOutcome.EXPECT_FAILURE` so that the test output logs are cleaner
    """

    with tempfile.TemporaryDirectory() as tmp_directory:
        tmp_directory_path = Path(tmp_directory)
        config_file = make_file(tmp_directory_path / "config.json", config)

        return _run_command(
            source,
            ["discover", "--config", config_file, "--debug"],
            expecting_exception=expecting_exception,  # Deprecated, but still supported.
            expected_outcome=expected_outcome,
        )


def read(
    source: Source,
    config: Mapping[str, Any],
    catalog: ConfiguredAirbyteCatalog,
    state: Optional[List[AirbyteStateMessage]] = None,
    expecting_exception: bool | None = None,  # Deprecated, use `expected_outcome` instead.
    *,
    expected_outcome: ExpectedOutcome | None = None,
    debug: bool = False,
) -> EntrypointOutput:
    """
    Run the `read` command against `source`.

    config and state must be json serializable

    :param state: Optional state messages, written as a JSON array to a temporary `--state` file.
    :param expected_outcome: By default if there is an uncaught exception, the exception will be printed out. If this is expected, please
        provide `expected_outcome=ExpectedOutcome.EXPECT_FAILURE` so that the test output logs are cleaner.
    :param debug: When True, `--debug` is added to the command line.
    """
    with tempfile.TemporaryDirectory() as tmp_directory:
        tmp_directory_path = Path(tmp_directory)
        config_file = make_file(tmp_directory_path / "config.json", config)
        catalog_file = make_file(
            tmp_directory_path / "catalog.json",
            orjson.dumps(ConfiguredAirbyteCatalogSerializer.dump(catalog)).decode(),
        )
        args = [
            "read",
            "--config",
            config_file,
            "--catalog",
            catalog_file,
        ]
        if debug:
            args.append("--debug")
        if state is not None:
            args.extend(
                [
                    "--state",
                    make_file(
                        tmp_directory_path / "state.json",
                        f"[{','.join([orjson.dumps(AirbyteStateMessageSerializer.dump(stream_state)).decode() for stream_state in state])}]",
                    ),
                ]
            )

        return _run_command(
            source,
            args,
            expecting_exception=expecting_exception,  # Deprecated, but still supported.
            expected_outcome=expected_outcome,
        )


def make_file(
    path: Path, file_contents: Optional[Union[str, Mapping[str, Any], List[Mapping[str, Any]]]]
) -> str:
    """Write `file_contents` to `path` and return the path as a string.

    Strings are written verbatim; any other value is serialized with `json.dumps` first.
    """
    if isinstance(file_contents, str):
        path.write_text(file_contents)
    else:
        path.write_text(json.dumps(file_contents))
    return str(path)
class AirbyteEntrypointException(Exception):
    """Exception raised for errors in the AirbyteEntrypoint execution.

    Used to provide details of an Airbyte connector execution failure in the output
    captured in an `EntrypointOutput` object. Use `EntrypointOutput.as_exception()` to
    convert it to an exception.

    Example Usage:
        output = EntrypointOutput(...)
        if output.errors:
            raise output.as_exception()
    """

    # Class-level default, kept so `AirbyteEntrypointException.message` still resolves on the class.
    message: str = ""

    def __init__(self, message: str = "", *args: object) -> None:
        """Create the exception.

        Args:
            message: Human-readable failure description; also becomes `str(exc)`.
            *args: Extra positional args forwarded to `Exception`, for backward compatibility.
        """
        # This class is not a dataclass, so `__post_init__` is never invoked automatically.
        # Set `message` here so that `exc.message` agrees with `str(exc)`.
        super().__init__(message, *args)
        self.message = message

    def __post_init__(self) -> None:
        """Re-initialize `Exception` args from `message` (kept for backward compatibility)."""
        super().__init__(self.message)
Exception raised for errors in the AirbyteEntrypoint execution.
Used to provide details of an Airbyte connector execution failure in the output captured in an `EntrypointOutput` object. Use `EntrypointOutput.as_exception()` to convert it to an exception.
Example Usage:
output = EntrypointOutput(...)
if output.errors:
    raise output.as_exception()
class EntrypointOutput:
    """A class to encapsulate the output of an Airbyte connector's execution.

    This class can be initialized with a list of messages or a file containing messages.
    It provides methods to access different types of messages produced during the execution
    of an Airbyte connector, including both successful messages and error messages.

    When working with records and state messages, it provides both a list and an iterator
    implementation. Lists are easier to work with, but generators are better suited to handle
    large volumes of messages without overflowing the available memory.
    """

    def __init__(
        self,
        messages: list[str] | None = None,
        uncaught_exception: Optional[BaseException] = None,
        *,
        command: list[str] | None = None,
        message_file: Path | None = None,
    ) -> None:
        """Create an output wrapper from in-memory messages or from a message file.

        Args:
            messages: Serialized messages, one per entry. Entries that cannot be parsed as an
                `AirbyteMessage` are kept as INFO log messages (see `_parse_message`).
            uncaught_exception: If provided, an error trace message assembled from this exception
                is appended after the other in-memory messages.
            command: Command-line arguments that produced this output; only used to enrich
                `get_formatted_error_message()`.
            message_file: File holding one serialized message per line. It is re-read every time
                messages are iterated.

        Raises:
            ValueError: If neither or both of `messages` and `message_file` are provided, or if
                a message fails pydantic validation.
        """
        if messages is None and message_file is None:
            raise ValueError("Either messages or message_file must be provided")
        if messages is not None and message_file is not None:
            raise ValueError("Only one of messages or message_file can be provided")

        self._command = command
        self._messages: list[AirbyteMessage] | None = None
        self._message_file: Path | None = message_file
        if messages is not None:
            try:
                self._messages = [self._parse_message(message) for message in messages]
            except V2ValidationError as exception:
                raise ValueError("All messages are expected to be AirbyteMessage") from exception

        # Compare against None explicitly: an exception's truthiness is not a reliable presence
        # signal (a subclass may define `__bool__` or `__len__`).
        if uncaught_exception is not None:
            if self._messages is None:
                self._messages = []

            self._messages.append(
                assemble_uncaught_exception(
                    type(uncaught_exception), uncaught_exception
                ).as_airbyte_message()
            )

    @staticmethod
    def _parse_message(message: str) -> AirbyteMessage:
        """Deserialize one line into an `AirbyteMessage`.

        Lines that are not valid JSON or do not match the message schema are wrapped in an
        INFO-level LOG message containing the raw text.
        """
        try:
            return AirbyteMessageSerializer.load(orjson.loads(message))
        except (orjson.JSONDecodeError, SchemaValidationError):
            # The platform assumes that logs that are not of AirbyteMessage format are log messages
            return AirbyteMessage(
                type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message=message)
            )

    @property
    def records_and_state_messages(
        self,
    ) -> list[AirbyteMessage]:
        """All RECORD and STATE messages, in the order they were produced, as a list."""
        return self.get_message_by_types(
            message_types=[Type.RECORD, Type.STATE],
            safe_iterator=False,
        )

    def records_and_state_messages_iterator(
        self,
    ) -> Generator[AirbyteMessage, None, None]:
        """Returns a generator that yields record and state messages one by one.

        Use this instead of `records_and_state_messages` when the volume of messages could be large
        enough to overload available memory.
        """
        return self.get_message_by_types(
            message_types=[Type.RECORD, Type.STATE],
            safe_iterator=True,
        )

    @property
    def records(self) -> List[AirbyteMessage]:
        """All RECORD messages as a list."""
        return self.get_message_by_types([Type.RECORD])

    @property
    def records_iterator(self) -> Generator[AirbyteMessage, None, None]:
        """Returns a generator that yields record messages one by one.

        Use this instead of `records` when the volume of records could be large
        enough to overload available memory.
        """
        return self.get_message_by_types([Type.RECORD], safe_iterator=True)

    @property
    def state_messages(self) -> List[AirbyteMessage]:
        """All STATE messages as a list."""
        return self.get_message_by_types([Type.STATE])

    @property
    def spec_messages(self) -> List[AirbyteMessage]:
        """All SPEC messages as a list."""
        return self.get_message_by_types([Type.SPEC])

    @property
    def connection_status_messages(self) -> List[AirbyteMessage]:
        """All CONNECTION_STATUS messages as a list."""
        return self.get_message_by_types([Type.CONNECTION_STATUS])

    @property
    def most_recent_state(self) -> AirbyteStreamState | None:
        """The `state.stream` of the last STATE message.

        Raises:
            ValueError: If there are no STATE messages.
        """
        state_message_iterator = self.get_message_by_types(
            [Type.STATE],
            safe_iterator=True,
        )
        # A deque with maxlen=1 keeps only the last item while consuming the generator lazily.
        double_ended_queue = deque(state_message_iterator, maxlen=1)
        try:
            final_state_message: AirbyteMessage = double_ended_queue.pop()
        except IndexError:
            raise ValueError(
                "Can't provide most recent state as there are no state messages."
            ) from None

        return final_state_message.state.stream  # type: ignore[union-attr] # state has `stream`

    @property
    def logs(self) -> List[AirbyteMessage]:
        """All LOG messages as a list (including lines that could not be parsed as messages)."""
        return self.get_message_by_types([Type.LOG])

    @property
    def trace_messages(self) -> List[AirbyteMessage]:
        """All TRACE messages as a list."""
        return self.get_message_by_types([Type.TRACE])

    @property
    def analytics_messages(self) -> List[AirbyteMessage]:
        """TRACE messages whose trace type is ANALYTICS."""
        return self._get_trace_message_by_trace_type(TraceType.ANALYTICS)

    @property
    def errors(self) -> List[AirbyteMessage]:
        """TRACE messages whose trace type is ERROR."""
        return self._get_trace_message_by_trace_type(TraceType.ERROR)

    def get_formatted_error_message(self) -> str:
        """Returns a human-readable error message with the contents.

        If there are no errors, returns an empty string.
        """
        errors = self.errors
        if not errors:
            # If there are no errors, return an empty string.
            return ""

        result = "Failed to run airbyte command"
        # Parenthesized to make the precedence explicit: the whole suffix is conditional.
        result += (": " + " ".join(self._command)) if self._command else "."
        result += "\n" + "\n".join(
            [str(error.trace.error).replace("\\n", "\n") for error in errors if error.trace],
        )
        return result

    def as_exception(self) -> AirbyteEntrypointException:
        """Convert the output to an exception."""
        return AirbyteEntrypointException(self.get_formatted_error_message())

    def raise_if_errors(
        self,
    ) -> None:
        """Raise an exception if there are errors in the output.

        Otherwise, do nothing.
        """
        if not self.errors:
            return None

        raise self.as_exception()

    @property
    def catalog(self) -> AirbyteMessage:
        """The single CATALOG message.

        Raises:
            ValueError: If there is not exactly one CATALOG message.
        """
        catalog = self.get_message_by_types([Type.CATALOG])
        if len(catalog) != 1:
            raise ValueError(f"Expected exactly one catalog but got {len(catalog)}")
        return catalog[0]

    def get_stream_statuses(self, stream_name: str) -> List[AirbyteStreamStatus]:
        """Statuses from STREAM_STATUS trace messages for `stream_name`, in emission order."""
        status_messages = map(
            lambda message: message.trace.stream_status.status,  # type: ignore
            filter(
                lambda message: message.trace.stream_status.stream_descriptor.name == stream_name,  # type: ignore # callable; trace has `stream_status`
                self._get_trace_message_by_trace_type(TraceType.STREAM_STATUS),
            ),
        )
        return list(status_messages)

    def get_message_iterator(self) -> Generator[AirbyteMessage, None, None]:
        """Creates a generator which yields messages one by one.

        Messages from the output file (if one was provided) are yielded first, followed by any
        in-memory messages (those passed at initialization plus the uncaught-exception trace).

        Raises:
            ValueError: If the message file does not exist when iteration starts.
        """
        if self._message_file:
            try:
                with open(self._message_file, "r", encoding="utf-8") as file:
                    for line in file:
                        if not line.strip():
                            # Skip empty lines
                            continue

                        yield self._parse_message(line.strip())
            except FileNotFoundError:
                raise ValueError(f"Message file {self._message_file} not found")

        if self._messages is not None:
            yield from self._messages

    # Overloads to provide proper type hints for different usages of `get_message_by_types`.

    @overload
    def get_message_by_types(
        self,
        message_types: list[Type],
    ) -> list[AirbyteMessage]: ...

    @overload
    def get_message_by_types(
        self,
        message_types: list[Type],
        *,
        safe_iterator: Literal[False],
    ) -> list[AirbyteMessage]: ...

    @overload
    def get_message_by_types(
        self,
        message_types: list[Type],
        *,
        safe_iterator: Literal[True],
    ) -> Generator[AirbyteMessage, None, None]: ...

    def get_message_by_types(
        self,
        message_types: list[Type],
        *,
        safe_iterator: bool = False,
    ) -> list[AirbyteMessage] | Generator[AirbyteMessage, None, None]:
        """Get messages of specific types.

        If `safe_iterator` is True, returns a generator that yields messages one by one.
        If `safe_iterator` is False, returns a list of messages.

        Use `safe_iterator=True` when the volume of messages could overload the available
        memory.
        """
        message_generator = self.get_message_iterator()

        if safe_iterator:
            return (message for message in message_generator if message.type in message_types)

        return [message for message in message_generator if message.type in message_types]

    def _get_trace_message_by_trace_type(self, trace_type: TraceType) -> List[AirbyteMessage]:
        """TRACE messages whose `trace.type` equals `trace_type`, as a list."""
        return [
            message
            for message in self.get_message_by_types(
                [Type.TRACE],
                safe_iterator=True,
            )
            if message.trace.type == trace_type  # type: ignore[union-attr] # trace has `type`
        ]

    def is_in_logs(self, pattern: str) -> bool:
        """Check if any log message case-insensitive matches the pattern."""
        return any(
            re.search(
                pattern,
                entry.log.message,  # type: ignore[union-attr] # log has `message`
                flags=re.IGNORECASE,
            )
            for entry in self.logs
        )

    def is_not_in_logs(self, pattern: str) -> bool:
        """Check if no log message matches the case-insensitive pattern."""
        return not self.is_in_logs(pattern)
A class to encapsulate the output of an Airbyte connector's execution.
This class can be initialized with a list of messages or a file containing messages. It provides methods to access different types of messages produced during the execution of an Airbyte connector, including both successful messages and error messages.
When working with records and state messages, it provides both a list and an iterator implementation. Lists are easier to work with, but generators are better suited to handle large volumes of messages without overflowing the available memory.
def __init__(
    self,
    messages: list[str] | None = None,
    uncaught_exception: Optional[BaseException] = None,
    *,
    command: list[str] | None = None,
    message_file: Path | None = None,
) -> None:
    """Create an output wrapper from in-memory messages or from a message file.

    Args:
        messages: Serialized messages, one per entry. Entries that cannot be parsed as an
            `AirbyteMessage` are kept as INFO log messages (see `_parse_message`).
        uncaught_exception: If provided, an error trace message assembled from this exception
            is appended after the other in-memory messages.
        command: Command-line arguments that produced this output; only used to enrich
            `get_formatted_error_message()`.
        message_file: File holding one serialized message per line. It is re-read every time
            messages are iterated.

    Raises:
        ValueError: If neither or both of `messages` and `message_file` are provided, or if
            a message fails pydantic validation.
    """
    if messages is None and message_file is None:
        raise ValueError("Either messages or message_file must be provided")
    if messages is not None and message_file is not None:
        raise ValueError("Only one of messages or message_file can be provided")

    self._command = command
    self._messages: list[AirbyteMessage] | None = None
    self._message_file: Path | None = message_file
    if messages is not None:
        try:
            self._messages = [self._parse_message(message) for message in messages]
        except V2ValidationError as exception:
            raise ValueError("All messages are expected to be AirbyteMessage") from exception

    # Compare against None explicitly: an exception's truthiness is not a reliable presence
    # signal (a subclass may define `__bool__` or `__len__`).
    if uncaught_exception is not None:
        if self._messages is None:
            self._messages = []

        self._messages.append(
            assemble_uncaught_exception(
                type(uncaught_exception), uncaught_exception
            ).as_airbyte_message()
        )
def records_and_state_messages_iterator(
    self,
) -> Generator[AirbyteMessage, None, None]:
    """Lazily yield RECORD and STATE messages, one at a time, in emission order.

    Prefer this over `records_and_state_messages` when the output may be too large to
    materialize as a single list.
    """
    wanted_types = [Type.RECORD, Type.STATE]
    return self.get_message_by_types(wanted_types, safe_iterator=True)
Returns a generator that yields record and state messages one by one.
Use this instead of `records_and_state_messages` when the volume of messages could be large enough to overload available memory.
@property
def records_iterator(self) -> Generator[AirbyteMessage, None, None]:
    """Lazily yield RECORD messages, one at a time.

    Prefer this over `records` when there may be too many records to hold in memory
    as a single list.
    """
    record_types = [Type.RECORD]
    return self.get_message_by_types(message_types=record_types, safe_iterator=True)
Returns a generator that yields record messages one by one.
Use this instead of `records` when the volume of records could be large enough to overload available memory.
@property
def most_recent_state(self) -> AirbyteStreamState | None:
    """The `state.stream` of the last STATE message in the output.

    STATE messages are streamed lazily, so only the latest one is ever retained.

    Raises:
        ValueError: If the output contains no STATE messages.
    """
    latest: AirbyteMessage | None = None
    for state_message in self.get_message_by_types([Type.STATE], safe_iterator=True):
        latest = state_message

    if latest is None:
        raise ValueError(
            "Can't provide most recent state as there are no state messages."
        ) from None

    return latest.state.stream  # type: ignore[union-attr] # state has `stream`
def get_formatted_error_message(self) -> str:
    """Build a human-readable summary of every ERROR trace in the output.

    The first line names the command (when one was recorded); each following line is one
    error, with escaped newlines expanded. Returns "" when there are no errors.
    """
    error_messages = self.errors
    if not error_messages:
        return ""

    if self._command:
        header = "Failed to run airbyte command: " + " ".join(self._command)
    else:
        header = "Failed to run airbyte command."

    details = [
        str(entry.trace.error).replace("\\n", "\n")
        for entry in error_messages
        if entry.trace
    ]
    return header + "\n" + "\n".join(details)
Returns a human-readable error message that names the failed command and lists the error traces captured in the output.
If there are no errors, returns an empty string.
def as_exception(self) -> AirbyteEntrypointException:
    """Wrap this output's formatted error summary in an `AirbyteEntrypointException`."""
    summary = self.get_formatted_error_message()
    return AirbyteEntrypointException(summary)
Convert the output to an exception.
def raise_if_errors(self) -> None:
    """Raise the exception built by `as_exception` when the output has errors.

    When there are no errors this is a no-op and returns None.
    """
    if self.errors:
        raise self.as_exception()
Raise an exception if there are errors in the output.
Otherwise, do nothing.
def get_stream_statuses(self, stream_name: str) -> List[AirbyteStreamStatus]:
    """Return the status of every STREAM_STATUS trace whose stream is `stream_name`.

    Statuses are returned in the order the trace messages were produced.
    """
    stream_status_traces = self._get_trace_message_by_trace_type(TraceType.STREAM_STATUS)
    return [
        trace_message.trace.stream_status.status  # type: ignore
        for trace_message in stream_status_traces
        if trace_message.trace.stream_status.stream_descriptor.name == stream_name  # type: ignore
    ]
def get_message_iterator(self) -> Generator[AirbyteMessage, None, None]:
    """Create a generator that yields messages one by one.

    Messages read from the output file (when one was provided) come first, with
    blank lines skipped. They are followed by any messages passed in directly at
    initialization (when present).

    Raises:
        ValueError: If a message file was provided but does not exist.
    """
    if self._message_file:
        # Only `open` is expected to raise FileNotFoundError, so keep the `try` narrow:
        # errors raised while parsing lines must not be misreported as a missing file.
        try:
            message_file = open(self._message_file, "r", encoding="utf-8")
        except FileNotFoundError as err:
            raise ValueError(f"Message file {self._message_file} not found") from err

        with message_file:
            for raw_line in message_file:
                line = raw_line.strip()
                if not line:
                    # Skip empty lines
                    continue

                yield self._parse_message(line)

    if self._messages is not None:
        yield from self._messages
Creates a generator which yields messages one by one.
This will iterate over all messages in the output file (if provided) and the messages provided during initialization (if any). File results are provided first, followed by any messages that were passed in directly.
def get_message_by_types(
    self,
    message_types: list[Type],
    *,
    safe_iterator: bool = False,
) -> list[AirbyteMessage] | Generator[AirbyteMessage, None, None]:
    """Select the messages whose type is one of `message_types`.

    With `safe_iterator=True` a lazy generator is returned, producing matches one at
    a time; use it when the output may be too large to fit in memory. Otherwise the
    matches are collected eagerly into a list.
    """
    matching = (
        message for message in self.get_message_iterator() if message.type in message_types
    )
    if not safe_iterator:
        return list(matching)
    return matching
Get messages of specific types.
If `safe_iterator` is True, returns a generator that yields messages one by one.
If `safe_iterator` is False, returns a list of messages.
Use `safe_iterator=True` when the volume of messages could overload the available memory.
def is_in_logs(self, pattern: str) -> bool:
    """Report whether any log message matches `pattern`.

    Matching is a case-insensitive regular-expression search (`re.search`), so the
    pattern may match anywhere inside a message.
    """
    for log_entry in self.logs:
        log_text = log_entry.log.message  # type: ignore[union-attr] # log has `message`
        if re.search(pattern, log_text, flags=re.IGNORECASE):
            return True
    return False
Check if any log message matches the pattern, using a case-insensitive regular-expression search.
def discover(
    source: Source,
    config: Mapping[str, Any],
    expecting_exception: bool | None = None,  # Deprecated, use `expected_outcome` instead.
    *,
    expected_outcome: ExpectedOutcome | None = None,
) -> EntrypointOutput:
    """
    Run the `discover` command (always with `--debug`) against `source`.

    `config` must be JSON serializable. It is written to a `config.json` file inside a
    temporary directory that is removed when this call returns.

    :param expected_outcome: By default, if there is an uncaught exception, the exception will be printed out. If this is
        expected, please provide `expected_outcome=ExpectedOutcome.EXPECT_FAILURE` so that the test output logs are cleaner.
    """
    with tempfile.TemporaryDirectory() as tmp_dir_name:
        config_path = make_file(Path(tmp_dir_name) / "config.json", config)
        discover_args = ["discover", "--config", config_path, "--debug"]

        return _run_command(
            source,
            discover_args,
            expecting_exception=expecting_exception,  # Deprecated, but still supported.
            expected_outcome=expected_outcome,
        )
`config` must be JSON serializable.
Parameters
- expected_outcome: By default, if there is an uncaught exception, the exception will be printed out. If this is expected, please provide `expected_outcome=ExpectedOutcome.EXPECT_FAILURE` so that the test output logs are cleaner.
def read(
    source: Source,
    config: Mapping[str, Any],
    catalog: ConfiguredAirbyteCatalog,
    state: Optional[List[AirbyteStateMessage]] = None,
    expecting_exception: bool | None = None,  # Deprecated, use `expected_outcome` instead.
    *,
    expected_outcome: ExpectedOutcome | None = None,
    debug: bool = False,
) -> EntrypointOutput:
    """
    Run the `read` command against `source` with the given config, catalog and optional state.

    `config` and `state` must be JSON serializable. The config, catalog and (when given)
    state are written to files in a temporary directory that is removed when this call
    returns. `--debug` is only passed when `debug=True`.

    :param expected_outcome: By default, if there is an uncaught exception, the exception will be printed out. If this is
        expected, please provide `expected_outcome=ExpectedOutcome.EXPECT_FAILURE` so that the test output logs are cleaner.
    """
    with tempfile.TemporaryDirectory() as tmp_dir_name:
        tmp_dir = Path(tmp_dir_name)
        config_path = make_file(tmp_dir / "config.json", config)
        serialized_catalog = orjson.dumps(ConfiguredAirbyteCatalogSerializer.dump(catalog)).decode()
        catalog_path = make_file(tmp_dir / "catalog.json", serialized_catalog)

        read_args = ["read", "--config", config_path, "--catalog", catalog_path]
        if debug:
            read_args.append("--debug")
        if state is not None:
            # The state file holds a JSON array with one serialized entry per state message.
            state_entries = [
                orjson.dumps(AirbyteStateMessageSerializer.dump(stream_state)).decode()
                for stream_state in state
            ]
            serialized_state = "[" + ",".join(state_entries) + "]"
            read_args += ["--state", make_file(tmp_dir / "state.json", serialized_state)]

        return _run_command(
            source,
            read_args,
            expecting_exception=expecting_exception,  # Deprecated, but still supported.
            expected_outcome=expected_outcome,
        )
`config` and `state` must be JSON serializable.
Parameters
- expected_outcome: By default, if there is an uncaught exception, the exception will be printed out. If this is expected, please provide `expected_outcome=ExpectedOutcome.EXPECT_FAILURE` so that the test output logs are cleaner.