airbyte_cdk.sources.message

 1#
 2# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
 3#
 4
 5from .repository import (
 6    InMemoryMessageRepository,
 7    LogAppenderMessageRepositoryDecorator,
 8    LogMessage,
 9    MessageRepository,
10    NoopMessageRepository,
11)
12
13__all__ = [
14    "InMemoryMessageRepository",
15    "LogAppenderMessageRepositoryDecorator",
16    "LogMessage",
17    "MessageRepository",
18    "NoopMessageRepository",
19]
class InMemoryMessageRepository(airbyte_cdk.sources.message.MessageRepository):
75class InMemoryMessageRepository(MessageRepository):
76    def __init__(self, log_level: Level = Level.INFO) -> None:
77        self._message_queue: Deque[AirbyteMessage] = deque()
78        self._log_level = log_level
79
80    def emit_message(self, message: AirbyteMessage) -> None:
81        self._message_queue.append(message)
82
83    def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None:
84        if _is_severe_enough(self._log_level, level):
85            self.emit_message(
86                AirbyteMessage(
87                    type=Type.LOG,
88                    log=AirbyteLogMessage(
89                        level=level, message=filter_secrets(json.dumps(message_provider()))
90                    ),
91                )
92            )
93
94    def consume_queue(self) -> Iterable[AirbyteMessage]:
95        while self._message_queue:
96            yield self._message_queue.popleft()

Helper class that provides a standard way to create an ABC using inheritance.

InMemoryMessageRepository( log_level: airbyte_protocol_dataclasses.models.airbyte_protocol.Level = <Level.INFO: 'INFO'>)
76    def __init__(self, log_level: Level = Level.INFO) -> None:
77        self._message_queue: Deque[AirbyteMessage] = deque()
78        self._log_level = log_level
def emit_message( self, message: airbyte_cdk.AirbyteMessage) -> None:
80    def emit_message(self, message: AirbyteMessage) -> None:
81        self._message_queue.append(message)
def log_message( self, level: airbyte_protocol_dataclasses.models.airbyte_protocol.Level, message_provider: Callable[[], dict[str, Union[dict[str, Union[dict[str, ForwardRef('JsonType')], list[ForwardRef('JsonType')], str, int, float, bool, NoneType]], list[Union[dict[str, ForwardRef('JsonType')], list[ForwardRef('JsonType')], str, int, float, bool, NoneType]], str, int, float, bool, NoneType]]]) -> None:
83    def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None:
84        if _is_severe_enough(self._log_level, level):
85            self.emit_message(
86                AirbyteMessage(
87                    type=Type.LOG,
88                    log=AirbyteLogMessage(
89                        level=level, message=filter_secrets(json.dumps(message_provider()))
90                    ),
91                )
92            )

Computing messages can be resource consuming. This method is specialized for logging because we want to allow for lazy evaluation if the log level is less severe than what is configured

def consume_queue(self) -> Iterable[airbyte_cdk.AirbyteMessage]:
94    def consume_queue(self) -> Iterable[AirbyteMessage]:
95        while self._message_queue:
96            yield self._message_queue.popleft()
class LogAppenderMessageRepositoryDecorator(airbyte_cdk.sources.message.MessageRepository):
 99class LogAppenderMessageRepositoryDecorator(MessageRepository):
100    def __init__(
101        self,
102        dict_to_append: LogMessage,
103        decorated: MessageRepository,
104        log_level: Level = Level.INFO,
105    ):
106        self._dict_to_append = dict_to_append
107        self._decorated = decorated
108        self._log_level = log_level
109
110    def emit_message(self, message: AirbyteMessage) -> None:
111        self._decorated.emit_message(message)
112
113    def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None:
114        if _is_severe_enough(self._log_level, level):
115            message = message_provider()
116            self._append_second_to_first(message, self._dict_to_append)
117            self._decorated.log_message(level, lambda: message)
118
119    def consume_queue(self) -> Iterable[AirbyteMessage]:
120        return self._decorated.consume_queue()
121
122    def _append_second_to_first(
123        self, first: LogMessage, second: LogMessage, path: Optional[List[str]] = None
124    ) -> LogMessage:
125        if path is None:
126            path = []
127
128        for key in second:
129            if key in first:
130                if isinstance(first[key], dict) and isinstance(second[key], dict):
131                    self._append_second_to_first(first[key], second[key], path + [str(key)])  # type: ignore # type is verified above
132                else:
133                    if first[key] != second[key]:
134                        _LOGGER.warning("Conflict at %s" % ".".join(path + [str(key)]))
135                    first[key] = second[key]
136            else:
137                first[key] = second[key]
138        return first

Helper class that provides a standard way to create an ABC using inheritance.

LogAppenderMessageRepositoryDecorator( dict_to_append: dict[str, typing.Union[dict[str, typing.Union[dict[str, ForwardRef('JsonType')], list[ForwardRef('JsonType')], str, int, float, bool, NoneType]], list[typing.Union[dict[str, ForwardRef('JsonType')], list[ForwardRef('JsonType')], str, int, float, bool, NoneType]], str, int, float, bool, NoneType]], decorated: MessageRepository, log_level: airbyte_protocol_dataclasses.models.airbyte_protocol.Level = <Level.INFO: 'INFO'>)
100    def __init__(
101        self,
102        dict_to_append: LogMessage,
103        decorated: MessageRepository,
104        log_level: Level = Level.INFO,
105    ):
106        self._dict_to_append = dict_to_append
107        self._decorated = decorated
108        self._log_level = log_level
def emit_message( self, message: airbyte_cdk.AirbyteMessage) -> None:
110    def emit_message(self, message: AirbyteMessage) -> None:
111        self._decorated.emit_message(message)
def log_message( self, level: airbyte_protocol_dataclasses.models.airbyte_protocol.Level, message_provider: Callable[[], dict[str, Union[dict[str, Union[dict[str, ForwardRef('JsonType')], list[ForwardRef('JsonType')], str, int, float, bool, NoneType]], list[Union[dict[str, ForwardRef('JsonType')], list[ForwardRef('JsonType')], str, int, float, bool, NoneType]], str, int, float, bool, NoneType]]]) -> None:
113    def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None:
114        if _is_severe_enough(self._log_level, level):
115            message = message_provider()
116            self._append_second_to_first(message, self._dict_to_append)
117            self._decorated.log_message(level, lambda: message)

Computing messages can be resource consuming. This method is specialized for logging because we want to allow for lazy evaluation if the log level is less severe than what is configured

def consume_queue(self) -> Iterable[airbyte_cdk.AirbyteMessage]:
119    def consume_queue(self) -> Iterable[AirbyteMessage]:
120        return self._decorated.consume_queue()
LogMessage = dict[str, typing.Union[dict[str, 'JsonType'], list['JsonType'], str, int, float, bool, NoneType]]
class MessageRepository(abc.ABC):
46class MessageRepository(ABC):
47    @abstractmethod
48    def emit_message(self, message: AirbyteMessage) -> None:
49        raise NotImplementedError()
50
51    @abstractmethod
52    def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None:
53        """
54        Computing messages can be resource consuming. This method is specialized for logging because we want to allow for lazy evaluation if
55        the log level is less severe than what is configured
56        """
57        raise NotImplementedError()
58
59    @abstractmethod
60    def consume_queue(self) -> Iterable[AirbyteMessage]:
61        raise NotImplementedError()

Helper class that provides a standard way to create an ABC using inheritance.

@abstractmethod
def emit_message( self, message: airbyte_cdk.AirbyteMessage) -> None:
47    @abstractmethod
48    def emit_message(self, message: AirbyteMessage) -> None:
49        raise NotImplementedError()
@abstractmethod
def log_message( self, level: airbyte_protocol_dataclasses.models.airbyte_protocol.Level, message_provider: Callable[[], dict[str, Union[dict[str, Union[dict[str, ForwardRef('JsonType')], list[ForwardRef('JsonType')], str, int, float, bool, NoneType]], list[Union[dict[str, ForwardRef('JsonType')], list[ForwardRef('JsonType')], str, int, float, bool, NoneType]], str, int, float, bool, NoneType]]]) -> None:
51    @abstractmethod
52    def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None:
53        """
54        Computing messages can be resource consuming. This method is specialized for logging because we want to allow for lazy evaluation if
55        the log level is less severe than what is configured
56        """
57        raise NotImplementedError()

Computing messages can be resource consuming. This method is specialized for logging because we want to allow for lazy evaluation if the log level is less severe than what is configured

@abstractmethod
def consume_queue(self) -> Iterable[airbyte_cdk.AirbyteMessage]:
59    @abstractmethod
60    def consume_queue(self) -> Iterable[AirbyteMessage]:
61        raise NotImplementedError()
class NoopMessageRepository(airbyte_cdk.sources.message.MessageRepository):
64class NoopMessageRepository(MessageRepository):
65    def emit_message(self, message: AirbyteMessage) -> None:
66        pass
67
68    def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None:
69        pass
70
71    def consume_queue(self) -> Iterable[AirbyteMessage]:
72        return []

Helper class that provides a standard way to create an ABC using inheritance.

def emit_message( self, message: airbyte_cdk.AirbyteMessage) -> None:
65    def emit_message(self, message: AirbyteMessage) -> None:
66        pass
def log_message( self, level: airbyte_protocol_dataclasses.models.airbyte_protocol.Level, message_provider: Callable[[], dict[str, Union[dict[str, Union[dict[str, ForwardRef('JsonType')], list[ForwardRef('JsonType')], str, int, float, bool, NoneType]], list[Union[dict[str, ForwardRef('JsonType')], list[ForwardRef('JsonType')], str, int, float, bool, NoneType]], str, int, float, bool, NoneType]]]) -> None:
68    def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None:
69        pass

Computing messages can be resource consuming. This method is specialized for logging because we want to allow for lazy evaluation if the log level is less severe than what is configured

def consume_queue(self) -> Iterable[airbyte_cdk.AirbyteMessage]:
71    def consume_queue(self) -> Iterable[AirbyteMessage]:
72        return []