airbyte_cdk.sources.utils.slice_logger
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

import json
import logging
from abc import ABC, abstractmethod
from typing import Any, Mapping, Optional

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level
from airbyte_cdk.models import Type as MessageType


# Once everything runs on the concurrent CDK and we've cleaned up the legacy flows, we should try to remove
# this class and write messages directly to the message_repository instead of through the logger because for
# cases like the connector builder where ordering of messages is important, using the logger can cause
# messages to be grouped out of order. Alas work for a different day.
class SliceLogger(ABC):
    """
    SliceLogger is an interface that allows us to log slices of data in a uniform way.
    It is responsible for determining whether or not a slice should be logged and for creating the log message.
    """

    # Prefix placed in front of the JSON-serialized slice in every slice log message.
    SLICE_LOG_PREFIX = "slice:"

    def create_slice_log_message(self, _slice: Optional[Mapping[str, Any]]) -> AirbyteMessage:
        """
        Build the INFO-level LOG message describing a slice.

        Mapping is an interface that can be implemented in various ways. However, json.dumps will just do a
        `str(<object>)` (through `default=str`) if the slice is a class implementing Mapping rather than a real dict.
        Therefore, we want to cast this as a dict before passing this to json.dumps.

        :param _slice: the slice to describe, or None when there is no slice
        :return: an AirbyteMessage of type LOG whose message is SLICE_LOG_PREFIX followed by the slice as JSON
        """
        # Compare against None rather than relying on truthiness: an empty custom Mapping is falsy and would
        # otherwise skip the dict() cast and be serialized as the JSON string of its str() instead of "{}".
        printable_slice = dict(_slice) if _slice is not None else None
        return AirbyteMessage(
            type=MessageType.LOG,
            log=AirbyteLogMessage(
                level=Level.INFO,
                message=f"{SliceLogger.SLICE_LOG_PREFIX}{json.dumps(printable_slice, default=str)}",
            ),
        )

    @abstractmethod
    def should_log_slice_message(self, logger: logging.Logger) -> bool:
        """
        Decide whether slice messages should be logged.

        :param logger: the logger the decision may be based on
        :return: True if slice messages should be logged, False otherwise
        """


class DebugSliceLogger(SliceLogger):
    """SliceLogger that asks for slice messages only when the logger is enabled for DEBUG."""

    def should_log_slice_message(self, logger: logging.Logger) -> bool:
        """
        Report whether the given logger is enabled for DEBUG.

        :param logger: the logger whose enabled level is checked
        :return: True when `logger` is enabled for DEBUG, False otherwise
        """
        return logger.isEnabledFor(logging.DEBUG)


class AlwaysLogSliceLogger(SliceLogger):
    """SliceLogger that always asks for slice messages to be logged."""

    def should_log_slice_message(self, logger: logging.Logger) -> bool:
        """
        Report that slice messages should be logged.

        :param logger: unused; the answer does not depend on the logger
        :return: always True
        """
        return True
class SliceLogger(ABC):
    """
    SliceLogger is an interface that allows us to log slices of data in a uniform way.
    It is responsible for determining whether or not a slice should be logged and for creating the log message.
    """

    # Prefix placed in front of the JSON-serialized slice in every slice log message.
    SLICE_LOG_PREFIX = "slice:"

    def create_slice_log_message(self, _slice: Optional[Mapping[str, Any]]) -> AirbyteMessage:
        """
        Build the INFO-level LOG message describing a slice.

        Mapping is an interface that can be implemented in various ways. However, json.dumps will just do a
        `str(<object>)` (through `default=str`) if the slice is a class implementing Mapping rather than a real dict.
        Therefore, we want to cast this as a dict before passing this to json.dumps.

        :param _slice: the slice to describe, or None when there is no slice
        :return: an AirbyteMessage of type LOG whose message is SLICE_LOG_PREFIX followed by the slice as JSON
        """
        # Compare against None rather than relying on truthiness: an empty custom Mapping is falsy and would
        # otherwise skip the dict() cast and be serialized as the JSON string of its str() instead of "{}".
        printable_slice = dict(_slice) if _slice is not None else None
        return AirbyteMessage(
            type=MessageType.LOG,
            log=AirbyteLogMessage(
                level=Level.INFO,
                message=f"{SliceLogger.SLICE_LOG_PREFIX}{json.dumps(printable_slice, default=str)}",
            ),
        )

    @abstractmethod
    def should_log_slice_message(self, logger: logging.Logger) -> bool:
        """
        Decide whether slice messages should be logged.

        :param logger: the logger the decision may be based on
        :return: True if slice messages should be logged, False otherwise
        """
SliceLogger is an interface that allows us to log slices of data in a uniform way. It is responsible for determining whether or not a slice should be logged and for creating the log message.
def create_slice_log_message(self, _slice: Optional[Mapping[str, Any]]) -> AirbyteMessage:
    """
    Build the INFO-level LOG message describing a slice.

    Mapping is an interface that can be implemented in various ways. However, json.dumps will just do a
    `str(<object>)` (through `default=str`) if the slice is a class implementing Mapping rather than a real dict.
    Therefore, we want to cast this as a dict before passing this to json.dumps.

    :param _slice: the slice to describe, or None when there is no slice
    :return: an AirbyteMessage of type LOG whose message is SLICE_LOG_PREFIX followed by the slice as JSON
    """
    # Compare against None rather than relying on truthiness: an empty custom Mapping is falsy and would
    # otherwise skip the dict() cast and be serialized as the JSON string of its str() instead of "{}".
    printable_slice = dict(_slice) if _slice is not None else None
    return AirbyteMessage(
        type=MessageType.LOG,
        log=AirbyteLogMessage(
            level=Level.INFO,
            message=f"{SliceLogger.SLICE_LOG_PREFIX}{json.dumps(printable_slice, default=str)}",
        ),
    )
Mapping is an interface that can be implemented in various ways. However, json.dumps will just do a str(<object>) if the slice is a class implementing Mapping. Therefore, we want to cast this as a dict before passing it to json.dumps.
class DebugSliceLogger(SliceLogger):
    """SliceLogger that asks for slice messages only when the logger is enabled for DEBUG."""

    def should_log_slice_message(self, logger: logging.Logger) -> bool:
        """
        Report whether the given logger is enabled for DEBUG.

        :param logger: the logger whose enabled level is checked
        :return: True when `logger` is enabled for DEBUG, False otherwise
        """
        debug_enabled = logger.isEnabledFor(logging.DEBUG)
        return debug_enabled
SliceLogger is an interface that allows us to log slices of data in a uniform way. It is responsible for determining whether or not a slice should be logged and for creating the log message.
def should_log_slice_message(self, logger: logging.Logger) -> bool:
    """
    Report whether the given logger is enabled for DEBUG.

    :param logger: the logger whose enabled level is checked
    :return: True when `logger` is enabled for DEBUG, False otherwise
    """
    debug_enabled = logger.isEnabledFor(logging.DEBUG)
    return debug_enabled
Parameters
- logger:
Returns
Inherited Members
class AlwaysLogSliceLogger(SliceLogger):
    """SliceLogger that always asks for slice messages to be logged."""

    def should_log_slice_message(self, logger: logging.Logger) -> bool:
        """
        Report that slice messages should be logged.

        :param logger: unused; the answer does not depend on the logger
        :return: always True
        """
        return True
SliceLogger is an interface that allows us to log slices of data in a uniform way. It is responsible for determining whether or not a slice should be logged and for creating the log message.