airbyte_cdk.utils.traced_exception

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4import time
  5import traceback
  6from typing import Any, Optional
  7
  8import orjson
  9
 10from airbyte_cdk.models import (
 11    AirbyteConnectionStatus,
 12    AirbyteErrorTraceMessage,
 13    AirbyteMessage,
 14    AirbyteMessageSerializer,
 15    AirbyteTraceMessage,
 16    FailureType,
 17    Status,
 18    StreamDescriptor,
 19    TraceType,
 20)
 21from airbyte_cdk.models import Type as MessageType
 22from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
 23
 24
 25class AirbyteTracedException(Exception):
 26    """
 27    An exception that should be emitted as an AirbyteTraceMessage
 28    """
 29
 30    def __init__(
 31        self,
 32        internal_message: Optional[str] = None,
 33        message: Optional[str] = None,
 34        failure_type: FailureType = FailureType.system_error,
 35        exception: Optional[BaseException] = None,
 36        stream_descriptor: Optional[StreamDescriptor] = None,
 37    ):
 38        """
 39        :param internal_message: the internal error that caused the failure
 40        :param message: a user-friendly message that indicates the cause of the error
 41        :param failure_type: the type of error
 42        :param exception: the exception that caused the error, from which the stack trace should be retrieved
 43        :param stream_descriptor: describe the stream from which the exception comes from
 44        """
 45        self.internal_message = internal_message
 46        self.message = message
 47        self.failure_type = failure_type
 48        self._exception = exception
 49        self._stream_descriptor = stream_descriptor
 50        super().__init__(internal_message)
 51
 52    def __str__(self) -> str:
 53        """Return the user-facing message, falling back to internal_message."""
 54        if self.message is not None:
 55            return self.message
 56        if self.internal_message is not None:
 57            return self.internal_message
 58        return ""
 59
 60    def as_airbyte_message(
 61        self, stream_descriptor: Optional[StreamDescriptor] = None
 62    ) -> AirbyteMessage:
 63        """
 64        Builds an AirbyteTraceMessage from the exception
 65
 66        :param stream_descriptor is deprecated, please use the stream_description in `__init__ or `from_exception`. If many
 67          stream_descriptors are defined, the one from `as_airbyte_message` will be discarded.
 68        """
 69        now_millis = time.time_ns() // 1_000_000
 70
 71        trace_exc = self._exception or self
 72        stack_trace_str = "".join(traceback.TracebackException.from_exception(trace_exc).format())
 73
 74        trace_message = AirbyteTraceMessage(
 75            type=TraceType.ERROR,
 76            emitted_at=now_millis,
 77            error=AirbyteErrorTraceMessage(
 78                message=self.message
 79                or "Something went wrong in the connector. See the logs for more details.",
 80                internal_message=self.internal_message,
 81                failure_type=self.failure_type,
 82                stack_trace=stack_trace_str,
 83                stream_descriptor=self._stream_descriptor
 84                if self._stream_descriptor is not None
 85                else stream_descriptor,
 86            ),
 87        )
 88
 89        return AirbyteMessage(type=MessageType.TRACE, trace=trace_message)
 90
 91    def as_connection_status_message(self) -> Optional[AirbyteMessage]:
 92        if self.failure_type == FailureType.config_error:
 93            return AirbyteMessage(
 94                type=MessageType.CONNECTION_STATUS,
 95                connectionStatus=AirbyteConnectionStatus(
 96                    status=Status.FAILED, message=self.message
 97                ),
 98            )
 99        return None
100
101    def emit_message(self) -> None:
102        """
103        Prints the exception as an AirbyteTraceMessage.
104        Note that this will be called automatically on uncaught exceptions when using the airbyte_cdk entrypoint.
105        """
106        message = orjson.dumps(AirbyteMessageSerializer.dump(self.as_airbyte_message())).decode()
107        filtered_message = filter_secrets(message)
108        print(filtered_message)
109
110    @classmethod
111    def from_exception(
112        cls,
113        exc: BaseException,
114        stream_descriptor: Optional[StreamDescriptor] = None,
115        *args: Any,
116        **kwargs: Any,
117    ) -> "AirbyteTracedException":
118        """
119        Helper to create an AirbyteTracedException from an existing exception
120        :param exc: the exception that caused the error
121        :param stream_descriptor: describe the stream from which the exception comes from
122        """
123        if isinstance(exc, AirbyteTracedException):
124            internal_message = exc.internal_message
125            # Preserve the original user-facing message if the caller didn't provide one
126            if "message" not in kwargs:
127                kwargs["message"] = exc.message
128        else:
129            internal_message = str(exc)
130        return cls(
131            internal_message=internal_message,
132            exception=exc,
133            stream_descriptor=stream_descriptor,
134            *args,
135            **kwargs,
136        )  # type: ignore  # ignoring because of args and kwargs
137
138    def as_sanitized_airbyte_message(
139        self, stream_descriptor: Optional[StreamDescriptor] = None
140    ) -> AirbyteMessage:
141        """
142        Builds an AirbyteTraceMessage from the exception and sanitizes any secrets from the message body
143
144        :param stream_descriptor is deprecated, please use the stream_description in `__init__ or `from_exception`. If many
145          stream_descriptors are defined, the one from `as_sanitized_airbyte_message` will be discarded.
146        """
147        error_message = self.as_airbyte_message(stream_descriptor=stream_descriptor)
148        if error_message.trace.error.message:  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
149            error_message.trace.error.message = filter_secrets(  # type: ignore[union-attr]
150                error_message.trace.error.message,  # type: ignore[union-attr]
151            )
152        if error_message.trace.error.internal_message:  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
153            error_message.trace.error.internal_message = filter_secrets(  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
154                error_message.trace.error.internal_message  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
155            )
156        if error_message.trace.error.stack_trace:  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
157            error_message.trace.error.stack_trace = filter_secrets(  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
158                error_message.trace.error.stack_trace  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
159            )
160        return error_message
class AirbyteTracedException(builtins.Exception):
 26class AirbyteTracedException(Exception):
 27    """
 28    An exception that should be emitted as an AirbyteTraceMessage
 29    """
 30
 31    def __init__(
 32        self,
 33        internal_message: Optional[str] = None,
 34        message: Optional[str] = None,
 35        failure_type: FailureType = FailureType.system_error,
 36        exception: Optional[BaseException] = None,
 37        stream_descriptor: Optional[StreamDescriptor] = None,
 38    ):
 39        """
 40        :param internal_message: the internal error that caused the failure
 41        :param message: a user-friendly message that indicates the cause of the error
 42        :param failure_type: the type of error
 43        :param exception: the exception that caused the error, from which the stack trace should be retrieved
 44        :param stream_descriptor: describe the stream from which the exception comes from
 45        """
 46        self.internal_message = internal_message
 47        self.message = message
 48        self.failure_type = failure_type
 49        self._exception = exception
 50        self._stream_descriptor = stream_descriptor
 51        super().__init__(internal_message)
 52
 53    def __str__(self) -> str:
 54        """Return the user-facing message, falling back to internal_message."""
 55        if self.message is not None:
 56            return self.message
 57        if self.internal_message is not None:
 58            return self.internal_message
 59        return ""
 60
 61    def as_airbyte_message(
 62        self, stream_descriptor: Optional[StreamDescriptor] = None
 63    ) -> AirbyteMessage:
 64        """
 65        Builds an AirbyteTraceMessage from the exception
 66
 67        :param stream_descriptor is deprecated, please use the stream_description in `__init__ or `from_exception`. If many
 68          stream_descriptors are defined, the one from `as_airbyte_message` will be discarded.
 69        """
 70        now_millis = time.time_ns() // 1_000_000
 71
 72        trace_exc = self._exception or self
 73        stack_trace_str = "".join(traceback.TracebackException.from_exception(trace_exc).format())
 74
 75        trace_message = AirbyteTraceMessage(
 76            type=TraceType.ERROR,
 77            emitted_at=now_millis,
 78            error=AirbyteErrorTraceMessage(
 79                message=self.message
 80                or "Something went wrong in the connector. See the logs for more details.",
 81                internal_message=self.internal_message,
 82                failure_type=self.failure_type,
 83                stack_trace=stack_trace_str,
 84                stream_descriptor=self._stream_descriptor
 85                if self._stream_descriptor is not None
 86                else stream_descriptor,
 87            ),
 88        )
 89
 90        return AirbyteMessage(type=MessageType.TRACE, trace=trace_message)
 91
 92    def as_connection_status_message(self) -> Optional[AirbyteMessage]:
 93        if self.failure_type == FailureType.config_error:
 94            return AirbyteMessage(
 95                type=MessageType.CONNECTION_STATUS,
 96                connectionStatus=AirbyteConnectionStatus(
 97                    status=Status.FAILED, message=self.message
 98                ),
 99            )
100        return None
101
102    def emit_message(self) -> None:
103        """
104        Prints the exception as an AirbyteTraceMessage.
105        Note that this will be called automatically on uncaught exceptions when using the airbyte_cdk entrypoint.
106        """
107        message = orjson.dumps(AirbyteMessageSerializer.dump(self.as_airbyte_message())).decode()
108        filtered_message = filter_secrets(message)
109        print(filtered_message)
110
111    @classmethod
112    def from_exception(
113        cls,
114        exc: BaseException,
115        stream_descriptor: Optional[StreamDescriptor] = None,
116        *args: Any,
117        **kwargs: Any,
118    ) -> "AirbyteTracedException":
119        """
120        Helper to create an AirbyteTracedException from an existing exception
121        :param exc: the exception that caused the error
122        :param stream_descriptor: describe the stream from which the exception comes from
123        """
124        if isinstance(exc, AirbyteTracedException):
125            internal_message = exc.internal_message
126            # Preserve the original user-facing message if the caller didn't provide one
127            if "message" not in kwargs:
128                kwargs["message"] = exc.message
129        else:
130            internal_message = str(exc)
131        return cls(
132            internal_message=internal_message,
133            exception=exc,
134            stream_descriptor=stream_descriptor,
135            *args,
136            **kwargs,
137        )  # type: ignore  # ignoring because of args and kwargs
138
139    def as_sanitized_airbyte_message(
140        self, stream_descriptor: Optional[StreamDescriptor] = None
141    ) -> AirbyteMessage:
142        """
143        Builds an AirbyteTraceMessage from the exception and sanitizes any secrets from the message body
144
145        :param stream_descriptor is deprecated, please use the stream_description in `__init__ or `from_exception`. If many
146          stream_descriptors are defined, the one from `as_sanitized_airbyte_message` will be discarded.
147        """
148        error_message = self.as_airbyte_message(stream_descriptor=stream_descriptor)
149        if error_message.trace.error.message:  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
150            error_message.trace.error.message = filter_secrets(  # type: ignore[union-attr]
151                error_message.trace.error.message,  # type: ignore[union-attr]
152            )
153        if error_message.trace.error.internal_message:  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
154            error_message.trace.error.internal_message = filter_secrets(  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
155                error_message.trace.error.internal_message  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
156            )
157        if error_message.trace.error.stack_trace:  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
158            error_message.trace.error.stack_trace = filter_secrets(  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
159                error_message.trace.error.stack_trace  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
160            )
161        return error_message

An exception that should be emitted as an AirbyteTraceMessage

AirbyteTracedException( internal_message: Optional[str] = None, message: Optional[str] = None, failure_type: airbyte_protocol_dataclasses.models.airbyte_protocol.FailureType = <FailureType.system_error: 'system_error'>, exception: Optional[BaseException] = None, stream_descriptor: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.StreamDescriptor] = None)
31    def __init__(
32        self,
33        internal_message: Optional[str] = None,
34        message: Optional[str] = None,
35        failure_type: FailureType = FailureType.system_error,
36        exception: Optional[BaseException] = None,
37        stream_descriptor: Optional[StreamDescriptor] = None,
38    ):
39        """
40        :param internal_message: the internal error that caused the failure
41        :param message: a user-friendly message that indicates the cause of the error
42        :param failure_type: the type of error
43        :param exception: the exception that caused the error, from which the stack trace should be retrieved
44        :param stream_descriptor: describe the stream from which the exception comes from
45        """
46        self.internal_message = internal_message
47        self.message = message
48        self.failure_type = failure_type
49        self._exception = exception
50        self._stream_descriptor = stream_descriptor
51        super().__init__(internal_message)
Parameters
  • internal_message: the internal error that caused the failure
  • message: a user-friendly message that indicates the cause of the error
  • failure_type: the type of error
  • exception: the exception that caused the error, from which the stack trace should be retrieved
  • stream_descriptor: describe the stream from which the exception comes from
internal_message
message
failure_type
def as_airbyte_message( self, stream_descriptor: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.StreamDescriptor] = None) -> airbyte_cdk.AirbyteMessage:
61    def as_airbyte_message(
62        self, stream_descriptor: Optional[StreamDescriptor] = None
63    ) -> AirbyteMessage:
64        """
65        Builds an AirbyteTraceMessage from the exception
66
67        :param stream_descriptor is deprecated, please use the stream_description in `__init__ or `from_exception`. If many
68          stream_descriptors are defined, the one from `as_airbyte_message` will be discarded.
69        """
70        now_millis = time.time_ns() // 1_000_000
71
72        trace_exc = self._exception or self
73        stack_trace_str = "".join(traceback.TracebackException.from_exception(trace_exc).format())
74
75        trace_message = AirbyteTraceMessage(
76            type=TraceType.ERROR,
77            emitted_at=now_millis,
78            error=AirbyteErrorTraceMessage(
79                message=self.message
80                or "Something went wrong in the connector. See the logs for more details.",
81                internal_message=self.internal_message,
82                failure_type=self.failure_type,
83                stack_trace=stack_trace_str,
84                stream_descriptor=self._stream_descriptor
85                if self._stream_descriptor is not None
86                else stream_descriptor,
87            ),
88        )
89
90        return AirbyteMessage(type=MessageType.TRACE, trace=trace_message)

Builds an AirbyteTraceMessage from the exception

:param stream_descriptor is deprecated, please use the stream_description in __init__ orfrom_exception. If many stream_descriptors are defined, the one fromas_airbyte_message` will be discarded.

def as_connection_status_message(self) -> Optional[airbyte_cdk.AirbyteMessage]:
 92    def as_connection_status_message(self) -> Optional[AirbyteMessage]:
 93        if self.failure_type == FailureType.config_error:
 94            return AirbyteMessage(
 95                type=MessageType.CONNECTION_STATUS,
 96                connectionStatus=AirbyteConnectionStatus(
 97                    status=Status.FAILED, message=self.message
 98                ),
 99            )
100        return None
def emit_message(self) -> None:
102    def emit_message(self) -> None:
103        """
104        Prints the exception as an AirbyteTraceMessage.
105        Note that this will be called automatically on uncaught exceptions when using the airbyte_cdk entrypoint.
106        """
107        message = orjson.dumps(AirbyteMessageSerializer.dump(self.as_airbyte_message())).decode()
108        filtered_message = filter_secrets(message)
109        print(filtered_message)

Prints the exception as an AirbyteTraceMessage. Note that this will be called automatically on uncaught exceptions when using the airbyte_cdk entrypoint.

@classmethod
def from_exception( cls, exc: BaseException, stream_descriptor: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.StreamDescriptor] = None, *args: Any, **kwargs: Any) -> AirbyteTracedException:
111    @classmethod
112    def from_exception(
113        cls,
114        exc: BaseException,
115        stream_descriptor: Optional[StreamDescriptor] = None,
116        *args: Any,
117        **kwargs: Any,
118    ) -> "AirbyteTracedException":
119        """
120        Helper to create an AirbyteTracedException from an existing exception
121        :param exc: the exception that caused the error
122        :param stream_descriptor: describe the stream from which the exception comes from
123        """
124        if isinstance(exc, AirbyteTracedException):
125            internal_message = exc.internal_message
126            # Preserve the original user-facing message if the caller didn't provide one
127            if "message" not in kwargs:
128                kwargs["message"] = exc.message
129        else:
130            internal_message = str(exc)
131        return cls(
132            internal_message=internal_message,
133            exception=exc,
134            stream_descriptor=stream_descriptor,
135            *args,
136            **kwargs,
137        )  # type: ignore  # ignoring because of args and kwargs

Helper to create an AirbyteTracedException from an existing exception

Parameters
  • exc: the exception that caused the error
  • stream_descriptor: describe the stream from which the exception comes from
def as_sanitized_airbyte_message( self, stream_descriptor: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.StreamDescriptor] = None) -> airbyte_cdk.AirbyteMessage:
139    def as_sanitized_airbyte_message(
140        self, stream_descriptor: Optional[StreamDescriptor] = None
141    ) -> AirbyteMessage:
142        """
143        Builds an AirbyteTraceMessage from the exception and sanitizes any secrets from the message body
144
145        :param stream_descriptor is deprecated, please use the stream_description in `__init__ or `from_exception`. If many
146          stream_descriptors are defined, the one from `as_sanitized_airbyte_message` will be discarded.
147        """
148        error_message = self.as_airbyte_message(stream_descriptor=stream_descriptor)
149        if error_message.trace.error.message:  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
150            error_message.trace.error.message = filter_secrets(  # type: ignore[union-attr]
151                error_message.trace.error.message,  # type: ignore[union-attr]
152            )
153        if error_message.trace.error.internal_message:  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
154            error_message.trace.error.internal_message = filter_secrets(  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
155                error_message.trace.error.internal_message  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
156            )
157        if error_message.trace.error.stack_trace:  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
158            error_message.trace.error.stack_trace = filter_secrets(  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
159                error_message.trace.error.stack_trace  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
160            )
161        return error_message

Builds an AirbyteTraceMessage from the exception and sanitizes any secrets from the message body

:param stream_descriptor is deprecated, please use the stream_description in __init__ orfrom_exception. If many stream_descriptors are defined, the one fromas_sanitized_airbyte_message` will be discarded.