airbyte.logs
PyAirbyte Logging features and related configuration.
By default, PyAirbyte main logs are written to a file in the `AIRBYTE_LOGGING_ROOT` directory, which defaults to a system-created temporary directory. PyAirbyte also maintains connector-specific log files within the same directory, under a subfolder with the name of the connector.
PyAirbyte supports structured JSON logging, which is disabled by default. To enable structured logging in JSON, set the `AIRBYTE_STRUCTURED_LOGGING` environment variable to `True`.
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2"""PyAirbyte Logging features and related configuration. 3 4By default, PyAirbyte main logs are written to a file in the `AIRBYTE_LOGGING_ROOT` directory, which 5defaults to a system-created temporary directory. PyAirbyte also maintains connector-specific log 6files within the same directory, under a subfolder with the name of the connector. 7 8PyAirbyte supports structured JSON logging, which is disabled by default. To enable structured 9logging in JSON, set `AIRBYTE_STRUCTURED_LOGGING` to `True`. 10""" 11 12from __future__ import annotations 13 14import logging 15import os 16import platform 17import tempfile 18import warnings 19from functools import lru_cache 20from pathlib import Path 21 22import pendulum 23import structlog 24import ulid 25 26 27def _str_to_bool(value: str) -> bool: 28 """Convert a string value of an environment values to a boolean value.""" 29 return bool(value) and value.lower() not in {"", "0", "false", "f", "no", "n", "off"} 30 31 32AIRBYTE_STRUCTURED_LOGGING: bool = _str_to_bool( 33 os.getenv( 34 key="AIRBYTE_STRUCTURED_LOGGING", 35 default="false", 36 ) 37) 38"""Whether to enable structured logging. 39 40This value is read from the `AIRBYTE_STRUCTURED_LOGGING` environment variable. If the variable is 41not set, the default value is `False`. 42""" 43 44_warned_messages: set[str] = set() 45 46 47def warn_once( 48 message: str, 49 logger: logging.Logger | None = None, 50 *, 51 with_stack: int | bool, 52) -> None: 53 """Emit a warning message only once. 54 55 This function is a wrapper around the `warnings.warn` function that logs the warning message 56 to the global logger. The warning message is only emitted once per unique message. 
57 """ 58 if message in _warned_messages: 59 return 60 61 if not with_stack: 62 stacklevel = 0 63 if with_stack is True: 64 stacklevel = 2 65 66 _warned_messages.add(message) 67 warnings.warn( 68 message, 69 category=UserWarning, 70 stacklevel=stacklevel, 71 ) 72 73 if logger: 74 logger.warning(message) 75 76 77def _get_logging_root() -> Path | None: 78 """Return the root directory for logs. 79 80 Returns `None` if no valid path can be found. 81 82 This is the directory where logs are stored. 83 """ 84 if "AIRBYTE_LOGGING_ROOT" in os.environ: 85 log_root = Path(os.environ["AIRBYTE_LOGGING_ROOT"]) 86 elif platform.system() == "Darwin" or platform.system() == "Linux": 87 # Use /tmp on macOS and Linux 88 log_root = Path("/tmp") / "airbyte" / "logs" 89 else: 90 # Use the default temp directory on Windows or any other OS 91 log_root = Path(tempfile.gettempdir()) / "airbyte" / "logs" 92 93 try: 94 # Attempt to create the log root directory if it does not exist 95 log_root.mkdir(parents=True, exist_ok=True) 96 except OSError: 97 # Handle the error by returning None 98 warn_once( 99 ( 100 f"Failed to create PyAirbyte logging directory at `{log_root}`. " 101 "You can override the default path by setting the `AIRBYTE_LOGGING_ROOT` " 102 "environment variable." 103 ), 104 with_stack=False, 105 ) 106 return None 107 else: 108 return log_root 109 110 111AIRBYTE_LOGGING_ROOT: Path | None = _get_logging_root() 112"""The root directory for Airbyte logs. 113 114This value can be overridden by setting the `AIRBYTE_LOGGING_ROOT` environment variable. 115 116If not provided, PyAirbyte will use `/tmp/airbyte/logs/` where `/tmp/` is the OS's default 117temporary directory. If the directory cannot be created, PyAirbyte will log a warning and 118set this value to `None`. 119""" 120 121 122@lru_cache 123def get_global_file_logger() -> logging.Logger | None: 124 """Return the global logger for PyAirbyte. 
125 126 This logger is configured to write logs to the console and to a file in the log directory. 127 """ 128 logger = logging.getLogger("airbyte") 129 logger.setLevel(logging.INFO) 130 logger.propagate = False 131 132 if AIRBYTE_LOGGING_ROOT is None: 133 # No temp directory available, so return None 134 return None 135 136 # Else, configure the logger to write to a file 137 138 # Remove any existing handlers 139 for handler in logger.handlers: 140 logger.removeHandler(handler) 141 142 yyyy_mm_dd: str = pendulum.now().format("YYYY-MM-DD") 143 folder = AIRBYTE_LOGGING_ROOT / yyyy_mm_dd 144 try: 145 folder.mkdir(parents=True, exist_ok=True) 146 except Exception: 147 warn_once( 148 f"Failed to create logging directory at '{folder!s}'.", 149 with_stack=False, 150 ) 151 return None 152 153 logfile_path = folder / f"airbyte-log-{str(ulid.ULID())[2:11]}.log" 154 print(f"Writing PyAirbyte logs to file: {logfile_path!s}") 155 156 file_handler = logging.FileHandler( 157 filename=logfile_path, 158 encoding="utf-8", 159 ) 160 161 if AIRBYTE_STRUCTURED_LOGGING: 162 # Create a formatter and set it for the handler 163 formatter = logging.Formatter("%(message)s") 164 file_handler.setFormatter(formatter) 165 166 # Add the file handler to the logger 167 logger.addHandler(file_handler) 168 169 # Configure structlog 170 structlog.configure( 171 processors=[ 172 structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"), 173 structlog.stdlib.add_log_level, 174 structlog.stdlib.PositionalArgumentsFormatter(), 175 structlog.processors.StackInfoRenderer(), 176 structlog.processors.format_exc_info, 177 structlog.processors.JSONRenderer(), 178 ], 179 context_class=dict, 180 logger_factory=structlog.stdlib.LoggerFactory(), 181 wrapper_class=structlog.stdlib.BoundLogger, 182 cache_logger_on_first_use=True, 183 ) 184 185 # Create a logger 186 return structlog.get_logger("airbyte") 187 188 # Create and configure file handler 189 file_handler.setFormatter( 190 logging.Formatter( 191 
fmt="%(asctime)s - %(levelname)s - %(message)s", 192 datefmt="%Y-%m-%d %H:%M:%S", 193 ) 194 ) 195 196 logger.addHandler(file_handler) 197 return logger 198 199 200def get_global_stats_log_path() -> Path | None: 201 """Return the path to the performance log file.""" 202 if AIRBYTE_LOGGING_ROOT is None: 203 return None 204 205 folder = AIRBYTE_LOGGING_ROOT 206 try: 207 folder.mkdir(parents=True, exist_ok=True) 208 except Exception: 209 warn_once( 210 f"Failed to create logging directory at '{folder!s}'.", 211 with_stack=False, 212 ) 213 return None 214 215 return folder / "airbyte-stats.log" 216 217 218@lru_cache 219def get_global_stats_logger() -> structlog.BoundLogger: 220 """Create a stats logger for performance metrics.""" 221 logger = logging.getLogger("airbyte.stats") 222 logger.setLevel(logging.INFO) 223 logger.propagate = False 224 225 # Configure structlog 226 structlog.configure( 227 processors=[ 228 structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"), 229 structlog.stdlib.PositionalArgumentsFormatter(), 230 structlog.processors.JSONRenderer(), 231 ], 232 context_class=dict, 233 logger_factory=structlog.stdlib.LoggerFactory(), 234 wrapper_class=structlog.stdlib.BoundLogger, 235 cache_logger_on_first_use=True, 236 ) 237 238 logfile_path: Path | None = get_global_stats_log_path() 239 if AIRBYTE_LOGGING_ROOT is None or logfile_path is None: 240 # No temp directory available, so return no-op logger without handlers 241 return structlog.get_logger("airbyte.stats") 242 243 print(f"Writing PyAirbyte performance stats to file: {logfile_path!s}") 244 245 # Remove any existing handlers 246 for handler in logger.handlers: 247 logger.removeHandler(handler) 248 249 folder = AIRBYTE_LOGGING_ROOT 250 try: 251 folder.mkdir(parents=True, exist_ok=True) 252 except Exception: 253 warn_once( 254 f"Failed to create logging directory at '{folder!s}'.", 255 with_stack=False, 256 ) 257 return structlog.get_logger("airbyte.stats") 258 259 file_handler = logging.FileHandler( 
260 filename=logfile_path, 261 encoding="utf-8", 262 ) 263 264 # Create a formatter and set it for the handler 265 formatter = logging.Formatter("%(message)s") 266 file_handler.setFormatter(formatter) 267 268 # Add the file handler to the logger 269 logger.addHandler(file_handler) 270 271 # Create a logger 272 return structlog.get_logger("airbyte.stats") 273 274 275def new_passthrough_file_logger(connector_name: str) -> logging.Logger: 276 """Create a logger from logging module.""" 277 logger = logging.getLogger(f"airbyte.{connector_name}") 278 logger.setLevel(logging.INFO) 279 280 # Prevent logging to stderr by stopping propagation to the root logger 281 logger.propagate = False 282 283 if AIRBYTE_LOGGING_ROOT is None: 284 # No temp directory available, so return a basic logger 285 return logger 286 287 # Else, configure the logger to write to a file 288 289 # Remove any existing handlers 290 for handler in logger.handlers: 291 logger.removeHandler(handler) 292 293 folder = AIRBYTE_LOGGING_ROOT / connector_name 294 folder.mkdir(parents=True, exist_ok=True) 295 296 # Create a file handler 297 global_logger = get_global_file_logger() 298 logfile_path = folder / f"{connector_name}-log-{str(ulid.ULID())[2:11]}.log" 299 logfile_msg = f"Writing `{connector_name}` logs to file: {logfile_path!s}" 300 print(logfile_msg) 301 if global_logger: 302 global_logger.info(logfile_msg) 303 304 file_handler = logging.FileHandler(logfile_path) 305 file_handler.setLevel(logging.INFO) 306 307 if AIRBYTE_STRUCTURED_LOGGING: 308 # Create a formatter and set it for the handler 309 formatter = logging.Formatter("%(message)s") 310 file_handler.setFormatter(formatter) 311 312 # Add the file handler to the logger 313 logger.addHandler(file_handler) 314 315 # Configure structlog 316 structlog.configure( 317 processors=[ 318 structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"), 319 structlog.stdlib.add_log_level, 320 structlog.stdlib.PositionalArgumentsFormatter(), 321 
structlog.processors.StackInfoRenderer(), 322 structlog.processors.format_exc_info, 323 structlog.processors.JSONRenderer(), 324 ], 325 context_class=dict, 326 logger_factory=structlog.stdlib.LoggerFactory(), 327 wrapper_class=structlog.stdlib.BoundLogger, 328 cache_logger_on_first_use=True, 329 ) 330 331 # Create a logger 332 return structlog.get_logger(f"airbyte.{connector_name}") 333 334 # Else, write logs in plain text 335 336 file_handler.setFormatter( 337 logging.Formatter( 338 fmt="%(asctime)s - %(levelname)s - %(message)s", 339 datefmt="%Y-%m-%d %H:%M:%S", 340 ) 341 ) 342 343 logger.addHandler(file_handler) 344 return logger
Whether to enable structured logging.
This value is read from the `AIRBYTE_STRUCTURED_LOGGING` environment variable. If the variable is not set, the default value is `False`.
def warn_once(
    message: str,
    logger: logging.Logger | None = None,
    *,
    with_stack: int | bool,
) -> None:
    """Emit a warning message only once.

    This function is a wrapper around `warnings.warn` which additionally logs the message to
    `logger` when one is provided. Each unique message is emitted at most once per process.

    Args:
        message: The warning text. Also used as the de-duplication key.
        logger: Optional logger that receives the message at WARNING level.
        with_stack: `False` (or `0`) passes `stacklevel=0` to `warnings.warn`; `True` passes
            `stacklevel=2`, i.e. the caller of `warn_once`; any other integer is passed
            through unchanged as the `stacklevel`.
    """
    if message in _warned_messages:
        return

    # `True` must be tested by identity first: `bool` is an `int` subclass, and the previous
    # implementation left `stacklevel` unbound for every integer other than `0` and `True`.
    if with_stack is True:
        stacklevel = 2
    elif not with_stack:
        stacklevel = 0
    else:
        stacklevel = int(with_stack)

    _warned_messages.add(message)
    warnings.warn(
        message,
        category=UserWarning,
        stacklevel=stacklevel,
    )

    if logger:
        logger.warning(message)
Emit a warning message only once.
This function is a wrapper around the `warnings.warn` function that also logs the warning message to the given logger, if one is provided. The warning message is only emitted once per unique message.
The root directory for Airbyte logs.
This value can be overridden by setting the `AIRBYTE_LOGGING_ROOT` environment variable.
If not provided, PyAirbyte will use `/tmp/airbyte/logs/` on macOS and Linux, or `airbyte/logs/` under the OS's default temporary directory on other platforms. If the directory cannot be created, PyAirbyte will emit a warning and set this value to `None`.
@lru_cache
def get_global_file_logger() -> logging.Logger | None:
    """Return the global logger for PyAirbyte.

    The logger writes to a new file inside a per-day subfolder of `AIRBYTE_LOGGING_ROOT`. It
    does not propagate to the root logger. The result is cached, so the log file is created
    at most once per process.

    Returns:
        The configured logger (a structlog logger when `AIRBYTE_STRUCTURED_LOGGING` is
        enabled), or `None` when no log directory or log file could be created.
    """
    logger = logging.getLogger("airbyte")
    logger.setLevel(logging.INFO)
    logger.propagate = False

    if AIRBYTE_LOGGING_ROOT is None:
        # No temp directory available, so return None
        return None

    # Remove any existing handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, so looping over the live list would skip every other handler.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    yyyy_mm_dd: str = pendulum.now().format("YYYY-MM-DD")
    folder = AIRBYTE_LOGGING_ROOT / yyyy_mm_dd
    try:
        folder.mkdir(parents=True, exist_ok=True)
    except OSError:
        warn_once(
            f"Failed to create logging directory at '{folder!s}'.",
            with_stack=False,
        )
        return None

    logfile_path = folder / f"airbyte-log-{str(ulid.ULID())[2:11]}.log"
    try:
        file_handler = logging.FileHandler(
            filename=logfile_path,
            encoding="utf-8",
        )
    except OSError:
        # The directory exists but the file cannot be opened (e.g. permissions).
        warn_once(
            f"Failed to open log file at '{logfile_path!s}'.",
            with_stack=False,
        )
        return None

    # Announce the path only once the file is known to be writable.
    print(f"Writing PyAirbyte logs to file: {logfile_path!s}")

    if not AIRBYTE_STRUCTURED_LOGGING:
        # Plain-text logs: timestamp, level and message on each line.
        file_handler.setFormatter(
            logging.Formatter(
                fmt="%(asctime)s - %(levelname)s - %(message)s",
                datefmt="%Y-%m-%d %H:%M:%S",
            )
        )
        logger.addHandler(file_handler)
        return logger

    # structlog renders the full JSON record, so the handler emits the message verbatim.
    file_handler.setFormatter(logging.Formatter("%(message)s"))
    logger.addHandler(file_handler)

    # NOTE(review): `structlog.configure` is process-wide; a later call elsewhere replaces
    # this processor chain - confirm whether that is intended.
    structlog.configure(
        processors=[
            structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
            structlog.stdlib.add_log_level,
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer(),
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

    return structlog.get_logger("airbyte")
Return the global logger for PyAirbyte.
This logger is configured to write logs to a file in the log directory. It does not propagate to the root logger, so nothing is written to the console.
def get_global_stats_log_path() -> Path | None:
    """Return the path to the performance log file.

    The file lives directly inside `AIRBYTE_LOGGING_ROOT`. `None` is returned when that root
    is unset or when the directory cannot be created (a one-time warning is emitted).
    """
    stats_dir = AIRBYTE_LOGGING_ROOT
    if stats_dir is None:
        return None

    try:
        # Make sure the directory exists before handing out a path inside it.
        stats_dir.mkdir(parents=True, exist_ok=True)
    except Exception:
        warn_once(
            f"Failed to create logging directory at '{stats_dir!s}'.",
            with_stack=False,
        )
        return None
    else:
        return stats_dir / "airbyte-stats.log"
Return the path to the performance log file.
@lru_cache
def get_global_stats_logger() -> structlog.BoundLogger:
    """Create a stats logger for performance metrics.

    The logger always emits JSON via structlog and does not propagate to the root logger.
    When no stats log file is available, the returned logger has no handlers attached. The
    result is cached, so setup runs at most once per process.
    """
    logger = logging.getLogger("airbyte.stats")
    logger.setLevel(logging.INFO)
    logger.propagate = False

    # Configure structlog.
    # NOTE(review): `structlog.configure` is process-wide; this reduced processor chain
    # replaces any chain configured earlier - confirm whether that is intended.
    structlog.configure(
        processors=[
            structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.JSONRenderer(),
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

    # `get_global_stats_log_path` already creates the directory (or returns None on failure),
    # so no second `mkdir` is needed here.
    logfile_path: Path | None = get_global_stats_log_path()
    if AIRBYTE_LOGGING_ROOT is None or logfile_path is None:
        # No temp directory available, so return no-op logger without handlers
        return structlog.get_logger("airbyte.stats")

    # Remove any existing handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, so looping over the live list would skip every other handler.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    try:
        file_handler = logging.FileHandler(
            filename=logfile_path,
            encoding="utf-8",
        )
    except OSError:
        # The directory exists but the file cannot be opened (e.g. permissions).
        warn_once(
            f"Failed to open log file at '{logfile_path!s}'.",
            with_stack=False,
        )
        return structlog.get_logger("airbyte.stats")

    # Announce the path only once the file is known to be writable.
    print(f"Writing PyAirbyte performance stats to file: {logfile_path!s}")

    # structlog renders the JSON record, so the handler emits the message verbatim.
    file_handler.setFormatter(logging.Formatter("%(message)s"))
    logger.addHandler(file_handler)

    return structlog.get_logger("airbyte.stats")
Create a stats logger for performance metrics.
def new_passthrough_file_logger(connector_name: str) -> logging.Logger:
    """Create a connector-specific file logger.

    Logs are written to a new file under `AIRBYTE_LOGGING_ROOT / connector_name`. The logger
    does not propagate to the root logger, which keeps connector output off stderr.

    Args:
        connector_name: Name of the connector; used for the logger name, the subfolder and
            the log file prefix.

    Returns:
        The configured logger (a structlog logger when `AIRBYTE_STRUCTURED_LOGGING` is
        enabled). If no log directory or file can be created, a logger without a file
        handler is returned.
    """
    logger = logging.getLogger(f"airbyte.{connector_name}")
    logger.setLevel(logging.INFO)

    # Prevent logging to stderr by stopping propagation to the root logger
    logger.propagate = False

    if AIRBYTE_LOGGING_ROOT is None:
        # No temp directory available, so return a basic logger
        return logger

    # Remove any existing handlers. Iterate over a copy, because `removeHandler` mutates
    # `logger.handlers`. This function is not cached, so an earlier call may have left an
    # open file handler on the same named logger; close it to avoid leaking the descriptor.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)
        handler.close()

    folder = AIRBYTE_LOGGING_ROOT / connector_name
    try:
        folder.mkdir(parents=True, exist_ok=True)
    except OSError:
        warn_once(
            f"Failed to create logging directory at '{folder!s}'.",
            with_stack=False,
        )
        return logger

    global_logger = get_global_file_logger()
    logfile_path = folder / f"{connector_name}-log-{str(ulid.ULID())[2:11]}.log"
    try:
        # Explicit UTF-8 so connector output is written consistently on every platform.
        file_handler = logging.FileHandler(
            filename=logfile_path,
            encoding="utf-8",
        )
    except OSError:
        warn_once(
            f"Failed to open log file at '{logfile_path!s}'.",
            with_stack=False,
        )
        return logger
    file_handler.setLevel(logging.INFO)

    logfile_msg = f"Writing `{connector_name}` logs to file: {logfile_path!s}"
    print(logfile_msg)
    if global_logger:
        global_logger.info(logfile_msg)

    if not AIRBYTE_STRUCTURED_LOGGING:
        # Plain-text logs: timestamp, level and message on each line.
        file_handler.setFormatter(
            logging.Formatter(
                fmt="%(asctime)s - %(levelname)s - %(message)s",
                datefmt="%Y-%m-%d %H:%M:%S",
            )
        )
        logger.addHandler(file_handler)
        return logger

    # structlog renders the full JSON record, so the handler emits the message verbatim.
    file_handler.setFormatter(logging.Formatter("%(message)s"))
    logger.addHandler(file_handler)

    # NOTE(review): `structlog.configure` is process-wide; a later call elsewhere replaces
    # this processor chain - confirm whether that is intended.
    structlog.configure(
        processors=[
            structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
            structlog.stdlib.add_log_level,
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.StackInfoRenderer(),
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer(),
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

    return structlog.get_logger(f"airbyte.{connector_name}")
Create a logger from logging module.