airbyte_cdk.utils.traced_exception

  1#
  2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
  3#
  4import time
  5import traceback
  6from typing import Any, Optional
  7
  8import orjson
  9
 10from airbyte_cdk.models import (
 11    AirbyteConnectionStatus,
 12    AirbyteErrorTraceMessage,
 13    AirbyteMessage,
 14    AirbyteMessageSerializer,
 15    AirbyteTraceMessage,
 16    FailureType,
 17    Status,
 18    StreamDescriptor,
 19    TraceType,
 20)
 21from airbyte_cdk.models import Type as MessageType
 22from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
 23
 24
 25class AirbyteTracedException(Exception):
 26    """
 27    An exception that should be emitted as an AirbyteTraceMessage
 28    """
 29
 30    def __init__(
 31        self,
 32        internal_message: Optional[str] = None,
 33        message: Optional[str] = None,
 34        failure_type: FailureType = FailureType.system_error,
 35        exception: Optional[BaseException] = None,
 36        stream_descriptor: Optional[StreamDescriptor] = None,
 37    ):
 38        """
 39        :param internal_message: the internal error that caused the failure
 40        :param message: a user-friendly message that indicates the cause of the error
 41        :param failure_type: the type of error
 42        :param exception: the exception that caused the error, from which the stack trace should be retrieved
 43        :param stream_descriptor: describe the stream from which the exception comes from
 44        """
 45        self.internal_message = internal_message
 46        self.message = message
 47        self.failure_type = failure_type
 48        self._exception = exception
 49        self._stream_descriptor = stream_descriptor
 50        super().__init__(internal_message)
 51
 52    def as_airbyte_message(
 53        self, stream_descriptor: Optional[StreamDescriptor] = None
 54    ) -> AirbyteMessage:
 55        """
 56        Builds an AirbyteTraceMessage from the exception
 57
 58        :param stream_descriptor is deprecated, please use the stream_description in `__init__ or `from_exception`. If many
 59          stream_descriptors are defined, the one from `as_airbyte_message` will be discarded.
 60        """
 61        now_millis = time.time_ns() // 1_000_000
 62
 63        trace_exc = self._exception or self
 64        stack_trace_str = "".join(traceback.TracebackException.from_exception(trace_exc).format())
 65
 66        trace_message = AirbyteTraceMessage(
 67            type=TraceType.ERROR,
 68            emitted_at=now_millis,
 69            error=AirbyteErrorTraceMessage(
 70                message=self.message
 71                or "Something went wrong in the connector. See the logs for more details.",
 72                internal_message=self.internal_message,
 73                failure_type=self.failure_type,
 74                stack_trace=stack_trace_str,
 75                stream_descriptor=self._stream_descriptor
 76                if self._stream_descriptor is not None
 77                else stream_descriptor,
 78            ),
 79        )
 80
 81        return AirbyteMessage(type=MessageType.TRACE, trace=trace_message)
 82
 83    def as_connection_status_message(self) -> Optional[AirbyteMessage]:
 84        if self.failure_type == FailureType.config_error:
 85            return AirbyteMessage(
 86                type=MessageType.CONNECTION_STATUS,
 87                connectionStatus=AirbyteConnectionStatus(
 88                    status=Status.FAILED, message=self.message
 89                ),
 90            )
 91        return None
 92
 93    def emit_message(self) -> None:
 94        """
 95        Prints the exception as an AirbyteTraceMessage.
 96        Note that this will be called automatically on uncaught exceptions when using the airbyte_cdk entrypoint.
 97        """
 98        message = orjson.dumps(AirbyteMessageSerializer.dump(self.as_airbyte_message())).decode()
 99        filtered_message = filter_secrets(message)
100        print(filtered_message)
101
102    @classmethod
103    def from_exception(
104        cls,
105        exc: BaseException,
106        stream_descriptor: Optional[StreamDescriptor] = None,
107        *args: Any,
108        **kwargs: Any,
109    ) -> "AirbyteTracedException":
110        """
111        Helper to create an AirbyteTracedException from an existing exception
112        :param exc: the exception that caused the error
113        :param stream_descriptor: describe the stream from which the exception comes from
114        """
115        return cls(
116            internal_message=str(exc),
117            exception=exc,
118            stream_descriptor=stream_descriptor,
119            *args,
120            **kwargs,
121        )  # type: ignore  # ignoring because of args and kwargs
122
123    def as_sanitized_airbyte_message(
124        self, stream_descriptor: Optional[StreamDescriptor] = None
125    ) -> AirbyteMessage:
126        """
127        Builds an AirbyteTraceMessage from the exception and sanitizes any secrets from the message body
128
129        :param stream_descriptor is deprecated, please use the stream_description in `__init__ or `from_exception`. If many
130          stream_descriptors are defined, the one from `as_sanitized_airbyte_message` will be discarded.
131        """
132        error_message = self.as_airbyte_message(stream_descriptor=stream_descriptor)
133        if error_message.trace.error.message:  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
134            error_message.trace.error.message = filter_secrets(  # type: ignore[union-attr]
135                error_message.trace.error.message,  # type: ignore[union-attr]
136            )
137        if error_message.trace.error.internal_message:  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
138            error_message.trace.error.internal_message = filter_secrets(  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
139                error_message.trace.error.internal_message  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
140            )
141        if error_message.trace.error.stack_trace:  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
142            error_message.trace.error.stack_trace = filter_secrets(  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
143                error_message.trace.error.stack_trace  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
144            )
145        return error_message
class AirbyteTracedException(builtins.Exception):
 26class AirbyteTracedException(Exception):
 27    """
 28    An exception that should be emitted as an AirbyteTraceMessage
 29    """
 30
 31    def __init__(
 32        self,
 33        internal_message: Optional[str] = None,
 34        message: Optional[str] = None,
 35        failure_type: FailureType = FailureType.system_error,
 36        exception: Optional[BaseException] = None,
 37        stream_descriptor: Optional[StreamDescriptor] = None,
 38    ):
 39        """
 40        :param internal_message: the internal error that caused the failure
 41        :param message: a user-friendly message that indicates the cause of the error
 42        :param failure_type: the type of error
 43        :param exception: the exception that caused the error, from which the stack trace should be retrieved
 44        :param stream_descriptor: describe the stream from which the exception comes from
 45        """
 46        self.internal_message = internal_message
 47        self.message = message
 48        self.failure_type = failure_type
 49        self._exception = exception
 50        self._stream_descriptor = stream_descriptor
 51        super().__init__(internal_message)
 52
 53    def as_airbyte_message(
 54        self, stream_descriptor: Optional[StreamDescriptor] = None
 55    ) -> AirbyteMessage:
 56        """
 57        Builds an AirbyteTraceMessage from the exception
 58
 59        :param stream_descriptor is deprecated, please use the stream_description in `__init__ or `from_exception`. If many
 60          stream_descriptors are defined, the one from `as_airbyte_message` will be discarded.
 61        """
 62        now_millis = time.time_ns() // 1_000_000
 63
 64        trace_exc = self._exception or self
 65        stack_trace_str = "".join(traceback.TracebackException.from_exception(trace_exc).format())
 66
 67        trace_message = AirbyteTraceMessage(
 68            type=TraceType.ERROR,
 69            emitted_at=now_millis,
 70            error=AirbyteErrorTraceMessage(
 71                message=self.message
 72                or "Something went wrong in the connector. See the logs for more details.",
 73                internal_message=self.internal_message,
 74                failure_type=self.failure_type,
 75                stack_trace=stack_trace_str,
 76                stream_descriptor=self._stream_descriptor
 77                if self._stream_descriptor is not None
 78                else stream_descriptor,
 79            ),
 80        )
 81
 82        return AirbyteMessage(type=MessageType.TRACE, trace=trace_message)
 83
 84    def as_connection_status_message(self) -> Optional[AirbyteMessage]:
 85        if self.failure_type == FailureType.config_error:
 86            return AirbyteMessage(
 87                type=MessageType.CONNECTION_STATUS,
 88                connectionStatus=AirbyteConnectionStatus(
 89                    status=Status.FAILED, message=self.message
 90                ),
 91            )
 92        return None
 93
 94    def emit_message(self) -> None:
 95        """
 96        Prints the exception as an AirbyteTraceMessage.
 97        Note that this will be called automatically on uncaught exceptions when using the airbyte_cdk entrypoint.
 98        """
 99        message = orjson.dumps(AirbyteMessageSerializer.dump(self.as_airbyte_message())).decode()
100        filtered_message = filter_secrets(message)
101        print(filtered_message)
102
103    @classmethod
104    def from_exception(
105        cls,
106        exc: BaseException,
107        stream_descriptor: Optional[StreamDescriptor] = None,
108        *args: Any,
109        **kwargs: Any,
110    ) -> "AirbyteTracedException":
111        """
112        Helper to create an AirbyteTracedException from an existing exception
113        :param exc: the exception that caused the error
114        :param stream_descriptor: describe the stream from which the exception comes from
115        """
116        return cls(
117            internal_message=str(exc),
118            exception=exc,
119            stream_descriptor=stream_descriptor,
120            *args,
121            **kwargs,
122        )  # type: ignore  # ignoring because of args and kwargs
123
124    def as_sanitized_airbyte_message(
125        self, stream_descriptor: Optional[StreamDescriptor] = None
126    ) -> AirbyteMessage:
127        """
128        Builds an AirbyteTraceMessage from the exception and sanitizes any secrets from the message body
129
130        :param stream_descriptor is deprecated, please use the stream_description in `__init__ or `from_exception`. If many
131          stream_descriptors are defined, the one from `as_sanitized_airbyte_message` will be discarded.
132        """
133        error_message = self.as_airbyte_message(stream_descriptor=stream_descriptor)
134        if error_message.trace.error.message:  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
135            error_message.trace.error.message = filter_secrets(  # type: ignore[union-attr]
136                error_message.trace.error.message,  # type: ignore[union-attr]
137            )
138        if error_message.trace.error.internal_message:  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
139            error_message.trace.error.internal_message = filter_secrets(  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
140                error_message.trace.error.internal_message  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
141            )
142        if error_message.trace.error.stack_trace:  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
143            error_message.trace.error.stack_trace = filter_secrets(  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
144                error_message.trace.error.stack_trace  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
145            )
146        return error_message

An exception that should be emitted as an AirbyteTraceMessage

AirbyteTracedException( internal_message: Optional[str] = None, message: Optional[str] = None, failure_type: airbyte_protocol_dataclasses.models.airbyte_protocol.FailureType = <FailureType.system_error: 'system_error'>, exception: Optional[BaseException] = None, stream_descriptor: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.StreamDescriptor] = None)
31    def __init__(
32        self,
33        internal_message: Optional[str] = None,
34        message: Optional[str] = None,
35        failure_type: FailureType = FailureType.system_error,
36        exception: Optional[BaseException] = None,
37        stream_descriptor: Optional[StreamDescriptor] = None,
38    ):
39        """
40        :param internal_message: the internal error that caused the failure
41        :param message: a user-friendly message that indicates the cause of the error
42        :param failure_type: the type of error
43        :param exception: the exception that caused the error, from which the stack trace should be retrieved
44        :param stream_descriptor: describe the stream from which the exception comes from
45        """
46        self.internal_message = internal_message
47        self.message = message
48        self.failure_type = failure_type
49        self._exception = exception
50        self._stream_descriptor = stream_descriptor
51        super().__init__(internal_message)
Parameters
  • internal_message: the internal error that caused the failure
  • message: a user-friendly message that indicates the cause of the error
  • failure_type: the type of error
  • exception: the exception that caused the error, from which the stack trace should be retrieved
  • stream_descriptor: describes the stream the exception comes from
internal_message
message
failure_type
def as_airbyte_message( self, stream_descriptor: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.StreamDescriptor] = None) -> airbyte_cdk.AirbyteMessage:
53    def as_airbyte_message(
54        self, stream_descriptor: Optional[StreamDescriptor] = None
55    ) -> AirbyteMessage:
56        """
57        Builds an AirbyteTraceMessage from the exception
58
59        :param stream_descriptor is deprecated, please use the stream_description in `__init__ or `from_exception`. If many
60          stream_descriptors are defined, the one from `as_airbyte_message` will be discarded.
61        """
62        now_millis = time.time_ns() // 1_000_000
63
64        trace_exc = self._exception or self
65        stack_trace_str = "".join(traceback.TracebackException.from_exception(trace_exc).format())
66
67        trace_message = AirbyteTraceMessage(
68            type=TraceType.ERROR,
69            emitted_at=now_millis,
70            error=AirbyteErrorTraceMessage(
71                message=self.message
72                or "Something went wrong in the connector. See the logs for more details.",
73                internal_message=self.internal_message,
74                failure_type=self.failure_type,
75                stack_trace=stack_trace_str,
76                stream_descriptor=self._stream_descriptor
77                if self._stream_descriptor is not None
78                else stream_descriptor,
79            ),
80        )
81
82        return AirbyteMessage(type=MessageType.TRACE, trace=trace_message)

Builds an AirbyteTraceMessage from the exception

:param stream_descriptor: deprecated, please pass the stream descriptor to `__init__` or `from_exception` instead. If several stream descriptors are defined, the one passed to `as_airbyte_message` will be discarded.

def as_connection_status_message(self) -> Optional[airbyte_cdk.AirbyteMessage]:
84    def as_connection_status_message(self) -> Optional[AirbyteMessage]:
85        if self.failure_type == FailureType.config_error:
86            return AirbyteMessage(
87                type=MessageType.CONNECTION_STATUS,
88                connectionStatus=AirbyteConnectionStatus(
89                    status=Status.FAILED, message=self.message
90                ),
91            )
92        return None
def emit_message(self) -> None:
 94    def emit_message(self) -> None:
 95        """
 96        Prints the exception as an AirbyteTraceMessage.
 97        Note that this will be called automatically on uncaught exceptions when using the airbyte_cdk entrypoint.
 98        """
 99        message = orjson.dumps(AirbyteMessageSerializer.dump(self.as_airbyte_message())).decode()
100        filtered_message = filter_secrets(message)
101        print(filtered_message)

Prints the exception as an AirbyteTraceMessage. Note that this will be called automatically on uncaught exceptions when using the airbyte_cdk entrypoint.

@classmethod
def from_exception( cls, exc: BaseException, stream_descriptor: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.StreamDescriptor] = None, *args: Any, **kwargs: Any) -> AirbyteTracedException:
103    @classmethod
104    def from_exception(
105        cls,
106        exc: BaseException,
107        stream_descriptor: Optional[StreamDescriptor] = None,
108        *args: Any,
109        **kwargs: Any,
110    ) -> "AirbyteTracedException":
111        """
112        Helper to create an AirbyteTracedException from an existing exception
113        :param exc: the exception that caused the error
114        :param stream_descriptor: describe the stream from which the exception comes from
115        """
116        return cls(
117            internal_message=str(exc),
118            exception=exc,
119            stream_descriptor=stream_descriptor,
120            *args,
121            **kwargs,
122        )  # type: ignore  # ignoring because of args and kwargs

Helper to create an AirbyteTracedException from an existing exception

Parameters
  • exc: the exception that caused the error
  • stream_descriptor: describes the stream the exception comes from
def as_sanitized_airbyte_message( self, stream_descriptor: Optional[airbyte_protocol_dataclasses.models.airbyte_protocol.StreamDescriptor] = None) -> airbyte_cdk.AirbyteMessage:
124    def as_sanitized_airbyte_message(
125        self, stream_descriptor: Optional[StreamDescriptor] = None
126    ) -> AirbyteMessage:
127        """
128        Builds an AirbyteTraceMessage from the exception and sanitizes any secrets from the message body
129
130        :param stream_descriptor is deprecated, please use the stream_description in `__init__ or `from_exception`. If many
131          stream_descriptors are defined, the one from `as_sanitized_airbyte_message` will be discarded.
132        """
133        error_message = self.as_airbyte_message(stream_descriptor=stream_descriptor)
134        if error_message.trace.error.message:  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
135            error_message.trace.error.message = filter_secrets(  # type: ignore[union-attr]
136                error_message.trace.error.message,  # type: ignore[union-attr]
137            )
138        if error_message.trace.error.internal_message:  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
139            error_message.trace.error.internal_message = filter_secrets(  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
140                error_message.trace.error.internal_message  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
141            )
142        if error_message.trace.error.stack_trace:  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
143            error_message.trace.error.stack_trace = filter_secrets(  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
144                error_message.trace.error.stack_trace  # type: ignore[union-attr] # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
145            )
146        return error_message

Builds an AirbyteTraceMessage from the exception and sanitizes any secrets from the message body

:param stream_descriptor: deprecated, please pass the stream descriptor to `__init__` or `from_exception` instead. If several stream descriptors are defined, the one passed to `as_sanitized_airbyte_message` will be discarded.