airbyte_cdk.sources.utils.slice_logger

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5import json
 6import logging
 7from abc import ABC, abstractmethod
 8from typing import Any, Mapping, Optional
 9
10from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level
11from airbyte_cdk.models import Type as MessageType
12
13
14# Once everything runs on the concurrent CDK and we've cleaned up the legacy flows, we should try to remove
15# this class and write messages directly to the message_repository instead of through the logger because for
16# cases like the connector builder where ordering of messages is important, using the logger can cause
17# messages to be grouped out of order. Alas work for a different day.
18class SliceLogger(ABC):
19    """
20    SliceLogger is an interface that allows us to log slices of data in a uniform way.
21    It is responsible for determining whether or not a slice should be logged and for creating the log message.
22    """
23
24    SLICE_LOG_PREFIX = "slice:"
25
26    def create_slice_log_message(self, _slice: Optional[Mapping[str, Any]]) -> AirbyteMessage:
27        """
28        Mapping is an interface that can be implemented in various ways. However, json.dumps will just do a `str(<object>)` if
29        the slice is a class implementing Mapping. Therefore, we want to cast this as a dict before passing this to json.dump
30        """
31        printable_slice = dict(_slice) if _slice else _slice
32        return AirbyteMessage(
33            type=MessageType.LOG,
34            log=AirbyteLogMessage(
35                level=Level.INFO,
36                message=f"{SliceLogger.SLICE_LOG_PREFIX}{json.dumps(printable_slice, default=str)}",
37            ),
38        )
39
40    @abstractmethod
41    def should_log_slice_message(self, logger: logging.Logger) -> bool:
42        """
43
44        :param logger:
45        :return:
46        """
47
48
49class DebugSliceLogger(SliceLogger):
50    def should_log_slice_message(self, logger: logging.Logger) -> bool:
51        """
52
53        :param logger:
54        :return:
55        """
56        return logger.isEnabledFor(logging.DEBUG)
57
58
59class AlwaysLogSliceLogger(SliceLogger):
60    def should_log_slice_message(self, logger: logging.Logger) -> bool:
61        return True
class SliceLogger(abc.ABC):
19class SliceLogger(ABC):
20    """
21    SliceLogger is an interface that allows us to log slices of data in a uniform way.
22    It is responsible for determining whether or not a slice should be logged and for creating the log message.
23    """
24
25    SLICE_LOG_PREFIX = "slice:"
26
27    def create_slice_log_message(self, _slice: Optional[Mapping[str, Any]]) -> AirbyteMessage:
28        """
29        Mapping is an interface that can be implemented in various ways. However, json.dumps will just do a `str(<object>)` if
30        the slice is a class implementing Mapping. Therefore, we want to cast this as a dict before passing this to json.dump
31        """
32        printable_slice = dict(_slice) if _slice else _slice
33        return AirbyteMessage(
34            type=MessageType.LOG,
35            log=AirbyteLogMessage(
36                level=Level.INFO,
37                message=f"{SliceLogger.SLICE_LOG_PREFIX}{json.dumps(printable_slice, default=str)}",
38            ),
39        )
40
41    @abstractmethod
42    def should_log_slice_message(self, logger: logging.Logger) -> bool:
43        """
44
45        :param logger:
46        :return:
47        """

SliceLogger is an interface that allows us to log slices of data in a uniform way. It is responsible for determining whether or not a slice should be logged and for creating the log message.

SLICE_LOG_PREFIX = 'slice:'
def create_slice_log_message( self, _slice: Optional[Mapping[str, Any]]) -> airbyte_cdk.AirbyteMessage:
27    def create_slice_log_message(self, _slice: Optional[Mapping[str, Any]]) -> AirbyteMessage:
28        """
29        Mapping is an interface that can be implemented in various ways. However, json.dumps will just do a `str(<object>)` if
30        the slice is a class implementing Mapping. Therefore, we want to cast this as a dict before passing this to json.dump
31        """
32        printable_slice = dict(_slice) if _slice else _slice
33        return AirbyteMessage(
34            type=MessageType.LOG,
35            log=AirbyteLogMessage(
36                level=Level.INFO,
37                message=f"{SliceLogger.SLICE_LOG_PREFIX}{json.dumps(printable_slice, default=str)}",
38            ),
39        )

Mapping is an interface that can be implemented in various ways. However, json.dumps will just do a str(<object>) if the slice is a class implementing Mapping. Therefore, we want to cast this as a dict before passing this to json.dump

@abstractmethod
def should_log_slice_message(self, logger: logging.Logger) -> bool:
41    @abstractmethod
42    def should_log_slice_message(self, logger: logging.Logger) -> bool:
43        """
44
45        :param logger:
46        :return:
47        """
Parameters
  • logger:
Returns
class DebugSliceLogger(SliceLogger):
50class DebugSliceLogger(SliceLogger):
51    def should_log_slice_message(self, logger: logging.Logger) -> bool:
52        """
53
54        :param logger:
55        :return:
56        """
57        return logger.isEnabledFor(logging.DEBUG)

SliceLogger is an interface that allows us to log slices of data in a uniform way. It is responsible for determining whether or not a slice should be logged and for creating the log message.

def should_log_slice_message(self, logger: logging.Logger) -> bool:
51    def should_log_slice_message(self, logger: logging.Logger) -> bool:
52        """
53
54        :param logger:
55        :return:
56        """
57        return logger.isEnabledFor(logging.DEBUG)
Parameters
  • logger:
Returns
class AlwaysLogSliceLogger(SliceLogger):
60class AlwaysLogSliceLogger(SliceLogger):
61    def should_log_slice_message(self, logger: logging.Logger) -> bool:
62        return True

SliceLogger is an interface that allows us to log slices of data in a uniform way. It is responsible for determining whether or not a slice should be logged and for creating the log message.

def should_log_slice_message(self, logger: logging.Logger) -> bool:
61    def should_log_slice_message(self, logger: logging.Logger) -> bool:
62        return True
Parameters
  • logger:
Returns