airbyte_cdk.sources.message
1# 2# Copyright (c) 2021 Airbyte, Inc., all rights reserved. 3# 4 5from .repository import ( 6 InMemoryMessageRepository, 7 LogAppenderMessageRepositoryDecorator, 8 LogMessage, 9 MessageRepository, 10 NoopMessageRepository, 11) 12 13__all__ = [ 14 "InMemoryMessageRepository", 15 "LogAppenderMessageRepositoryDecorator", 16 "LogMessage", 17 "MessageRepository", 18 "NoopMessageRepository", 19]
75class InMemoryMessageRepository(MessageRepository): 76 def __init__(self, log_level: Level = Level.INFO) -> None: 77 self._message_queue: Deque[AirbyteMessage] = deque() 78 self._log_level = log_level 79 80 def emit_message(self, message: AirbyteMessage) -> None: 81 self._message_queue.append(message) 82 83 def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None: 84 if _is_severe_enough(self._log_level, level): 85 self.emit_message( 86 AirbyteMessage( 87 type=Type.LOG, 88 log=AirbyteLogMessage( 89 level=level, message=filter_secrets(json.dumps(message_provider())) 90 ), 91 ) 92 ) 93 94 def consume_queue(self) -> Iterable[AirbyteMessage]: 95 while self._message_queue: 96 yield self._message_queue.popleft()
Helper class that provides a standard way to create an ABC using inheritance.
83 def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None: 84 if _is_severe_enough(self._log_level, level): 85 self.emit_message( 86 AirbyteMessage( 87 type=Type.LOG, 88 log=AirbyteLogMessage( 89 level=level, message=filter_secrets(json.dumps(message_provider())) 90 ), 91 ) 92 )
Computing messages can be resource consuming. This method is specialized for logging because we want to allow for lazy evaluation if the log level is less severe than what is configured
99class LogAppenderMessageRepositoryDecorator(MessageRepository): 100 def __init__( 101 self, 102 dict_to_append: LogMessage, 103 decorated: MessageRepository, 104 log_level: Level = Level.INFO, 105 ): 106 self._dict_to_append = dict_to_append 107 self._decorated = decorated 108 self._log_level = log_level 109 110 def emit_message(self, message: AirbyteMessage) -> None: 111 self._decorated.emit_message(message) 112 113 def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None: 114 if _is_severe_enough(self._log_level, level): 115 message = message_provider() 116 self._append_second_to_first(message, self._dict_to_append) 117 self._decorated.log_message(level, lambda: message) 118 119 def consume_queue(self) -> Iterable[AirbyteMessage]: 120 return self._decorated.consume_queue() 121 122 def _append_second_to_first( 123 self, first: LogMessage, second: LogMessage, path: Optional[List[str]] = None 124 ) -> LogMessage: 125 if path is None: 126 path = [] 127 128 for key in second: 129 if key in first: 130 if isinstance(first[key], dict) and isinstance(second[key], dict): 131 self._append_second_to_first(first[key], second[key], path + [str(key)]) # type: ignore # type is verified above 132 else: 133 if first[key] != second[key]: 134 _LOGGER.warning("Conflict at %s" % ".".join(path + [str(key)])) 135 first[key] = second[key] 136 else: 137 first[key] = second[key] 138 return first
Helper class that provides a standard way to create an ABC using inheritance.
113 def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None: 114 if _is_severe_enough(self._log_level, level): 115 message = message_provider() 116 self._append_second_to_first(message, self._dict_to_append) 117 self._decorated.log_message(level, lambda: message)
Computing messages can be resource consuming. This method is specialized for logging because we want to allow for lazy evaluation if the log level is less severe than what is configured
46class MessageRepository(ABC): 47 @abstractmethod 48 def emit_message(self, message: AirbyteMessage) -> None: 49 raise NotImplementedError() 50 51 @abstractmethod 52 def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None: 53 """ 54 Computing messages can be resource consuming. This method is specialized for logging because we want to allow for lazy evaluation if 55 the log level is less severe than what is configured 56 """ 57 raise NotImplementedError() 58 59 @abstractmethod 60 def consume_queue(self) -> Iterable[AirbyteMessage]: 61 raise NotImplementedError()
Helper class that provides a standard way to create an ABC using inheritance.
51 @abstractmethod 52 def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None: 53 """ 54 Computing messages can be resource consuming. This method is specialized for logging because we want to allow for lazy evaluation if 55 the log level is less severe than what is configured 56 """ 57 raise NotImplementedError()
Computing messages can be resource consuming. This method is specialized for logging because we want to allow for lazy evaluation if the log level is less severe than what is configured
64class NoopMessageRepository(MessageRepository): 65 def emit_message(self, message: AirbyteMessage) -> None: 66 pass 67 68 def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None: 69 pass 70 71 def consume_queue(self) -> Iterable[AirbyteMessage]: 72 return []
Helper class that provides a standard way to create an ABC using inheritance.
Computing messages can be resource consuming. This method is specialized for logging because we want to allow for lazy evaluation if the log level is less severe than what is configured