airbyte.logs
PyAirbyte Logging features and related configuration.
By default, PyAirbyte main logs are written to a file in the `AIRBYTE_LOGGING_ROOT` directory, which defaults to a system-created temporary directory. PyAirbyte also maintains connector-specific log files within the same directory, under a subfolder with the name of the connector.
PyAirbyte supports structured JSON logging, which is disabled by default. To enable structured logging in JSON, set `AIRBYTE_STRUCTURED_LOGGING` to `True`.
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2"""PyAirbyte Logging features and related configuration. 3 4By default, PyAirbyte main logs are written to a file in the `AIRBYTE_LOGGING_ROOT` directory, which 5defaults to a system-created temporary directory. PyAirbyte also maintains connector-specific log 6files within the same directory, under a subfolder with the name of the connector. 7 8PyAirbyte supports structured JSON logging, which is disabled by default. To enable structured 9logging in JSON, set `AIRBYTE_STRUCTURED_LOGGING` to `True`. 10""" 11 12from __future__ import annotations 13 14import logging 15import os 16import platform 17import tempfile 18import warnings 19from functools import lru_cache 20from pathlib import Path 21 22import structlog 23import ulid 24 25from airbyte_cdk.utils.datetime_helpers import ab_datetime_now 26 27 28def _str_to_bool(value: str) -> bool: 29 """Convert a string value of an environment values to a boolean value.""" 30 return bool(value) and value.lower() not in {"", "0", "false", "f", "no", "n", "off"} 31 32 33AIRBYTE_STRUCTURED_LOGGING: bool = _str_to_bool( 34 os.getenv( 35 key="AIRBYTE_STRUCTURED_LOGGING", 36 default="false", 37 ) 38) 39"""Whether to enable structured logging. 40 41This value is read from the `AIRBYTE_STRUCTURED_LOGGING` environment variable. If the variable is 42not set, the default value is `False`. 43""" 44 45_warned_messages: set[str] = set() 46 47 48def warn_once( 49 message: str, 50 logger: logging.Logger | None = None, 51 *, 52 with_stack: int | bool, 53) -> None: 54 """Emit a warning message only once. 55 56 This function is a wrapper around the `warnings.warn` function that logs the warning message 57 to the global logger. The warning message is only emitted once per unique message. 
58 """ 59 if message in _warned_messages: 60 return 61 62 if not with_stack: 63 stacklevel = 0 64 65 if with_stack is True: 66 stacklevel = 2 67 68 _warned_messages.add(message) 69 warnings.warn( 70 message, 71 category=UserWarning, 72 stacklevel=stacklevel, 73 ) 74 75 if logger: 76 logger.warning(message) 77 78 79def _get_logging_root() -> Path | None: 80 """Return the root directory for logs. 81 82 Returns `None` if no valid path can be found. 83 84 This is the directory where logs are stored. 85 """ 86 if "AIRBYTE_LOGGING_ROOT" in os.environ: 87 log_root = Path(os.environ["AIRBYTE_LOGGING_ROOT"]) 88 elif platform.system() == "Darwin" or platform.system() == "Linux": 89 # Use /tmp on macOS and Linux 90 log_root = Path("/tmp") / "airbyte" / "logs" 91 else: 92 # Use the default temp directory on Windows or any other OS 93 log_root = Path(tempfile.gettempdir()) / "airbyte" / "logs" 94 95 try: 96 # Attempt to create the log root directory if it does not exist 97 log_root.mkdir(parents=True, exist_ok=True) 98 except OSError: 99 # Handle the error by returning None 100 warn_once( 101 ( 102 f"Failed to create PyAirbyte logging directory at `{log_root}`. " 103 "You can override the default path by setting the `AIRBYTE_LOGGING_ROOT` " 104 "environment variable." 105 ), 106 with_stack=False, 107 ) 108 return None 109 else: 110 return log_root 111 112 113AIRBYTE_LOGGING_ROOT: Path | None = _get_logging_root() 114"""The root directory for Airbyte logs. 115 116This value can be overridden by setting the `AIRBYTE_LOGGING_ROOT` environment variable. 117 118If not provided, PyAirbyte will use `/tmp/airbyte/logs/` where `/tmp/` is the OS's default 119temporary directory. If the directory cannot be created, PyAirbyte will log a warning and 120set this value to `None`. 121""" 122 123 124@lru_cache 125def get_global_file_logger() -> logging.Logger | None: 126 """Return the global logger for PyAirbyte. 
127 128 This logger is configured to write logs to the console and to a file in the log directory. 129 """ 130 logger = logging.getLogger("airbyte") 131 logger.setLevel(logging.INFO) 132 logger.propagate = False 133 134 if AIRBYTE_LOGGING_ROOT is None: 135 # No temp directory available, so return None 136 return None 137 138 # Else, configure the logger to write to a file 139 140 # Remove any existing handlers 141 for handler in logger.handlers: 142 logger.removeHandler(handler) 143 144 yyyy_mm_dd: str = ab_datetime_now().strftime("%Y-%m-%d") 145 folder = AIRBYTE_LOGGING_ROOT / yyyy_mm_dd 146 try: 147 folder.mkdir(parents=True, exist_ok=True) 148 except Exception: 149 warn_once( 150 f"Failed to create logging directory at '{folder!s}'.", 151 with_stack=False, 152 ) 153 return None 154 155 logfile_path = folder / f"airbyte-log-{str(ulid.ULID())[2:11]}.log" 156 print(f"Writing PyAirbyte logs to file: {logfile_path!s}") 157 158 file_handler = logging.FileHandler( 159 filename=logfile_path, 160 encoding="utf-8", 161 ) 162 163 if AIRBYTE_STRUCTURED_LOGGING: 164 # Create a formatter and set it for the handler 165 formatter = logging.Formatter("%(message)s") 166 file_handler.setFormatter(formatter) 167 168 # Add the file handler to the logger 169 logger.addHandler(file_handler) 170 171 # Configure structlog 172 structlog.configure( 173 processors=[ 174 structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"), 175 structlog.stdlib.add_log_level, 176 structlog.stdlib.PositionalArgumentsFormatter(), 177 structlog.processors.StackInfoRenderer(), 178 structlog.processors.format_exc_info, 179 structlog.processors.JSONRenderer(), 180 ], 181 context_class=dict, 182 logger_factory=structlog.stdlib.LoggerFactory(), 183 wrapper_class=structlog.stdlib.BoundLogger, 184 cache_logger_on_first_use=True, 185 ) 186 187 # Create a logger 188 return structlog.get_logger("airbyte") 189 190 # Create and configure file handler 191 file_handler.setFormatter( 192 logging.Formatter( 193 
fmt="%(asctime)s - %(levelname)s - %(message)s", 194 datefmt="%Y-%m-%d %H:%M:%S", 195 ) 196 ) 197 198 logger.addHandler(file_handler) 199 return logger 200 201 202def get_global_stats_log_path() -> Path | None: 203 """Return the path to the performance log file.""" 204 if AIRBYTE_LOGGING_ROOT is None: 205 return None 206 207 folder = AIRBYTE_LOGGING_ROOT 208 try: 209 folder.mkdir(parents=True, exist_ok=True) 210 except Exception: 211 warn_once( 212 f"Failed to create logging directory at '{folder!s}'.", 213 with_stack=False, 214 ) 215 return None 216 217 return folder / "airbyte-stats.log" 218 219 220@lru_cache 221def get_global_stats_logger() -> structlog.BoundLogger: 222 """Create a stats logger for performance metrics.""" 223 logger = logging.getLogger("airbyte.stats") 224 logger.setLevel(logging.INFO) 225 logger.propagate = False 226 227 # Configure structlog 228 structlog.configure( 229 processors=[ 230 structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"), 231 structlog.stdlib.PositionalArgumentsFormatter(), 232 structlog.processors.JSONRenderer(), 233 ], 234 context_class=dict, 235 logger_factory=structlog.stdlib.LoggerFactory(), 236 wrapper_class=structlog.stdlib.BoundLogger, 237 cache_logger_on_first_use=True, 238 ) 239 240 logfile_path: Path | None = get_global_stats_log_path() 241 if AIRBYTE_LOGGING_ROOT is None or logfile_path is None: 242 # No temp directory available, so return no-op logger without handlers 243 return structlog.get_logger("airbyte.stats") 244 245 print(f"Writing PyAirbyte performance stats to file: {logfile_path!s}") 246 247 # Remove any existing handlers 248 for handler in logger.handlers: 249 logger.removeHandler(handler) 250 251 folder = AIRBYTE_LOGGING_ROOT 252 try: 253 folder.mkdir(parents=True, exist_ok=True) 254 except Exception: 255 warn_once( 256 f"Failed to create logging directory at '{folder!s}'.", 257 with_stack=False, 258 ) 259 return structlog.get_logger("airbyte.stats") 260 261 file_handler = logging.FileHandler( 
262 filename=logfile_path, 263 encoding="utf-8", 264 ) 265 266 # Create a formatter and set it for the handler 267 formatter = logging.Formatter("%(message)s") 268 file_handler.setFormatter(formatter) 269 270 # Add the file handler to the logger 271 logger.addHandler(file_handler) 272 273 # Create a logger 274 return structlog.get_logger("airbyte.stats") 275 276 277def new_passthrough_file_logger(connector_name: str) -> logging.Logger: 278 """Create a logger from logging module.""" 279 logger = logging.getLogger(f"airbyte.{connector_name}") 280 logger.setLevel(logging.INFO) 281 282 # Prevent logging to stderr by stopping propagation to the root logger 283 logger.propagate = False 284 285 if AIRBYTE_LOGGING_ROOT is None: 286 # No temp directory available, so return a basic logger 287 return logger 288 289 # Else, configure the logger to write to a file 290 291 # Remove any existing handlers 292 for handler in logger.handlers: 293 logger.removeHandler(handler) 294 295 folder = AIRBYTE_LOGGING_ROOT / connector_name 296 folder.mkdir(parents=True, exist_ok=True) 297 298 # Create a file handler 299 global_logger = get_global_file_logger() 300 logfile_path = folder / f"{connector_name}-log-{str(ulid.ULID())[2:11]}.log" 301 logfile_msg = f"Writing `{connector_name}` logs to file: {logfile_path!s}" 302 print(logfile_msg) 303 if global_logger: 304 global_logger.info(logfile_msg) 305 306 file_handler = logging.FileHandler(logfile_path) 307 file_handler.setLevel(logging.INFO) 308 309 if AIRBYTE_STRUCTURED_LOGGING: 310 # Create a formatter and set it for the handler 311 formatter = logging.Formatter("%(message)s") 312 file_handler.setFormatter(formatter) 313 314 # Add the file handler to the logger 315 logger.addHandler(file_handler) 316 317 # Configure structlog 318 structlog.configure( 319 processors=[ 320 structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"), 321 structlog.stdlib.add_log_level, 322 structlog.stdlib.PositionalArgumentsFormatter(), 323 
structlog.processors.StackInfoRenderer(), 324 structlog.processors.format_exc_info, 325 structlog.processors.JSONRenderer(), 326 ], 327 context_class=dict, 328 logger_factory=structlog.stdlib.LoggerFactory(), 329 wrapper_class=structlog.stdlib.BoundLogger, 330 cache_logger_on_first_use=True, 331 ) 332 333 # Create a logger 334 return structlog.get_logger(f"airbyte.{connector_name}") 335 336 # Else, write logs in plain text 337 338 file_handler.setFormatter( 339 logging.Formatter( 340 fmt="%(asctime)s - %(levelname)s - %(message)s", 341 datefmt="%Y-%m-%d %H:%M:%S", 342 ) 343 ) 344 345 logger.addHandler(file_handler) 346 return logger
Whether to enable structured logging.
This value is read from the `AIRBYTE_STRUCTURED_LOGGING` environment variable. If the variable is not set, the default value is `False`.
def warn_once(
    message: str,
    logger: logging.Logger | None = None,
    *,
    with_stack: int | bool,
) -> None:
    """Emit a warning message only once.

    This function is a wrapper around `warnings.warn` that also logs the message to `logger`
    when one is provided. The warning is only emitted once per unique message; repeat calls
    with an already-seen message return immediately.

    Args:
        message: The warning text. Also used as the de-duplication key.
        logger: Optional logger that additionally receives the message at WARNING level.
        with_stack: Controls the `stacklevel` passed to `warnings.warn`. `False` (or `0`)
            passes `0`, `True` passes `2`, and any other integer is passed through unchanged.
    """
    if message in _warned_messages:
        return

    # `True` is checked by identity because `bool` is an `int` subclass: an explicit stack
    # level of `1` must not be confused with `True`. Previously any integer other than `0`
    # left `stacklevel` unassigned and raised `UnboundLocalError`.
    stacklevel: int = 2 if with_stack is True else int(with_stack)

    _warned_messages.add(message)
    warnings.warn(
        message,
        category=UserWarning,
        stacklevel=stacklevel,
    )

    if logger:
        logger.warning(message)
Emit a warning message only once.
This function is a wrapper around the `warnings.warn` function that also logs the warning message to `logger`, if one is provided. The warning message is only emitted once per unique message.
The root directory for Airbyte logs.
This value can be overridden by setting the AIRBYTE_LOGGING_ROOT
environment variable.
If not provided, PyAirbyte will use `/tmp/airbyte/logs/` on macOS and Linux, and an `airbyte/logs/` folder under the OS's default temporary directory on other platforms. If the directory cannot be created, PyAirbyte will emit a warning and set this value to `None`.
@lru_cache
def get_global_file_logger() -> logging.Logger | None:
    """Return the global logger for PyAirbyte.

    The logger writes to a new file inside a `YYYY-MM-DD` subfolder of `AIRBYTE_LOGGING_ROOT`
    and does not propagate to the root logger. The result is cached, so the log file is only
    created on the first call.

    Returns:
        The configured logger, or `None` when no log directory is available. When
        `AIRBYTE_STRUCTURED_LOGGING` is enabled, the returned object is the logger obtained
        from `structlog.get_logger("airbyte")` rather than the stdlib logger itself.
    """
    logger = logging.getLogger("airbyte")
    logger.setLevel(logging.INFO)
    logger.propagate = False

    if AIRBYTE_LOGGING_ROOT is None:
        # No temp directory available, so return None
        return None

    # Else, configure the logger to write to a file

    # Remove any existing handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and mutating a list while iterating it skips entries.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    yyyy_mm_dd: str = ab_datetime_now().strftime("%Y-%m-%d")
    folder = AIRBYTE_LOGGING_ROOT / yyyy_mm_dd
    try:
        folder.mkdir(parents=True, exist_ok=True)
    except Exception:
        # Logging is best-effort: warn and carry on without a file logger.
        warn_once(
            f"Failed to create logging directory at '{folder!s}'.",
            with_stack=False,
        )
        return None

    logfile_path = folder / f"airbyte-log-{str(ulid.ULID())[2:11]}.log"
    print(f"Writing PyAirbyte logs to file: {logfile_path!s}")

    file_handler = logging.FileHandler(
        filename=logfile_path,
        encoding="utf-8",
    )

    if AIRBYTE_STRUCTURED_LOGGING:
        # structlog renders the full JSON line, so the handler emits the message verbatim
        formatter = logging.Formatter("%(message)s")
        file_handler.setFormatter(formatter)

        # Add the file handler to the logger
        logger.addHandler(file_handler)

        # Configure structlog
        structlog.configure(
            processors=[
                structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
                structlog.stdlib.add_log_level,
                structlog.stdlib.PositionalArgumentsFormatter(),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.format_exc_info,
                structlog.processors.JSONRenderer(),
            ],
            context_class=dict,
            logger_factory=structlog.stdlib.LoggerFactory(),
            wrapper_class=structlog.stdlib.BoundLogger,
            cache_logger_on_first_use=True,
        )

        # Create a logger
        return structlog.get_logger("airbyte")

    # Else, write logs in plain text
    file_handler.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )

    logger.addHandler(file_handler)
    return logger
Return the global logger for PyAirbyte.
This logger is configured to write logs to a file in the log directory; it does not propagate to the root logger. Returns `None` if no log directory is available.
def get_global_stats_log_path() -> Path | None:
    """Locate the file that performance stats are written to.

    The file is `airbyte-stats.log` directly inside `AIRBYTE_LOGGING_ROOT`. The directory is
    created if it is missing.

    Returns:
        The stats log path, or `None` when `AIRBYTE_LOGGING_ROOT` is unset or its directory
        cannot be created (a one-time warning is emitted in the latter case).
    """
    stats_dir = AIRBYTE_LOGGING_ROOT
    if stats_dir is None:
        return None

    try:
        stats_dir.mkdir(parents=True, exist_ok=True)
    except Exception:
        # Best-effort: report the failure once and signal that no stats file is available.
        warn_once(f"Failed to create logging directory at '{stats_dir!s}'.", with_stack=False)
        return None
    else:
        return stats_dir / "airbyte-stats.log"
Return the path to the performance log file.
@lru_cache
def get_global_stats_logger() -> structlog.BoundLogger:
    """Create a stats logger for performance metrics.

    The underlying `airbyte.stats` stdlib logger does not propagate to the root logger. When
    a stats log path is available, a UTF-8 file handler is attached to it; otherwise the
    returned logger has no handlers. The result is cached.
    """
    logger = logging.getLogger("airbyte.stats")
    logger.setLevel(logging.INFO)
    logger.propagate = False

    # Configure structlog
    # NOTE(review): `structlog.configure` appears to be process-wide, so this processor chain
    # would replace any chain configured elsewhere in the process - confirm intended.
    structlog.configure(
        processors=[
            structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.JSONRenderer(),
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

    # `get_global_stats_log_path` creates the log directory and returns `None` on failure,
    # so no second `mkdir` is needed here.
    logfile_path: Path | None = get_global_stats_log_path()
    if AIRBYTE_LOGGING_ROOT is None or logfile_path is None:
        # No temp directory available, so return no-op logger without handlers
        return structlog.get_logger("airbyte.stats")

    print(f"Writing PyAirbyte performance stats to file: {logfile_path!s}")

    # Remove any existing handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and mutating a list while iterating it skips entries.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    file_handler = logging.FileHandler(
        filename=logfile_path,
        encoding="utf-8",
    )

    # structlog renders the full JSON line, so the handler emits the message verbatim
    formatter = logging.Formatter("%(message)s")
    file_handler.setFormatter(formatter)

    # Add the file handler to the logger
    logger.addHandler(file_handler)

    # Create a logger
    return structlog.get_logger("airbyte.stats")
Create a stats logger for performance metrics.
def new_passthrough_file_logger(connector_name: str) -> logging.Logger:
    """Create a file logger for a single connector.

    Logs are written to a new file under `AIRBYTE_LOGGING_ROOT / connector_name` and are not
    propagated to the root logger. The log file location is printed and, when available,
    also recorded in the global file logger.

    Args:
        connector_name: Name of the connector; used for the logger name, the subfolder and
            the log file name.

    Returns:
        The connector logger. If no log directory is available, the logger is returned
        without a file handler. When `AIRBYTE_STRUCTURED_LOGGING` is enabled, the returned
        object is the logger obtained from `structlog.get_logger(...)`.
    """
    logger = logging.getLogger(f"airbyte.{connector_name}")
    logger.setLevel(logging.INFO)

    # Prevent logging to stderr by stopping propagation to the root logger
    logger.propagate = False

    if AIRBYTE_LOGGING_ROOT is None:
        # No temp directory available, so return a basic logger
        return logger

    # Else, configure the logger to write to a file

    # Remove any existing handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and mutating a list while iterating it skips entries.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    folder = AIRBYTE_LOGGING_ROOT / connector_name
    try:
        folder.mkdir(parents=True, exist_ok=True)
    except Exception:
        # Logging is best-effort: warn once and fall back to the basic logger instead of
        # failing the caller.
        warn_once(
            f"Failed to create logging directory at '{folder!s}'.",
            with_stack=False,
        )
        return logger

    # Create a file handler
    global_logger = get_global_file_logger()
    logfile_path = folder / f"{connector_name}-log-{str(ulid.ULID())[2:11]}.log"
    logfile_msg = f"Writing `{connector_name}` logs to file: {logfile_path!s}"
    print(logfile_msg)
    if global_logger:
        global_logger.info(logfile_msg)

    # Explicit UTF-8 so the file encoding does not depend on the platform default.
    file_handler = logging.FileHandler(
        filename=logfile_path,
        encoding="utf-8",
    )
    file_handler.setLevel(logging.INFO)

    if AIRBYTE_STRUCTURED_LOGGING:
        # structlog renders the full JSON line, so the handler emits the message verbatim
        formatter = logging.Formatter("%(message)s")
        file_handler.setFormatter(formatter)

        # Add the file handler to the logger
        logger.addHandler(file_handler)

        # Configure structlog
        structlog.configure(
            processors=[
                structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
                structlog.stdlib.add_log_level,
                structlog.stdlib.PositionalArgumentsFormatter(),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.format_exc_info,
                structlog.processors.JSONRenderer(),
            ],
            context_class=dict,
            logger_factory=structlog.stdlib.LoggerFactory(),
            wrapper_class=structlog.stdlib.BoundLogger,
            cache_logger_on_first_use=True,
        )

        # Create a logger
        return structlog.get_logger(f"airbyte.{connector_name}")

    # Else, write logs in plain text

    file_handler.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )

    logger.addHandler(file_handler)
    return logger
Create a logger from logging module.