airbyte_cdk.utils.traced_exception
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import time
import traceback
from typing import Any, Optional

import orjson

from airbyte_cdk.models import (
    AirbyteConnectionStatus,
    AirbyteErrorTraceMessage,
    AirbyteMessage,
    AirbyteMessageSerializer,
    AirbyteTraceMessage,
    FailureType,
    Status,
    StreamDescriptor,
    TraceType,
)
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets


class AirbyteTracedException(Exception):
    """
    An exception that should be emitted as an AirbyteTraceMessage
    """

    def __init__(
        self,
        internal_message: Optional[str] = None,
        message: Optional[str] = None,
        failure_type: FailureType = FailureType.system_error,
        exception: Optional[BaseException] = None,
        stream_descriptor: Optional[StreamDescriptor] = None,
    ):
        """
        :param internal_message: the internal error that caused the failure
        :param message: a user-friendly message that indicates the cause of the error
        :param failure_type: the type of error
        :param exception: the exception that caused the error, from which the stack trace should be retrieved
        :param stream_descriptor: describes the stream the exception comes from
        """
        self.internal_message = internal_message
        self.message = message
        self.failure_type = failure_type
        self._exception = exception
        self._stream_descriptor = stream_descriptor
        # The internal message doubles as the plain `Exception` argument (i.e. `str(self)`).
        super().__init__(internal_message)

    def as_airbyte_message(
        self, stream_descriptor: Optional[StreamDescriptor] = None
    ) -> AirbyteMessage:
        """
        Builds an AirbyteTraceMessage from the exception

        :param stream_descriptor: deprecated, pass `stream_descriptor` to `__init__` or `from_exception` instead.
            If a descriptor was also set on the exception itself, that one is used and this argument is discarded.
        :return: an AirbyteMessage of type TRACE wrapping an ERROR trace message
        """
        now_millis = time.time_ns() // 1_000_000

        # Compare against None explicitly: an exception object may define `__bool__`/`__len__`
        # and evaluate as falsy, and a truthiness test would then drop its traceback.
        trace_exc = self._exception if self._exception is not None else self
        stack_trace_str = "".join(traceback.TracebackException.from_exception(trace_exc).format())

        # A descriptor set on the exception takes precedence over the (deprecated) argument.
        effective_descriptor = (
            self._stream_descriptor if self._stream_descriptor is not None else stream_descriptor
        )

        trace_message = AirbyteTraceMessage(
            type=TraceType.ERROR,
            emitted_at=now_millis,
            error=AirbyteErrorTraceMessage(
                message=self.message
                or "Something went wrong in the connector. See the logs for more details.",
                internal_message=self.internal_message,
                failure_type=self.failure_type,
                stack_trace=stack_trace_str,
                stream_descriptor=effective_descriptor,
            ),
        )

        return AirbyteMessage(type=MessageType.TRACE, trace=trace_message)

    def as_connection_status_message(self) -> Optional[AirbyteMessage]:
        """
        Builds a FAILED connection status message when the failure is a config error

        :return: a CONNECTION_STATUS AirbyteMessage if `failure_type` is `config_error`, otherwise None
        """
        if self.failure_type != FailureType.config_error:
            return None
        failed_status = AirbyteConnectionStatus(status=Status.FAILED, message=self.message)
        return AirbyteMessage(type=MessageType.CONNECTION_STATUS, connectionStatus=failed_status)

    def emit_message(self) -> None:
        """
        Prints the exception as an AirbyteTraceMessage.
        Note that this will be called automatically on uncaught exceptions when using the airbyte_cdk entrypoint.
        """
        serialized = AirbyteMessageSerializer.dump(self.as_airbyte_message())
        raw_json = orjson.dumps(serialized).decode()
        # Secrets are filtered on the final JSON text, right before it is printed.
        print(filter_secrets(raw_json))

    @classmethod
    def from_exception(
        cls,
        exc: BaseException,
        stream_descriptor: Optional[StreamDescriptor] = None,
        *args: Any,
        **kwargs: Any,
    ) -> "AirbyteTracedException":
        """
        Helper to create an AirbyteTracedException from an existing exception

        :param exc: the exception that caused the error
        :param stream_descriptor: describes the stream the exception comes from
        :param args: extra positional arguments forwarded to the constructor. With this class's `__init__` the
            first one binds to `internal_message`, which is already passed by keyword, so prefer keyword arguments.
        :param kwargs: extra keyword arguments forwarded to the constructor (e.g. `message`, `failure_type`)
        """
        return cls(
            *args,
            internal_message=str(exc),
            exception=exc,
            stream_descriptor=stream_descriptor,
            **kwargs,
        )  # type: ignore  # ignoring because of args and kwargs

    def as_sanitized_airbyte_message(
        self, stream_descriptor: Optional[StreamDescriptor] = None
    ) -> AirbyteMessage:
        """
        Builds an AirbyteTraceMessage from the exception and sanitizes any secrets from the message body

        :param stream_descriptor: deprecated, pass `stream_descriptor` to `__init__` or `from_exception` instead.
            If a descriptor was also set on the exception itself, that one is used and this argument is discarded.
        :return: the trace message with `message`, `internal_message` and `stack_trace` passed through `filter_secrets`
        """
        sanitized = self.as_airbyte_message(stream_descriptor=stream_descriptor)
        error = sanitized.trace.error  # type: ignore[union-attr]  # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
        for field_name in ("message", "internal_message", "stack_trace"):
            raw_value = getattr(error, field_name)
            # Empty / missing values are left untouched.
            if raw_value:
                setattr(error, field_name, filter_secrets(raw_value))
        return sanitized
class AirbyteTracedException(Exception):
    """
    An exception that should be emitted as an AirbyteTraceMessage
    """

    def __init__(
        self,
        internal_message: Optional[str] = None,
        message: Optional[str] = None,
        failure_type: FailureType = FailureType.system_error,
        exception: Optional[BaseException] = None,
        stream_descriptor: Optional[StreamDescriptor] = None,
    ):
        """
        :param internal_message: the internal error that caused the failure
        :param message: a user-friendly message that indicates the cause of the error
        :param failure_type: the type of error
        :param exception: the exception that caused the error, from which the stack trace should be retrieved
        :param stream_descriptor: describes the stream the exception comes from
        """
        self.internal_message = internal_message
        self.message = message
        self.failure_type = failure_type
        self._exception = exception
        self._stream_descriptor = stream_descriptor
        # The internal message doubles as the plain `Exception` argument (i.e. `str(self)`).
        super().__init__(internal_message)

    def as_airbyte_message(
        self, stream_descriptor: Optional[StreamDescriptor] = None
    ) -> AirbyteMessage:
        """
        Builds an AirbyteTraceMessage from the exception

        :param stream_descriptor: deprecated, pass `stream_descriptor` to `__init__` or `from_exception` instead.
            If a descriptor was also set on the exception itself, that one is used and this argument is discarded.
        :return: an AirbyteMessage of type TRACE wrapping an ERROR trace message
        """
        now_millis = time.time_ns() // 1_000_000

        # Compare against None explicitly: an exception object may define `__bool__`/`__len__`
        # and evaluate as falsy, and a truthiness test would then drop its traceback.
        trace_exc = self._exception if self._exception is not None else self
        stack_trace_str = "".join(traceback.TracebackException.from_exception(trace_exc).format())

        # A descriptor set on the exception takes precedence over the (deprecated) argument.
        effective_descriptor = (
            self._stream_descriptor if self._stream_descriptor is not None else stream_descriptor
        )

        trace_message = AirbyteTraceMessage(
            type=TraceType.ERROR,
            emitted_at=now_millis,
            error=AirbyteErrorTraceMessage(
                message=self.message
                or "Something went wrong in the connector. See the logs for more details.",
                internal_message=self.internal_message,
                failure_type=self.failure_type,
                stack_trace=stack_trace_str,
                stream_descriptor=effective_descriptor,
            ),
        )

        return AirbyteMessage(type=MessageType.TRACE, trace=trace_message)

    def as_connection_status_message(self) -> Optional[AirbyteMessage]:
        """
        Builds a FAILED connection status message when the failure is a config error

        :return: a CONNECTION_STATUS AirbyteMessage if `failure_type` is `config_error`, otherwise None
        """
        if self.failure_type != FailureType.config_error:
            return None
        failed_status = AirbyteConnectionStatus(status=Status.FAILED, message=self.message)
        return AirbyteMessage(type=MessageType.CONNECTION_STATUS, connectionStatus=failed_status)

    def emit_message(self) -> None:
        """
        Prints the exception as an AirbyteTraceMessage.
        Note that this will be called automatically on uncaught exceptions when using the airbyte_cdk entrypoint.
        """
        serialized = AirbyteMessageSerializer.dump(self.as_airbyte_message())
        raw_json = orjson.dumps(serialized).decode()
        # Secrets are filtered on the final JSON text, right before it is printed.
        print(filter_secrets(raw_json))

    @classmethod
    def from_exception(
        cls,
        exc: BaseException,
        stream_descriptor: Optional[StreamDescriptor] = None,
        *args: Any,
        **kwargs: Any,
    ) -> "AirbyteTracedException":
        """
        Helper to create an AirbyteTracedException from an existing exception

        :param exc: the exception that caused the error
        :param stream_descriptor: describes the stream the exception comes from
        :param args: extra positional arguments forwarded to the constructor. With this class's `__init__` the
            first one binds to `internal_message`, which is already passed by keyword, so prefer keyword arguments.
        :param kwargs: extra keyword arguments forwarded to the constructor (e.g. `message`, `failure_type`)
        """
        return cls(
            *args,
            internal_message=str(exc),
            exception=exc,
            stream_descriptor=stream_descriptor,
            **kwargs,
        )  # type: ignore  # ignoring because of args and kwargs

    def as_sanitized_airbyte_message(
        self, stream_descriptor: Optional[StreamDescriptor] = None
    ) -> AirbyteMessage:
        """
        Builds an AirbyteTraceMessage from the exception and sanitizes any secrets from the message body

        :param stream_descriptor: deprecated, pass `stream_descriptor` to `__init__` or `from_exception` instead.
            If a descriptor was also set on the exception itself, that one is used and this argument is discarded.
        :return: the trace message with `message`, `internal_message` and `stack_trace` passed through `filter_secrets`
        """
        sanitized = self.as_airbyte_message(stream_descriptor=stream_descriptor)
        error = sanitized.trace.error  # type: ignore[union-attr]  # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
        for field_name in ("message", "internal_message", "stack_trace"):
            raw_value = getattr(error, field_name)
            # Empty / missing values are left untouched.
            if raw_value:
                setattr(error, field_name, filter_secrets(raw_value))
        return sanitized
An exception that should be emitted as an AirbyteTraceMessage
def __init__(
    self,
    internal_message: Optional[str] = None,
    message: Optional[str] = None,
    failure_type: FailureType = FailureType.system_error,
    exception: Optional[BaseException] = None,
    stream_descriptor: Optional[StreamDescriptor] = None,
):
    """
    Stores the error details on the instance and initialises the base `Exception`.

    :param internal_message: the internal error that caused the failure
    :param message: a user-friendly message that indicates the cause of the error
    :param failure_type: the type of error
    :param exception: the exception that caused the error, from which the stack trace should be retrieved
    :param stream_descriptor: describes the stream the exception comes from
    """
    # Private context: the wrapped exception and the stream it relates to.
    self._stream_descriptor = stream_descriptor
    self._exception = exception

    # Public error details.
    self.failure_type = failure_type
    self.message = message
    self.internal_message = internal_message

    # The internal message doubles as the plain `Exception` argument (i.e. `str(self)`).
    super().__init__(internal_message)
Parameters
- internal_message: the internal error that caused the failure
- message: a user-friendly message that indicates the cause of the error
- failure_type: the type of error
- exception: the exception that caused the error, from which the stack trace should be retrieved
- stream_descriptor: describes the stream the exception comes from
def as_airbyte_message(
    self, stream_descriptor: Optional[StreamDescriptor] = None
) -> AirbyteMessage:
    """
    Builds an AirbyteTraceMessage from the exception

    :param stream_descriptor: deprecated, pass `stream_descriptor` to `__init__` or `from_exception` instead.
        If a descriptor was also set on the exception itself, that one is used and this argument is discarded.
    :return: an AirbyteMessage of type TRACE wrapping an ERROR trace message
    """
    now_millis = time.time_ns() // 1_000_000

    # Compare against None explicitly: an exception object may define `__bool__`/`__len__`
    # and evaluate as falsy, and a truthiness test would then drop its traceback.
    trace_exc = self._exception if self._exception is not None else self
    stack_trace_str = "".join(traceback.TracebackException.from_exception(trace_exc).format())

    # A descriptor set on the exception takes precedence over the (deprecated) argument.
    effective_descriptor = (
        self._stream_descriptor if self._stream_descriptor is not None else stream_descriptor
    )

    trace_message = AirbyteTraceMessage(
        type=TraceType.ERROR,
        emitted_at=now_millis,
        error=AirbyteErrorTraceMessage(
            message=self.message
            or "Something went wrong in the connector. See the logs for more details.",
            internal_message=self.internal_message,
            failure_type=self.failure_type,
            stack_trace=stack_trace_str,
            stream_descriptor=effective_descriptor,
        ),
    )

    return AirbyteMessage(type=MessageType.TRACE, trace=trace_message)
Builds an AirbyteTraceMessage from the exception
:param stream_descriptor: deprecated; pass `stream_descriptor` to `__init__` or `from_exception` instead.
If stream descriptors are defined in both places, the one passed to `as_airbyte_message` will be discarded.
def as_connection_status_message(self) -> Optional[AirbyteMessage]:
    """
    Builds a FAILED connection status message when the failure is a config error

    :return: a CONNECTION_STATUS AirbyteMessage if `failure_type` is `config_error`, otherwise None
    """
    # Only config errors are reported as a connection status.
    if self.failure_type != FailureType.config_error:
        return None

    failed_status = AirbyteConnectionStatus(status=Status.FAILED, message=self.message)
    return AirbyteMessage(type=MessageType.CONNECTION_STATUS, connectionStatus=failed_status)
def emit_message(self) -> None:
    """
    Prints the exception as an AirbyteTraceMessage.
    Note that this will be called automatically on uncaught exceptions when using the airbyte_cdk entrypoint.
    """
    serialized = AirbyteMessageSerializer.dump(self.as_airbyte_message())
    raw_json = orjson.dumps(serialized).decode()
    # Secrets are filtered on the final JSON text, right before it is printed.
    print(filter_secrets(raw_json))
Prints the exception as an AirbyteTraceMessage. Note that this will be called automatically on uncaught exceptions when using the airbyte_cdk entrypoint.
@classmethod
def from_exception(
    cls,
    exc: BaseException,
    stream_descriptor: Optional[StreamDescriptor] = None,
    *args: Any,
    **kwargs: Any,
) -> "AirbyteTracedException":
    """
    Helper to create an AirbyteTracedException from an existing exception

    :param exc: the exception that caused the error
    :param stream_descriptor: describes the stream the exception comes from
    :param args: extra positional arguments forwarded to the constructor. With this class's `__init__` the
        first one binds to `internal_message`, which is already passed by keyword, so prefer keyword arguments.
    :param kwargs: extra keyword arguments forwarded to the constructor (e.g. `message`, `failure_type`)
    """
    # `*args` is written first because that is how Python processes the call anyway:
    # unpacked positionals are bound before any explicit keyword argument.
    return cls(
        *args,
        internal_message=str(exc),
        exception=exc,
        stream_descriptor=stream_descriptor,
        **kwargs,
    )  # type: ignore  # ignoring because of args and kwargs
Helper to create an AirbyteTracedException from an existing exception
Parameters
- exc: the exception that caused the error
- stream_descriptor: describes the stream the exception comes from
def as_sanitized_airbyte_message(
    self, stream_descriptor: Optional[StreamDescriptor] = None
) -> AirbyteMessage:
    """
    Builds an AirbyteTraceMessage from the exception and sanitizes any secrets from the message body

    :param stream_descriptor: deprecated, pass `stream_descriptor` to `__init__` or `from_exception` instead.
        If a descriptor was also set on the exception itself, that one is used and this argument is discarded.
    :return: the trace message with `message`, `internal_message` and `stack_trace` passed through `filter_secrets`
    """
    sanitized = self.as_airbyte_message(stream_descriptor=stream_descriptor)
    error = sanitized.trace.error  # type: ignore[union-attr]  # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage

    # Same treatment for each text field, in the same order: filter only non-empty values.
    for field_name in ("message", "internal_message", "stack_trace"):
        raw_value = getattr(error, field_name)
        if raw_value:
            setattr(error, field_name, filter_secrets(raw_value))

    return sanitized
Builds an AirbyteTraceMessage from the exception and sanitizes any secrets from the message body
:param stream_descriptor: deprecated; pass `stream_descriptor` to `__init__` or `from_exception` instead.
If stream descriptors are defined in both places, the one passed to `as_sanitized_airbyte_message` will be discarded.