airbyte.logs
PyAirbyte Logging features and related configuration.
By default, PyAirbyte main logs are written to a file in the `AIRBYTE_LOGGING_ROOT` directory, which
defaults to a system-created temporary directory. PyAirbyte also maintains connector-specific log
files within the same directory, under a subfolder with the name of the connector.
PyAirbyte supports structured JSON logging, which is disabled by default. To enable structured
logging in JSON, set the `AIRBYTE_STRUCTURED_LOGGING` environment variable to `True`.
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2"""PyAirbyte Logging features and related configuration. 3 4By default, PyAirbyte main logs are written to a file in the `AIRBYTE_LOGGING_ROOT` directory, which 5defaults to a system-created temporary directory. PyAirbyte also maintains connector-specific log 6files within the same directory, under a subfolder with the name of the connector. 7 8PyAirbyte supports structured JSON logging, which is disabled by default. To enable structured 9logging in JSON, set `AIRBYTE_STRUCTURED_LOGGING` to `True`. 10""" 11 12from __future__ import annotations 13 14import logging 15import os 16import platform 17import sys 18import tempfile 19import warnings 20from functools import lru_cache 21from pathlib import Path 22 23import structlog 24import ulid 25 26from airbyte_cdk.utils.datetime_helpers import ab_datetime_now 27 28 29def _str_to_bool(value: str) -> bool: 30 """Convert a string value of an environment values to a boolean value.""" 31 return bool(value) and value.lower() not in {"", "0", "false", "f", "no", "n", "off"} 32 33 34AIRBYTE_STRUCTURED_LOGGING: bool = _str_to_bool( 35 os.getenv( 36 key="AIRBYTE_STRUCTURED_LOGGING", 37 default="false", 38 ) 39) 40"""Whether to enable structured logging. 41 42This value is read from the `AIRBYTE_STRUCTURED_LOGGING` environment variable. If the variable is 43not set, the default value is `False`. 44""" 45 46_warned_messages: set[str] = set() 47 48 49def warn_once( 50 message: str, 51 logger: logging.Logger | None = None, 52 *, 53 with_stack: int | bool, 54) -> None: 55 """Emit a warning message only once. 56 57 This function is a wrapper around the `warnings.warn` function that logs the warning message 58 to the global logger. The warning message is only emitted once per unique message. 
59 """ 60 if message in _warned_messages: 61 return 62 63 if not with_stack: 64 stacklevel = 0 65 66 if with_stack is True: 67 stacklevel = 2 68 69 _warned_messages.add(message) 70 warnings.warn( 71 message, 72 category=UserWarning, 73 stacklevel=stacklevel, 74 ) 75 76 if logger: 77 logger.warning(message) 78 79 80def _get_logging_root() -> Path | None: 81 """Return the root directory for logs. 82 83 Returns `None` if no valid path can be found. 84 85 This is the directory where logs are stored. 86 """ 87 if "AIRBYTE_LOGGING_ROOT" in os.environ: 88 log_root = Path(os.environ["AIRBYTE_LOGGING_ROOT"]) 89 elif platform.system() == "Darwin" or platform.system() == "Linux": 90 # Use /tmp on macOS and Linux 91 log_root = Path("/tmp") / "airbyte" / "logs" 92 else: 93 # Use the default temp directory on Windows or any other OS 94 log_root = Path(tempfile.gettempdir()) / "airbyte" / "logs" 95 96 try: 97 # Attempt to create the log root directory if it does not exist 98 log_root.mkdir(parents=True, exist_ok=True) 99 except OSError: 100 # Handle the error by returning None 101 warn_once( 102 ( 103 f"Failed to create PyAirbyte logging directory at `{log_root}`. " 104 "You can override the default path by setting the `AIRBYTE_LOGGING_ROOT` " 105 "environment variable." 106 ), 107 with_stack=False, 108 ) 109 return None 110 else: 111 return log_root 112 113 114AIRBYTE_LOGGING_ROOT: Path | None = _get_logging_root() 115"""The root directory for Airbyte logs. 116 117This value can be overridden by setting the `AIRBYTE_LOGGING_ROOT` environment variable. 118 119If not provided, PyAirbyte will use `/tmp/airbyte/logs/` where `/tmp/` is the OS's default 120temporary directory. If the directory cannot be created, PyAirbyte will log a warning and 121set this value to `None`. 122""" 123 124 125@lru_cache 126def get_global_file_logger() -> logging.Logger | None: 127 """Return the global logger for PyAirbyte. 
128 129 This logger is configured to write logs to the console and to a file in the log directory. 130 """ 131 logger = logging.getLogger("airbyte") 132 logger.setLevel(logging.INFO) 133 logger.propagate = False 134 135 if AIRBYTE_LOGGING_ROOT is None: 136 # No temp directory available, so return None 137 return None 138 139 # Else, configure the logger to write to a file 140 141 # Remove any existing handlers 142 for handler in logger.handlers: 143 logger.removeHandler(handler) 144 145 yyyy_mm_dd: str = ab_datetime_now().strftime("%Y-%m-%d") 146 folder = AIRBYTE_LOGGING_ROOT / yyyy_mm_dd 147 try: 148 folder.mkdir(parents=True, exist_ok=True) 149 except Exception: 150 warn_once( 151 f"Failed to create logging directory at '{folder!s}'.", 152 with_stack=False, 153 ) 154 return None 155 156 logfile_path = folder / f"airbyte-log-{str(ulid.ULID())[2:11]}.log" 157 print(f"Writing PyAirbyte logs to file: {logfile_path!s}", file=sys.stderr) 158 159 file_handler = logging.FileHandler( 160 filename=logfile_path, 161 encoding="utf-8", 162 ) 163 164 if AIRBYTE_STRUCTURED_LOGGING: 165 # Create a formatter and set it for the handler 166 formatter = logging.Formatter("%(message)s") 167 file_handler.setFormatter(formatter) 168 169 # Add the file handler to the logger 170 logger.addHandler(file_handler) 171 172 # Configure structlog 173 structlog.configure( 174 processors=[ 175 structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"), 176 structlog.stdlib.add_log_level, 177 structlog.stdlib.PositionalArgumentsFormatter(), 178 structlog.processors.StackInfoRenderer(), 179 structlog.processors.format_exc_info, 180 structlog.processors.JSONRenderer(), 181 ], 182 context_class=dict, 183 logger_factory=structlog.stdlib.LoggerFactory(), 184 wrapper_class=structlog.stdlib.BoundLogger, 185 cache_logger_on_first_use=True, 186 ) 187 188 # Create a logger 189 return structlog.get_logger("airbyte") 190 191 # Create and configure file handler 192 file_handler.setFormatter( 193 
logging.Formatter( 194 fmt="%(asctime)s - %(levelname)s - %(message)s", 195 datefmt="%Y-%m-%d %H:%M:%S", 196 ) 197 ) 198 199 logger.addHandler(file_handler) 200 return logger 201 202 203def get_global_stats_log_path() -> Path | None: 204 """Return the path to the performance log file.""" 205 if AIRBYTE_LOGGING_ROOT is None: 206 return None 207 208 folder = AIRBYTE_LOGGING_ROOT 209 try: 210 folder.mkdir(parents=True, exist_ok=True) 211 except Exception: 212 warn_once( 213 f"Failed to create logging directory at '{folder!s}'.", 214 with_stack=False, 215 ) 216 return None 217 218 return folder / "airbyte-stats.log" 219 220 221@lru_cache 222def get_global_stats_logger() -> structlog.BoundLogger: 223 """Create a stats logger for performance metrics.""" 224 logger = logging.getLogger("airbyte.stats") 225 logger.setLevel(logging.INFO) 226 logger.propagate = False 227 228 # Configure structlog 229 structlog.configure( 230 processors=[ 231 structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"), 232 structlog.stdlib.PositionalArgumentsFormatter(), 233 structlog.processors.JSONRenderer(), 234 ], 235 context_class=dict, 236 logger_factory=structlog.stdlib.LoggerFactory(), 237 wrapper_class=structlog.stdlib.BoundLogger, 238 cache_logger_on_first_use=True, 239 ) 240 241 logfile_path: Path | None = get_global_stats_log_path() 242 if AIRBYTE_LOGGING_ROOT is None or logfile_path is None: 243 # No temp directory available, so return no-op logger without handlers 244 return structlog.get_logger("airbyte.stats") 245 246 print(f"Writing PyAirbyte performance stats to file: {logfile_path!s}", file=sys.stderr) 247 248 # Remove any existing handlers 249 for handler in logger.handlers: 250 logger.removeHandler(handler) 251 252 folder = AIRBYTE_LOGGING_ROOT 253 try: 254 folder.mkdir(parents=True, exist_ok=True) 255 except Exception: 256 warn_once( 257 f"Failed to create logging directory at '{folder!s}'.", 258 with_stack=False, 259 ) 260 return structlog.get_logger("airbyte.stats") 261 
262 file_handler = logging.FileHandler( 263 filename=logfile_path, 264 encoding="utf-8", 265 ) 266 267 # Create a formatter and set it for the handler 268 formatter = logging.Formatter("%(message)s") 269 file_handler.setFormatter(formatter) 270 271 # Add the file handler to the logger 272 logger.addHandler(file_handler) 273 274 # Create a logger 275 return structlog.get_logger("airbyte.stats") 276 277 278def new_passthrough_file_logger(connector_name: str) -> logging.Logger: 279 """Create a logger from logging module.""" 280 logger = logging.getLogger(f"airbyte.{connector_name}") 281 logger.setLevel(logging.INFO) 282 283 # Prevent logging to stderr by stopping propagation to the root logger 284 logger.propagate = False 285 286 if AIRBYTE_LOGGING_ROOT is None: 287 # No temp directory available, so return a basic logger 288 return logger 289 290 # Else, configure the logger to write to a file 291 292 # Remove any existing handlers 293 for handler in logger.handlers: 294 logger.removeHandler(handler) 295 296 folder = AIRBYTE_LOGGING_ROOT / connector_name 297 folder.mkdir(parents=True, exist_ok=True) 298 299 # Create a file handler 300 global_logger = get_global_file_logger() 301 logfile_path = folder / f"{connector_name}-log-{str(ulid.ULID())[2:11]}.log" 302 logfile_msg = f"Writing `{connector_name}` logs to file: {logfile_path!s}" 303 print(logfile_msg, file=sys.stderr) 304 if global_logger: 305 global_logger.info(logfile_msg) 306 307 file_handler = logging.FileHandler(logfile_path) 308 file_handler.setLevel(logging.INFO) 309 310 if AIRBYTE_STRUCTURED_LOGGING: 311 # Create a formatter and set it for the handler 312 formatter = logging.Formatter("%(message)s") 313 file_handler.setFormatter(formatter) 314 315 # Add the file handler to the logger 316 logger.addHandler(file_handler) 317 318 # Configure structlog 319 structlog.configure( 320 processors=[ 321 structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"), 322 structlog.stdlib.add_log_level, 323 
structlog.stdlib.PositionalArgumentsFormatter(), 324 structlog.processors.StackInfoRenderer(), 325 structlog.processors.format_exc_info, 326 structlog.processors.JSONRenderer(), 327 ], 328 context_class=dict, 329 logger_factory=structlog.stdlib.LoggerFactory(), 330 wrapper_class=structlog.stdlib.BoundLogger, 331 cache_logger_on_first_use=True, 332 ) 333 334 # Create a logger 335 return structlog.get_logger(f"airbyte.{connector_name}") 336 337 # Else, write logs in plain text 338 339 file_handler.setFormatter( 340 logging.Formatter( 341 fmt="%(asctime)s - %(levelname)s - %(message)s", 342 datefmt="%Y-%m-%d %H:%M:%S", 343 ) 344 ) 345 346 logger.addHandler(file_handler) 347 return logger
Whether to enable structured logging.
This value is read from the `AIRBYTE_STRUCTURED_LOGGING` environment variable. If the variable is
not set, the default value is `False`.
def warn_once(
    message: str,
    logger: logging.Logger | None = None,
    *,
    with_stack: int | bool,
) -> None:
    """Emit a warning message only once.

    This function is a wrapper around the `warnings.warn` function that can additionally log
    the warning message to a logger. The warning is only emitted once per unique message.

    Args:
        message: The warning text. It is also used as the de-duplication key.
        logger: Optional logger that also receives the message via `logger.warning`.
        with_stack: Controls the `stacklevel` passed to `warnings.warn`. `True` maps to a
            stack level of 2, a falsy value (`False` or `0`) maps to 0, and any other
            integer is passed through unchanged.
    """
    if message in _warned_messages:
        return

    # `True` must be tested by identity first: `bool` is a subclass of `int`, so a plain
    # truthiness check cannot tell `True` apart from an explicit stack level. Every input
    # now binds `stacklevel`; previously a non-zero int left it unassigned.
    if with_stack is True:
        stacklevel = 2
    elif with_stack:
        stacklevel = int(with_stack)
    else:
        stacklevel = 0

    _warned_messages.add(message)
    warnings.warn(
        message,
        category=UserWarning,
        stacklevel=stacklevel,
    )

    if logger:
        logger.warning(message)
Emit a warning message only once.
This function is a wrapper around the `warnings.warn` function that also logs the warning message
to the given logger, if one is provided. The warning message is only emitted once per unique message.
The root directory for Airbyte logs.
This value can be overridden by setting the `AIRBYTE_LOGGING_ROOT` environment variable.
If not provided, PyAirbyte will use `/tmp/airbyte/logs/` on macOS and Linux, and `airbyte/logs/`
under the OS's default temporary directory on other platforms. If the directory cannot be
created, PyAirbyte will emit a warning and set this value to `None`.
@lru_cache
def get_global_file_logger() -> logging.Logger | None:
    """Return the global file logger for PyAirbyte.

    The `airbyte` logger is set to `INFO`, detached from the root logger and given a single
    file handler that writes to a new log file in a per-day subfolder of
    `AIRBYTE_LOGGING_ROOT`. The result is cached, so the setup runs at most once.

    Returns:
        The configured logger, or `None` if no log directory is available. When
        `AIRBYTE_STRUCTURED_LOGGING` is enabled, the logger is obtained from structlog.
    """
    logger = logging.getLogger("airbyte")
    logger.setLevel(logging.INFO)
    logger.propagate = False

    if AIRBYTE_LOGGING_ROOT is None:
        # No temp directory available, so return None
        return None

    # Else, configure the logger to write to a file

    # Remove any existing handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and removing items while iterating over the list skips elements.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)
        # Release the handler's resources (e.g. an open log file).
        handler.close()

    yyyy_mm_dd: str = ab_datetime_now().strftime("%Y-%m-%d")
    folder = AIRBYTE_LOGGING_ROOT / yyyy_mm_dd
    try:
        folder.mkdir(parents=True, exist_ok=True)
    except OSError:
        warn_once(
            f"Failed to create logging directory at '{folder!s}'.",
            with_stack=False,
        )
        return None

    logfile_path = folder / f"airbyte-log-{str(ulid.ULID())[2:11]}.log"
    print(f"Writing PyAirbyte logs to file: {logfile_path!s}", file=sys.stderr)

    file_handler = logging.FileHandler(
        filename=logfile_path,
        encoding="utf-8",
    )

    if AIRBYTE_STRUCTURED_LOGGING:
        # structlog renders the full JSON line, so the handler writes the message verbatim.
        file_handler.setFormatter(logging.Formatter("%(message)s"))
        logger.addHandler(file_handler)

        # NOTE(review): `structlog.configure` sets process-wide defaults, so this replaces
        # any structlog configuration applied earlier — confirm this is intended.
        structlog.configure(
            processors=[
                structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
                structlog.stdlib.add_log_level,
                structlog.stdlib.PositionalArgumentsFormatter(),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.format_exc_info,
                structlog.processors.JSONRenderer(),
            ],
            context_class=dict,
            logger_factory=structlog.stdlib.LoggerFactory(),
            wrapper_class=structlog.stdlib.BoundLogger,
            cache_logger_on_first_use=True,
        )

        return structlog.get_logger("airbyte")

    # Else, write logs in plain text
    file_handler.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )

    logger.addHandler(file_handler)
    return logger
Return the global logger for PyAirbyte.
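This logger is configured to write logs to a file in the log directory. It does not propagate to the root logger, so log records are not written to the console. Returns `None` if no log directory is available.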
def get_global_stats_log_path() -> Path | None:
    """Return the location of the performance stats log file.

    The file is `airbyte-stats.log` inside `AIRBYTE_LOGGING_ROOT`; the directory is created
    if needed. Returns `None` when no logging root is set or the directory cannot be created.
    """
    stats_dir = AIRBYTE_LOGGING_ROOT
    if stats_dir is None:
        return None

    try:
        stats_dir.mkdir(parents=True, exist_ok=True)
    except Exception:
        failure_msg = f"Failed to create logging directory at '{stats_dir!s}'."
        warn_once(failure_msg, with_stack=False)
        return None
    else:
        return stats_dir / "airbyte-stats.log"
Return the path to the performance log file.
@lru_cache
def get_global_stats_logger() -> structlog.BoundLogger:
    """Create a stats logger for performance metrics.

    The `airbyte.stats` logger is set to `INFO`, detached from the root logger and, when a
    stats log path is available, given a single file handler writing to that path. The
    result is cached, so the setup runs at most once.

    Returns:
        A structlog logger named `airbyte.stats`. If no stats log path is available, the
        underlying stdlib logger is left without a file handler.
    """
    logger = logging.getLogger("airbyte.stats")
    logger.setLevel(logging.INFO)
    logger.propagate = False

    # Configure structlog
    # NOTE(review): `structlog.configure` sets process-wide defaults, so this replaces any
    # structlog configuration applied earlier — confirm this is intended.
    structlog.configure(
        processors=[
            structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.JSONRenderer(),
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

    # `get_global_stats_log_path` returns None when there is no logging root or the
    # directory cannot be created, and it creates the directory otherwise, so neither
    # needs to be re-checked here.
    logfile_path: Path | None = get_global_stats_log_path()
    if logfile_path is None:
        # No log directory available, so return the logger without a file handler
        return structlog.get_logger("airbyte.stats")

    print(f"Writing PyAirbyte performance stats to file: {logfile_path!s}", file=sys.stderr)

    # Remove any existing handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and removing items while iterating over the list skips elements.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)
        # Release the handler's resources (e.g. an open log file).
        handler.close()

    file_handler = logging.FileHandler(
        filename=logfile_path,
        encoding="utf-8",
    )

    # structlog renders the full JSON line, so the handler writes the message verbatim.
    file_handler.setFormatter(logging.Formatter("%(message)s"))

    # Add the file handler to the logger
    logger.addHandler(file_handler)

    return structlog.get_logger("airbyte.stats")
Create a stats logger for performance metrics.
def new_passthrough_file_logger(connector_name: str) -> logging.Logger:
    """Create a file logger for the logs of a single connector.

    The `airbyte.<connector_name>` logger is set to `INFO`, detached from the root logger
    and given a single file handler that writes to a new log file in a subfolder of
    `AIRBYTE_LOGGING_ROOT` named after the connector.

    Args:
        connector_name: Name of the connector; used for the logger name, the log subfolder
            and the log file name.

    Returns:
        The configured logger. If no log directory is available, the logger is returned
        without a file handler. When `AIRBYTE_STRUCTURED_LOGGING` is enabled, the logger
        is obtained from structlog.
    """
    logger = logging.getLogger(f"airbyte.{connector_name}")
    logger.setLevel(logging.INFO)

    # Prevent logging to stderr by stopping propagation to the root logger
    logger.propagate = False

    if AIRBYTE_LOGGING_ROOT is None:
        # No temp directory available, so return a basic logger
        return logger

    # Else, configure the logger to write to a file

    # Remove any existing handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and removing items while iterating over the list skips elements.
    # Closing matters here: this function is not cached, so repeated calls for the same
    # connector would otherwise leak open log files.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)
        handler.close()

    folder = AIRBYTE_LOGGING_ROOT / connector_name
    try:
        folder.mkdir(parents=True, exist_ok=True)
    except OSError:
        # Same best-effort behavior as the other loggers in this module: warn and fall
        # back to a logger without a file handler instead of raising.
        warn_once(
            f"Failed to create logging directory at '{folder!s}'.",
            with_stack=False,
        )
        return logger

    # Create a file handler
    global_logger = get_global_file_logger()
    logfile_path = folder / f"{connector_name}-log-{str(ulid.ULID())[2:11]}.log"
    logfile_msg = f"Writing `{connector_name}` logs to file: {logfile_path!s}"
    print(logfile_msg, file=sys.stderr)
    if global_logger:
        global_logger.info(logfile_msg)

    # Explicit UTF-8, matching the other file handlers in this module, so the output
    # does not depend on the platform's locale encoding.
    file_handler = logging.FileHandler(
        filename=logfile_path,
        encoding="utf-8",
    )
    file_handler.setLevel(logging.INFO)

    if AIRBYTE_STRUCTURED_LOGGING:
        # structlog renders the full JSON line, so the handler writes the message verbatim.
        file_handler.setFormatter(logging.Formatter("%(message)s"))
        logger.addHandler(file_handler)

        # NOTE(review): `structlog.configure` sets process-wide defaults, so this replaces
        # any structlog configuration applied earlier — confirm this is intended.
        structlog.configure(
            processors=[
                structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
                structlog.stdlib.add_log_level,
                structlog.stdlib.PositionalArgumentsFormatter(),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.format_exc_info,
                structlog.processors.JSONRenderer(),
            ],
            context_class=dict,
            logger_factory=structlog.stdlib.LoggerFactory(),
            wrapper_class=structlog.stdlib.BoundLogger,
            cache_logger_on_first_use=True,
        )

        return structlog.get_logger(f"airbyte.{connector_name}")

    # Else, write logs in plain text
    file_handler.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )

    logger.addHandler(file_handler)
    return logger
Create a connector-specific logger, using the standard `logging` module, that writes to its own log file.