airbyte_cdk.sources.message

 1#
 2# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
 3#
 4
 5from .repository import (
 6    InMemoryMessageRepository,
 7    LogAppenderMessageRepositoryDecorator,
 8    LogMessage,
 9    MessageRepository,
10    NoopMessageRepository,
11)
12
13__all__ = [
14    "InMemoryMessageRepository",
15    "LogAppenderMessageRepositoryDecorator",
16    "LogMessage",
17    "MessageRepository",
18    "NoopMessageRepository",
19]
class InMemoryMessageRepository(airbyte_cdk.sources.message.MessageRepository):
75class InMemoryMessageRepository(MessageRepository):
76    def __init__(self, log_level: Level = Level.INFO) -> None:
77        self._message_queue: Deque[AirbyteMessage] = deque()
78        self._log_level = log_level
79
80    def emit_message(self, message: AirbyteMessage) -> None:
81        self._message_queue.append(message)
82
83    def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None:
84        if _is_severe_enough(self._log_level, level):
85            self.emit_message(
86                AirbyteMessage(
87                    type=Type.LOG,
88                    log=AirbyteLogMessage(
89                        level=level, message=filter_secrets(json.dumps(message_provider()))
90                    ),
91                )
92            )
93
94    def consume_queue(self) -> Iterable[AirbyteMessage]:
95        while self._message_queue:
96            yield self._message_queue.popleft()

Helper class that provides a standard way to create an ABC using inheritance.

InMemoryMessageRepository( log_level: airbyte_protocol_dataclasses.models.airbyte_protocol.Level = <Level.INFO: 'INFO'>)
76    def __init__(self, log_level: Level = Level.INFO) -> None:
77        self._message_queue: Deque[AirbyteMessage] = deque()
78        self._log_level = log_level
def emit_message( self, message: airbyte_cdk.AirbyteMessage) -> None:
80    def emit_message(self, message: AirbyteMessage) -> None:
81        self._message_queue.append(message)
def log_message( self, level: airbyte_protocol_dataclasses.models.airbyte_protocol.Level, message_provider: Callable[[], dict[str, Union[dict[str, Union[dict[str, ForwardRef('JsonType')], list[ForwardRef('JsonType')], str, int, float, bool, NoneType]], list[Union[dict[str, ForwardRef('JsonType')], list[ForwardRef('JsonType')], str, int, float, bool, NoneType]], str, int, float, bool, NoneType]]]) -> None:
83    def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None:
84        if _is_severe_enough(self._log_level, level):
85            self.emit_message(
86                AirbyteMessage(
87                    type=Type.LOG,
88                    log=AirbyteLogMessage(
89                        level=level, message=filter_secrets(json.dumps(message_provider()))
90                    ),
91                )
92            )

Computing messages can be resource consuming. This method is specialized for logging because we want to allow for lazy evaluation if the log level is less severe than what is configured

def consume_queue(self) -> Iterable[airbyte_cdk.AirbyteMessage]:
94    def consume_queue(self) -> Iterable[AirbyteMessage]:
95        while self._message_queue:
96            yield self._message_queue.popleft()
class LogAppenderMessageRepositoryDecorator(airbyte_cdk.sources.message.MessageRepository):
119class LogAppenderMessageRepositoryDecorator(MessageRepository):
120    def __init__(
121        self,
122        dict_to_append: LogMessage,
123        decorated: MessageRepository,
124        log_level: Level = Level.INFO,
125    ):
126        self._dict_to_append = dict_to_append
127        self._decorated = decorated
128        self._log_level = log_level
129
130    def emit_message(self, message: AirbyteMessage) -> None:
131        self._decorated.emit_message(message)
132
133    def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None:
134        if _is_severe_enough(self._log_level, level):
135            message = message_provider()
136            self._append_second_to_first(message, self._dict_to_append)
137            self._decorated.log_message(level, lambda: message)
138
139    def consume_queue(self) -> Iterable[AirbyteMessage]:
140        return self._decorated.consume_queue()
141
142    def _append_second_to_first(
143        self, first: LogMessage, second: LogMessage, path: Optional[List[str]] = None
144    ) -> LogMessage:
145        if path is None:
146            path = []
147
148        for key in second:
149            if key in first:
150                if isinstance(first[key], dict) and isinstance(second[key], dict):
151                    self._append_second_to_first(first[key], second[key], path + [str(key)])  # type: ignore # type is verified above
152                else:
153                    if first[key] != second[key]:
154                        _LOGGER.warning("Conflict at %s" % ".".join(path + [str(key)]))
155                    first[key] = second[key]
156            else:
157                first[key] = second[key]
158        return first

Helper class that provides a standard way to create an ABC using inheritance.

LogAppenderMessageRepositoryDecorator( dict_to_append: dict[str, typing.Union[dict[str, typing.Union[dict[str, ForwardRef('JsonType')], list[ForwardRef('JsonType')], str, int, float, bool, NoneType]], list[typing.Union[dict[str, ForwardRef('JsonType')], list[ForwardRef('JsonType')], str, int, float, bool, NoneType]], str, int, float, bool, NoneType]], decorated: MessageRepository, log_level: airbyte_protocol_dataclasses.models.airbyte_protocol.Level = <Level.INFO: 'INFO'>)
120    def __init__(
121        self,
122        dict_to_append: LogMessage,
123        decorated: MessageRepository,
124        log_level: Level = Level.INFO,
125    ):
126        self._dict_to_append = dict_to_append
127        self._decorated = decorated
128        self._log_level = log_level
def emit_message( self, message: airbyte_cdk.AirbyteMessage) -> None:
130    def emit_message(self, message: AirbyteMessage) -> None:
131        self._decorated.emit_message(message)
def log_message( self, level: airbyte_protocol_dataclasses.models.airbyte_protocol.Level, message_provider: Callable[[], dict[str, Union[dict[str, Union[dict[str, ForwardRef('JsonType')], list[ForwardRef('JsonType')], str, int, float, bool, NoneType]], list[Union[dict[str, ForwardRef('JsonType')], list[ForwardRef('JsonType')], str, int, float, bool, NoneType]], str, int, float, bool, NoneType]]]) -> None:
133    def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None:
134        if _is_severe_enough(self._log_level, level):
135            message = message_provider()
136            self._append_second_to_first(message, self._dict_to_append)
137            self._decorated.log_message(level, lambda: message)

Computing messages can be resource consuming. This method is specialized for logging because we want to allow for lazy evaluation if the log level is less severe than what is configured

def consume_queue(self) -> Iterable[airbyte_cdk.AirbyteMessage]:
139    def consume_queue(self) -> Iterable[AirbyteMessage]:
140        return self._decorated.consume_queue()
LogMessage = dict[str, typing.Union[dict[str, 'JsonType'], list['JsonType'], str, int, float, bool, NoneType]]
class MessageRepository(abc.ABC):
46class MessageRepository(ABC):
47    @abstractmethod
48    def emit_message(self, message: AirbyteMessage) -> None:
49        raise NotImplementedError()
50
51    @abstractmethod
52    def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None:
53        """
54        Computing messages can be resource consuming. This method is specialized for logging because we want to allow for lazy evaluation if
55        the log level is less severe than what is configured
56        """
57        raise NotImplementedError()
58
59    @abstractmethod
60    def consume_queue(self) -> Iterable[AirbyteMessage]:
61        raise NotImplementedError()

Helper class that provides a standard way to create an ABC using inheritance.

@abstractmethod
def emit_message( self, message: airbyte_cdk.AirbyteMessage) -> None:
47    @abstractmethod
48    def emit_message(self, message: AirbyteMessage) -> None:
49        raise NotImplementedError()
@abstractmethod
def log_message( self, level: airbyte_protocol_dataclasses.models.airbyte_protocol.Level, message_provider: Callable[[], dict[str, Union[dict[str, Union[dict[str, ForwardRef('JsonType')], list[ForwardRef('JsonType')], str, int, float, bool, NoneType]], list[Union[dict[str, ForwardRef('JsonType')], list[ForwardRef('JsonType')], str, int, float, bool, NoneType]], str, int, float, bool, NoneType]]]) -> None:
51    @abstractmethod
52    def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None:
53        """
54        Computing messages can be resource consuming. This method is specialized for logging because we want to allow for lazy evaluation if
55        the log level is less severe than what is configured
56        """
57        raise NotImplementedError()

Computing messages can be resource consuming. This method is specialized for logging because we want to allow for lazy evaluation if the log level is less severe than what is configured

@abstractmethod
def consume_queue(self) -> Iterable[airbyte_cdk.AirbyteMessage]:
59    @abstractmethod
60    def consume_queue(self) -> Iterable[AirbyteMessage]:
61        raise NotImplementedError()
class NoopMessageRepository(airbyte_cdk.sources.message.MessageRepository):
64class NoopMessageRepository(MessageRepository):
65    def emit_message(self, message: AirbyteMessage) -> None:
66        pass
67
68    def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None:
69        pass
70
71    def consume_queue(self) -> Iterable[AirbyteMessage]:
72        return []

Helper class that provides a standard way to create an ABC using inheritance.

def emit_message( self, message: airbyte_cdk.AirbyteMessage) -> None:
65    def emit_message(self, message: AirbyteMessage) -> None:
66        pass
def log_message( self, level: airbyte_protocol_dataclasses.models.airbyte_protocol.Level, message_provider: Callable[[], dict[str, Union[dict[str, Union[dict[str, ForwardRef('JsonType')], list[ForwardRef('JsonType')], str, int, float, bool, NoneType]], list[Union[dict[str, ForwardRef('JsonType')], list[ForwardRef('JsonType')], str, int, float, bool, NoneType]], str, int, float, bool, NoneType]]]) -> None:
68    def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None:
69        pass

Computing messages can be resource consuming. This method is specialized for logging because we want to allow for lazy evaluation if the log level is less severe than what is configured

def consume_queue(self) -> Iterable[airbyte_cdk.AirbyteMessage]:
71    def consume_queue(self) -> Iterable[AirbyteMessage]:
72        return []