airbyte_cdk.sources.utils.slice_logger

 1#
 2# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
 3#
 4
 5import json
 6import logging
 7from abc import ABC, abstractmethod
 8from typing import Any, Mapping, Optional
 9
10from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level
11from airbyte_cdk.models import Type as MessageType
12
13
14class SliceLogger(ABC):
15    """
16    SliceLogger is an interface that allows us to log slices of data in a uniform way.
17    It is responsible for determining whether or not a slice should be logged and for creating the log message.
18    """
19
20    SLICE_LOG_PREFIX = "slice:"
21
22    def create_slice_log_message(self, _slice: Optional[Mapping[str, Any]]) -> AirbyteMessage:
23        """
24        Mapping is an interface that can be implemented in various ways. However, json.dumps will just do a `str(<object>)` if
25        the slice is a class implementing Mapping. Therefore, we want to cast this as a dict before passing this to json.dump
26        """
27        printable_slice = dict(_slice) if _slice else _slice
28        return AirbyteMessage(
29            type=MessageType.LOG,
30            log=AirbyteLogMessage(
31                level=Level.INFO,
32                message=f"{SliceLogger.SLICE_LOG_PREFIX}{json.dumps(printable_slice, default=str)}",
33            ),
34        )
35
36    @abstractmethod
37    def should_log_slice_message(self, logger: logging.Logger) -> bool:
38        """
39
40        :param logger:
41        :return:
42        """
43
44
45class DebugSliceLogger(SliceLogger):
46    def should_log_slice_message(self, logger: logging.Logger) -> bool:
47        """
48
49        :param logger:
50        :return:
51        """
52        return logger.isEnabledFor(logging.DEBUG)
53
54
55class AlwaysLogSliceLogger(SliceLogger):
56    def should_log_slice_message(self, logger: logging.Logger) -> bool:
57        return True
class SliceLogger(abc.ABC):
15class SliceLogger(ABC):
16    """
17    SliceLogger is an interface that allows us to log slices of data in a uniform way.
18    It is responsible for determining whether or not a slice should be logged and for creating the log message.
19    """
20
21    SLICE_LOG_PREFIX = "slice:"
22
23    def create_slice_log_message(self, _slice: Optional[Mapping[str, Any]]) -> AirbyteMessage:
24        """
25        Mapping is an interface that can be implemented in various ways. However, json.dumps will just do a `str(<object>)` if
26        the slice is a class implementing Mapping. Therefore, we want to cast this as a dict before passing this to json.dump
27        """
28        printable_slice = dict(_slice) if _slice else _slice
29        return AirbyteMessage(
30            type=MessageType.LOG,
31            log=AirbyteLogMessage(
32                level=Level.INFO,
33                message=f"{SliceLogger.SLICE_LOG_PREFIX}{json.dumps(printable_slice, default=str)}",
34            ),
35        )
36
37    @abstractmethod
38    def should_log_slice_message(self, logger: logging.Logger) -> bool:
39        """
40
41        :param logger:
42        :return:
43        """

SliceLogger is an interface that allows us to log slices of data in a uniform way. It is responsible for determining whether or not a slice should be logged and for creating the log message.

SLICE_LOG_PREFIX = 'slice:'
def create_slice_log_message( self, _slice: Optional[Mapping[str, Any]]) -> airbyte_cdk.AirbyteMessage:
23    def create_slice_log_message(self, _slice: Optional[Mapping[str, Any]]) -> AirbyteMessage:
24        """
25        Mapping is an interface that can be implemented in various ways. However, json.dumps will just do a `str(<object>)` if
26        the slice is a class implementing Mapping. Therefore, we want to cast this as a dict before passing this to json.dump
27        """
28        printable_slice = dict(_slice) if _slice else _slice
29        return AirbyteMessage(
30            type=MessageType.LOG,
31            log=AirbyteLogMessage(
32                level=Level.INFO,
33                message=f"{SliceLogger.SLICE_LOG_PREFIX}{json.dumps(printable_slice, default=str)}",
34            ),
35        )

Mapping is an interface that can be implemented in various ways. However, json.dumps will just do a str(<object>) if the slice is a class implementing Mapping. Therefore, we want to cast this as a dict before passing this to json.dump

@abstractmethod
def should_log_slice_message(self, logger: logging.Logger) -> bool:
37    @abstractmethod
38    def should_log_slice_message(self, logger: logging.Logger) -> bool:
39        """
40
41        :param logger:
42        :return:
43        """
Parameters
  • logger:
Returns
class DebugSliceLogger(SliceLogger):
46class DebugSliceLogger(SliceLogger):
47    def should_log_slice_message(self, logger: logging.Logger) -> bool:
48        """
49
50        :param logger:
51        :return:
52        """
53        return logger.isEnabledFor(logging.DEBUG)

SliceLogger is an interface that allows us to log slices of data in a uniform way. It is responsible for determining whether or not a slice should be logged and for creating the log message.

def should_log_slice_message(self, logger: logging.Logger) -> bool:
47    def should_log_slice_message(self, logger: logging.Logger) -> bool:
48        """
49
50        :param logger:
51        :return:
52        """
53        return logger.isEnabledFor(logging.DEBUG)
Parameters
  • logger:
Returns
class AlwaysLogSliceLogger(SliceLogger):
56class AlwaysLogSliceLogger(SliceLogger):
57    def should_log_slice_message(self, logger: logging.Logger) -> bool:
58        return True

SliceLogger is an interface that allows us to log slices of data in a uniform way. It is responsible for determining whether or not a slice should be logged and for creating the log message.

def should_log_slice_message(self, logger: logging.Logger) -> bool:
57    def should_log_slice_message(self, logger: logging.Logger) -> bool:
58        return True
Parameters
  • logger:
Returns