airbyte_cdk.utils.traced_exception
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import time
import traceback
from typing import Any, Optional

import orjson

from airbyte_cdk.models import (
    AirbyteConnectionStatus,
    AirbyteErrorTraceMessage,
    AirbyteMessage,
    AirbyteMessageSerializer,
    AirbyteTraceMessage,
    FailureType,
    Status,
    StreamDescriptor,
    TraceType,
)
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets


class AirbyteTracedException(Exception):
    """
    An exception that should be emitted as an AirbyteTraceMessage
    """

    def __init__(
        self,
        internal_message: Optional[str] = None,
        message: Optional[str] = None,
        failure_type: FailureType = FailureType.system_error,
        exception: Optional[BaseException] = None,
        stream_descriptor: Optional[StreamDescriptor] = None,
    ):
        """
        :param internal_message: the internal error that caused the failure
        :param message: a user-friendly message that indicates the cause of the error
        :param failure_type: the type of error
        :param exception: the exception that caused the error, from which the stack trace should be retrieved
        :param stream_descriptor: describes the stream from which the exception originates
        """
        self.internal_message = internal_message
        self.message = message
        self.failure_type = failure_type
        self._exception = exception
        self._stream_descriptor = stream_descriptor
        super().__init__(internal_message)

    def __str__(self) -> str:
        """Return the user-facing message, falling back to internal_message."""
        if self.message is not None:
            return self.message
        if self.internal_message is not None:
            return self.internal_message
        return ""

    def as_airbyte_message(
        self, stream_descriptor: Optional[StreamDescriptor] = None
    ) -> AirbyteMessage:
        """
        Builds an AirbyteTraceMessage from the exception

        :param stream_descriptor is deprecated, please use the stream_descriptor in `__init__` or `from_exception`. If many
          stream_descriptors are defined, the one from `as_airbyte_message` will be discarded.
        """
        now_millis = time.time_ns() // 1_000_000

        # Compare against None explicitly: an exception type may define __bool__/__len__
        # and evaluate as falsy, which must not make us drop the wrapped exception.
        trace_exc = self._exception if self._exception is not None else self
        stack_trace_str = "".join(traceback.TracebackException.from_exception(trace_exc).format())

        trace_message = AirbyteTraceMessage(
            type=TraceType.ERROR,
            emitted_at=now_millis,
            error=AirbyteErrorTraceMessage(
                message=self.message
                or "Something went wrong in the connector. See the logs for more details.",
                internal_message=self.internal_message,
                failure_type=self.failure_type,
                stack_trace=stack_trace_str,
                # The descriptor given at construction time wins over the deprecated argument.
                stream_descriptor=self._stream_descriptor
                if self._stream_descriptor is not None
                else stream_descriptor,
            ),
        )

        return AirbyteMessage(type=MessageType.TRACE, trace=trace_message)

    def as_connection_status_message(self) -> Optional[AirbyteMessage]:
        """
        Builds a FAILED connection status message carrying the user-facing message.

        :return: the CONNECTION_STATUS message when the failure type is `config_error`, otherwise None
        """
        if self.failure_type == FailureType.config_error:
            return AirbyteMessage(
                type=MessageType.CONNECTION_STATUS,
                connectionStatus=AirbyteConnectionStatus(
                    status=Status.FAILED, message=self.message
                ),
            )
        return None

    def emit_message(self) -> None:
        """
        Prints the exception as an AirbyteTraceMessage.
        Note that this will be called automatically on uncaught exceptions when using the airbyte_cdk entrypoint.
        """
        message = orjson.dumps(AirbyteMessageSerializer.dump(self.as_airbyte_message())).decode()
        filtered_message = filter_secrets(message)
        print(filtered_message)

    @classmethod
    def from_exception(
        cls,
        exc: BaseException,
        stream_descriptor: Optional[StreamDescriptor] = None,
        *args: Any,
        **kwargs: Any,
    ) -> "AirbyteTracedException":
        """
        Helper to create an AirbyteTracedException from an existing exception
        :param exc: the exception that caused the error
        :param stream_descriptor: describes the stream from which the exception originates
        """
        if isinstance(exc, AirbyteTracedException):
            internal_message = exc.internal_message
            # Preserve the original user-facing message if the caller didn't provide one
            if "message" not in kwargs:
                kwargs["message"] = exc.message
        else:
            internal_message = str(exc)
        return cls(
            internal_message=internal_message,
            exception=exc,
            stream_descriptor=stream_descriptor,
            *args,
            **kwargs,
        )  # type: ignore  # ignoring because of args and kwargs

    def as_sanitized_airbyte_message(
        self, stream_descriptor: Optional[StreamDescriptor] = None
    ) -> AirbyteMessage:
        """
        Builds an AirbyteTraceMessage from the exception and sanitizes any secrets from the message body

        :param stream_descriptor is deprecated, please use the stream_descriptor in `__init__` or `from_exception`. If many
          stream_descriptors are defined, the one from `as_sanitized_airbyte_message` will be discarded.
        """
        error_message = self.as_airbyte_message(stream_descriptor=stream_descriptor)
        # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
        error = error_message.trace.error  # type: ignore[union-attr]
        # Only non-empty fields are filtered; empty/None values are left untouched.
        for field_name in ("message", "internal_message", "stack_trace"):
            value = getattr(error, field_name)
            if value:
                setattr(error, field_name, filter_secrets(value))
        return error_message
class AirbyteTracedException(Exception):
    """
    An exception that should be emitted as an AirbyteTraceMessage
    """

    def __init__(
        self,
        internal_message: Optional[str] = None,
        message: Optional[str] = None,
        failure_type: FailureType = FailureType.system_error,
        exception: Optional[BaseException] = None,
        stream_descriptor: Optional[StreamDescriptor] = None,
    ):
        """
        :param internal_message: the internal error that caused the failure
        :param message: a user-friendly message that indicates the cause of the error
        :param failure_type: the type of error
        :param exception: the exception that caused the error, from which the stack trace should be retrieved
        :param stream_descriptor: describes the stream from which the exception originates
        """
        self.internal_message = internal_message
        self.message = message
        self.failure_type = failure_type
        self._exception = exception
        self._stream_descriptor = stream_descriptor
        super().__init__(internal_message)

    def __str__(self) -> str:
        """Return the user-facing message, falling back to internal_message."""
        if self.message is not None:
            return self.message
        if self.internal_message is not None:
            return self.internal_message
        return ""

    def as_airbyte_message(
        self, stream_descriptor: Optional[StreamDescriptor] = None
    ) -> AirbyteMessage:
        """
        Builds an AirbyteTraceMessage from the exception

        :param stream_descriptor is deprecated, please use the stream_descriptor in `__init__` or `from_exception`. If many
          stream_descriptors are defined, the one from `as_airbyte_message` will be discarded.
        """
        now_millis = time.time_ns() // 1_000_000

        # Compare against None explicitly: an exception type may define __bool__/__len__
        # and evaluate as falsy, which must not make us drop the wrapped exception.
        trace_exc = self._exception if self._exception is not None else self
        stack_trace_str = "".join(traceback.TracebackException.from_exception(trace_exc).format())

        trace_message = AirbyteTraceMessage(
            type=TraceType.ERROR,
            emitted_at=now_millis,
            error=AirbyteErrorTraceMessage(
                message=self.message
                or "Something went wrong in the connector. See the logs for more details.",
                internal_message=self.internal_message,
                failure_type=self.failure_type,
                stack_trace=stack_trace_str,
                # The descriptor given at construction time wins over the deprecated argument.
                stream_descriptor=self._stream_descriptor
                if self._stream_descriptor is not None
                else stream_descriptor,
            ),
        )

        return AirbyteMessage(type=MessageType.TRACE, trace=trace_message)

    def as_connection_status_message(self) -> Optional[AirbyteMessage]:
        """
        Builds a FAILED connection status message carrying the user-facing message.

        :return: the CONNECTION_STATUS message when the failure type is `config_error`, otherwise None
        """
        if self.failure_type == FailureType.config_error:
            return AirbyteMessage(
                type=MessageType.CONNECTION_STATUS,
                connectionStatus=AirbyteConnectionStatus(
                    status=Status.FAILED, message=self.message
                ),
            )
        return None

    def emit_message(self) -> None:
        """
        Prints the exception as an AirbyteTraceMessage.
        Note that this will be called automatically on uncaught exceptions when using the airbyte_cdk entrypoint.
        """
        message = orjson.dumps(AirbyteMessageSerializer.dump(self.as_airbyte_message())).decode()
        filtered_message = filter_secrets(message)
        print(filtered_message)

    @classmethod
    def from_exception(
        cls,
        exc: BaseException,
        stream_descriptor: Optional[StreamDescriptor] = None,
        *args: Any,
        **kwargs: Any,
    ) -> "AirbyteTracedException":
        """
        Helper to create an AirbyteTracedException from an existing exception
        :param exc: the exception that caused the error
        :param stream_descriptor: describes the stream from which the exception originates
        """
        if isinstance(exc, AirbyteTracedException):
            internal_message = exc.internal_message
            # Preserve the original user-facing message if the caller didn't provide one
            if "message" not in kwargs:
                kwargs["message"] = exc.message
        else:
            internal_message = str(exc)
        return cls(
            internal_message=internal_message,
            exception=exc,
            stream_descriptor=stream_descriptor,
            *args,
            **kwargs,
        )  # type: ignore  # ignoring because of args and kwargs

    def as_sanitized_airbyte_message(
        self, stream_descriptor: Optional[StreamDescriptor] = None
    ) -> AirbyteMessage:
        """
        Builds an AirbyteTraceMessage from the exception and sanitizes any secrets from the message body

        :param stream_descriptor is deprecated, please use the stream_descriptor in `__init__` or `from_exception`. If many
          stream_descriptors are defined, the one from `as_sanitized_airbyte_message` will be discarded.
        """
        error_message = self.as_airbyte_message(stream_descriptor=stream_descriptor)
        # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
        error = error_message.trace.error  # type: ignore[union-attr]
        # Only non-empty fields are filtered; empty/None values are left untouched.
        for field_name in ("message", "internal_message", "stack_trace"):
            value = getattr(error, field_name)
            if value:
                setattr(error, field_name, filter_secrets(value))
        return error_message
An exception that should be emitted as an AirbyteTraceMessage
def __init__(
    self,
    internal_message: Optional[str] = None,
    message: Optional[str] = None,
    failure_type: FailureType = FailureType.system_error,
    exception: Optional[BaseException] = None,
    stream_descriptor: Optional[StreamDescriptor] = None,
):
    """
    Store the error details used later to build Airbyte messages.

    :param internal_message: the internal error that caused the failure
    :param message: a user-friendly message that indicates the cause of the error
    :param failure_type: the type of error
    :param exception: the exception that caused the error, from which the stack trace should be retrieved
    :param stream_descriptor: describes the stream from which the exception originates
    """
    # The base Exception only receives the internal message as its argument.
    super().__init__(internal_message)

    # Public attributes.
    self.internal_message = internal_message
    self.message = message
    self.failure_type = failure_type

    # Private attributes.
    self._exception = exception
    self._stream_descriptor = stream_descriptor
Parameters
- internal_message: the internal error that caused the failure
- message: a user-friendly message that indicates the cause of the error
- failure_type: the type of error
- exception: the exception that caused the error, from which the stack trace should be retrieved
- stream_descriptor: describes the stream from which the exception originates
def as_airbyte_message(
    self, stream_descriptor: Optional[StreamDescriptor] = None
) -> AirbyteMessage:
    """
    Builds an AirbyteTraceMessage from the exception

    :param stream_descriptor is deprecated, please use the stream_descriptor in `__init__` or `from_exception`. If many
      stream_descriptors are defined, the one from `as_airbyte_message` will be discarded.
    :return: an AirbyteMessage of type TRACE wrapping an ERROR trace message
    """
    now_millis = time.time_ns() // 1_000_000

    # Compare against None explicitly: an exception type may define __bool__/__len__
    # and evaluate as falsy, which must not make us drop the wrapped exception.
    trace_exc = self._exception if self._exception is not None else self
    stack_trace_str = "".join(traceback.TracebackException.from_exception(trace_exc).format())

    trace_message = AirbyteTraceMessage(
        type=TraceType.ERROR,
        emitted_at=now_millis,
        error=AirbyteErrorTraceMessage(
            # Fall back to a generic user-facing text when no message was provided.
            message=self.message
            or "Something went wrong in the connector. See the logs for more details.",
            internal_message=self.internal_message,
            failure_type=self.failure_type,
            stack_trace=stack_trace_str,
            # The descriptor given at construction time wins over the deprecated argument.
            stream_descriptor=self._stream_descriptor
            if self._stream_descriptor is not None
            else stream_descriptor,
        ),
    )

    return AirbyteMessage(type=MessageType.TRACE, trace=trace_message)
Builds an AirbyteTraceMessage from the exception
:param stream_descriptor is deprecated, please use the stream_descriptor in `__init__` or `from_exception`. If many
stream_descriptors are defined, the one from `as_airbyte_message` will be discarded.
def as_connection_status_message(self) -> Optional[AirbyteMessage]:
    """
    Builds a FAILED connection status message carrying the user-facing message.

    :return: the CONNECTION_STATUS message when the failure type is `config_error`, otherwise None
    """
    # Any failure type other than a config error yields no status message.
    if self.failure_type != FailureType.config_error:
        return None

    failed_status = AirbyteConnectionStatus(status=Status.FAILED, message=self.message)
    return AirbyteMessage(
        type=MessageType.CONNECTION_STATUS,
        connectionStatus=failed_status,
    )
def emit_message(self) -> None:
    """
    Prints the exception as an AirbyteTraceMessage.
    Note that this will be called automatically on uncaught exceptions when using the airbyte_cdk entrypoint.
    """
    # Serialize the trace message to a JSON string first...
    serializable = AirbyteMessageSerializer.dump(self.as_airbyte_message())
    json_text = orjson.dumps(serializable).decode()
    # ...then pass the whole serialized payload through the secrets filter before printing.
    print(filter_secrets(json_text))
Prints the exception as an AirbyteTraceMessage. Note that this will be called automatically on uncaught exceptions when using the airbyte_cdk entrypoint.
@classmethod
def from_exception(
    cls,
    exc: BaseException,
    stream_descriptor: Optional[StreamDescriptor] = None,
    *args: Any,
    **kwargs: Any,
) -> "AirbyteTracedException":
    """
    Helper to create an AirbyteTracedException from an existing exception

    :param exc: the exception that caused the error
    :param stream_descriptor: describes the stream from which the exception originates
    :param args: extra positional arguments forwarded to the constructor
    :param kwargs: extra keyword arguments forwarded to the constructor
    """
    internal_message: Optional[str]
    if not isinstance(exc, AirbyteTracedException):
        internal_message = str(exc)
    else:
        internal_message = exc.internal_message
        # Preserve the original user-facing message if the caller didn't provide one
        kwargs.setdefault("message", exc.message)

    return cls(
        internal_message=internal_message,
        exception=exc,
        stream_descriptor=stream_descriptor,
        *args,
        **kwargs,
    )  # type: ignore  # ignoring because of args and kwargs
Helper to create an AirbyteTracedException from an existing exception
Parameters
- exc: the exception that caused the error
- stream_descriptor: describes the stream from which the exception originates
def as_sanitized_airbyte_message(
    self, stream_descriptor: Optional[StreamDescriptor] = None
) -> AirbyteMessage:
    """
    Builds an AirbyteTraceMessage from the exception and sanitizes any secrets from the message body

    :param stream_descriptor is deprecated, please use the stream_descriptor in `__init__` or `from_exception`. If many
      stream_descriptors are defined, the one from `as_sanitized_airbyte_message` will be discarded.
    :return: the trace message with `message`, `internal_message` and `stack_trace` passed through `filter_secrets`
    """
    sanitized = self.as_airbyte_message(stream_descriptor=stream_descriptor)
    # AirbyteMessage with MessageType.TRACE has AirbyteTraceMessage
    error = sanitized.trace.error  # type: ignore[union-attr]
    # Same order as the fields are declared above; empty/None values are left untouched.
    for field_name in ("message", "internal_message", "stack_trace"):
        raw_value = getattr(error, field_name)
        if raw_value:
            setattr(error, field_name, filter_secrets(raw_value))
    return sanitized
Builds an AirbyteTraceMessage from the exception and sanitizes any secrets from the message body
:param stream_descriptor is deprecated, please use the stream_descriptor in `__init__` or `from_exception`. If many
stream_descriptors are defined, the one from `as_sanitized_airbyte_message` will be discarded.