airbyte_cdk.sources.message
1# 2# Copyright (c) 2021 Airbyte, Inc., all rights reserved. 3# 4 5from .repository import ( 6 InMemoryMessageRepository, 7 LogAppenderMessageRepositoryDecorator, 8 LogMessage, 9 MessageRepository, 10 NoopMessageRepository, 11) 12 13__all__ = [ 14 "InMemoryMessageRepository", 15 "LogAppenderMessageRepositoryDecorator", 16 "LogMessage", 17 "MessageRepository", 18 "NoopMessageRepository", 19]
75class InMemoryMessageRepository(MessageRepository): 76 def __init__(self, log_level: Level = Level.INFO) -> None: 77 self._message_queue: Deque[AirbyteMessage] = deque() 78 self._log_level = log_level 79 80 def emit_message(self, message: AirbyteMessage) -> None: 81 self._message_queue.append(message) 82 83 def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None: 84 if _is_severe_enough(self._log_level, level): 85 self.emit_message( 86 AirbyteMessage( 87 type=Type.LOG, 88 log=AirbyteLogMessage( 89 level=level, message=filter_secrets(json.dumps(message_provider())) 90 ), 91 ) 92 ) 93 94 def consume_queue(self) -> Iterable[AirbyteMessage]: 95 while self._message_queue: 96 yield self._message_queue.popleft()
Helper class that provides a standard way to create an ABC using inheritance.
83 def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None: 84 if _is_severe_enough(self._log_level, level): 85 self.emit_message( 86 AirbyteMessage( 87 type=Type.LOG, 88 log=AirbyteLogMessage( 89 level=level, message=filter_secrets(json.dumps(message_provider())) 90 ), 91 ) 92 )
Computing messages can be resource consuming. This method is specialized for logging because we want to allow for lazy evaluation if the log level is less severe than what is configured
119class LogAppenderMessageRepositoryDecorator(MessageRepository): 120 def __init__( 121 self, 122 dict_to_append: LogMessage, 123 decorated: MessageRepository, 124 log_level: Level = Level.INFO, 125 ): 126 self._dict_to_append = dict_to_append 127 self._decorated = decorated 128 self._log_level = log_level 129 130 def emit_message(self, message: AirbyteMessage) -> None: 131 self._decorated.emit_message(message) 132 133 def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None: 134 if _is_severe_enough(self._log_level, level): 135 message = message_provider() 136 self._append_second_to_first(message, self._dict_to_append) 137 self._decorated.log_message(level, lambda: message) 138 139 def consume_queue(self) -> Iterable[AirbyteMessage]: 140 return self._decorated.consume_queue() 141 142 def _append_second_to_first( 143 self, first: LogMessage, second: LogMessage, path: Optional[List[str]] = None 144 ) -> LogMessage: 145 if path is None: 146 path = [] 147 148 for key in second: 149 if key in first: 150 if isinstance(first[key], dict) and isinstance(second[key], dict): 151 self._append_second_to_first(first[key], second[key], path + [str(key)]) # type: ignore # type is verified above 152 else: 153 if first[key] != second[key]: 154 _LOGGER.warning("Conflict at %s" % ".".join(path + [str(key)])) 155 first[key] = second[key] 156 else: 157 first[key] = second[key] 158 return first
Helper class that provides a standard way to create an ABC using inheritance.
133 def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None: 134 if _is_severe_enough(self._log_level, level): 135 message = message_provider() 136 self._append_second_to_first(message, self._dict_to_append) 137 self._decorated.log_message(level, lambda: message)
Computing messages can be resource consuming. This method is specialized for logging because we want to allow for lazy evaluation if the log level is less severe than what is configured
46class MessageRepository(ABC): 47 @abstractmethod 48 def emit_message(self, message: AirbyteMessage) -> None: 49 raise NotImplementedError() 50 51 @abstractmethod 52 def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None: 53 """ 54 Computing messages can be resource consuming. This method is specialized for logging because we want to allow for lazy evaluation if 55 the log level is less severe than what is configured 56 """ 57 raise NotImplementedError() 58 59 @abstractmethod 60 def consume_queue(self) -> Iterable[AirbyteMessage]: 61 raise NotImplementedError()
Helper class that provides a standard way to create an ABC using inheritance.
51 @abstractmethod 52 def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None: 53 """ 54 Computing messages can be resource consuming. This method is specialized for logging because we want to allow for lazy evaluation if 55 the log level is less severe than what is configured 56 """ 57 raise NotImplementedError()
Computing messages can be resource consuming. This method is specialized for logging because we want to allow for lazy evaluation if the log level is less severe than what is configured
64class NoopMessageRepository(MessageRepository): 65 def emit_message(self, message: AirbyteMessage) -> None: 66 pass 67 68 def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None: 69 pass 70 71 def consume_queue(self) -> Iterable[AirbyteMessage]: 72 return []
Helper class that provides a standard way to create an ABC using inheritance.
Computing messages can be resource consuming. This method is specialized for logging because we want to allow for lazy evaluation if the log level is less severe than what is configured