airbyte.logs
PyAirbyte Logging features and related configuration.
By default, PyAirbyte main logs are written to a file in the AIRBYTE_LOGGING_ROOT directory, which
defaults to a system-created temporary directory. PyAirbyte also maintains connector-specific log
files within the same directory, under a subfolder with the name of the connector.
PyAirbyte supports structured JSON logging, which is disabled by default. To enable structured
logging in JSON, set AIRBYTE_STRUCTURED_LOGGING to True.
1# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 2"""PyAirbyte Logging features and related configuration. 3 4By default, PyAirbyte main logs are written to a file in the `AIRBYTE_LOGGING_ROOT` directory, which 5defaults to a system-created temporary directory. PyAirbyte also maintains connector-specific log 6files within the same directory, under a subfolder with the name of the connector. 7 8PyAirbyte supports structured JSON logging, which is disabled by default. To enable structured 9logging in JSON, set `AIRBYTE_STRUCTURED_LOGGING` to `True`. 10""" 11 12from __future__ import annotations 13 14import logging 15import os 16import platform 17import sys 18import tempfile 19import warnings 20from functools import lru_cache 21from pathlib import Path 22 23import structlog 24import ulid 25 26from airbyte_cdk.utils.datetime_helpers import ab_datetime_now 27 28 29def _str_to_bool(value: str) -> bool: 30 """Convert a string value of an environment values to a boolean value.""" 31 return bool(value) and value.lower() not in {"", "0", "false", "f", "no", "n", "off"} 32 33 34AIRBYTE_STRUCTURED_LOGGING: bool = _str_to_bool( 35 os.getenv( 36 key="AIRBYTE_STRUCTURED_LOGGING", 37 default="false", 38 ) 39) 40"""Whether to enable structured logging. 41 42This value is read from the `AIRBYTE_STRUCTURED_LOGGING` environment variable. If the variable is 43not set, the default value is `False`. 44""" 45 46_warned_messages: set[str] = set() 47 48 49def warn_once( 50 message: str, 51 logger: logging.Logger | None = None, 52 *, 53 with_stack: int | bool, 54) -> None: 55 """Emit a warning message only once. 56 57 This function is a wrapper around the `warnings.warn` function that logs the warning message 58 to the global logger. The warning message is only emitted once per unique message. 
59 """ 60 if message in _warned_messages: 61 return 62 63 if not with_stack: 64 stacklevel = 0 65 elif with_stack is True: 66 stacklevel = 2 67 elif isinstance(with_stack, int): 68 stacklevel = with_stack 69 else: 70 stacklevel = 0 71 72 _warned_messages.add(message) 73 warnings.warn( 74 message, 75 category=UserWarning, 76 stacklevel=stacklevel, 77 ) 78 79 if logger: 80 logger.warning(message) 81 82 83def _get_logging_root() -> Path | None: 84 """Return the root directory for logs. 85 86 Returns `None` if no valid path can be found. 87 88 This is the directory where logs are stored. 89 """ 90 if "AIRBYTE_LOGGING_ROOT" in os.environ: 91 log_root = Path(os.environ["AIRBYTE_LOGGING_ROOT"]) 92 elif platform.system() == "Darwin" or platform.system() == "Linux": 93 # Use /tmp on macOS and Linux 94 log_root = Path("/tmp") / "airbyte" / "logs" 95 else: 96 # Use the default temp directory on Windows or any other OS 97 log_root = Path(tempfile.gettempdir()) / "airbyte" / "logs" 98 99 try: 100 # Attempt to create the log root directory if it does not exist 101 log_root.mkdir(parents=True, exist_ok=True) 102 except OSError: 103 # Handle the error by returning None 104 warn_once( 105 ( 106 f"Failed to create PyAirbyte logging directory at `{log_root}`. " 107 "You can override the default path by setting the `AIRBYTE_LOGGING_ROOT` " 108 "environment variable." 109 ), 110 with_stack=False, 111 ) 112 return None 113 else: 114 return log_root 115 116 117AIRBYTE_LOGGING_ROOT: Path | None = _get_logging_root() 118"""The root directory for Airbyte logs. 119 120This value can be overridden by setting the `AIRBYTE_LOGGING_ROOT` environment variable. 121 122If not provided, PyAirbyte will use `/tmp/airbyte/logs/` where `/tmp/` is the OS's default 123temporary directory. If the directory cannot be created, PyAirbyte will log a warning and 124set this value to `None`. 
125""" 126 127 128@lru_cache 129def get_global_file_logger() -> logging.Logger | None: 130 """Return the global logger for PyAirbyte. 131 132 This logger is configured to write logs to the console and to a file in the log directory. 133 """ 134 logger = logging.getLogger("airbyte") 135 logger.setLevel(logging.INFO) 136 logger.propagate = False 137 138 if AIRBYTE_LOGGING_ROOT is None: 139 # No temp directory available, so return None 140 return None 141 142 # Else, configure the logger to write to a file 143 144 # Remove any existing handlers 145 for handler in logger.handlers: 146 logger.removeHandler(handler) 147 148 yyyy_mm_dd: str = ab_datetime_now().strftime("%Y-%m-%d") 149 folder = AIRBYTE_LOGGING_ROOT / yyyy_mm_dd 150 try: 151 folder.mkdir(parents=True, exist_ok=True) 152 except Exception: 153 warn_once( 154 f"Failed to create logging directory at '{folder!s}'.", 155 with_stack=False, 156 ) 157 return None 158 159 logfile_path = folder / f"airbyte-log-{str(ulid.ULID())[2:11]}.log" 160 print(f"Writing PyAirbyte logs to file: {logfile_path!s}", file=sys.stderr) 161 162 file_handler = logging.FileHandler( 163 filename=logfile_path, 164 encoding="utf-8", 165 ) 166 167 if AIRBYTE_STRUCTURED_LOGGING: 168 # Create a formatter and set it for the handler 169 formatter = logging.Formatter("%(message)s") 170 file_handler.setFormatter(formatter) 171 172 # Add the file handler to the logger 173 logger.addHandler(file_handler) 174 175 # Configure structlog 176 structlog.configure( 177 processors=[ 178 structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"), 179 structlog.stdlib.add_log_level, 180 structlog.stdlib.PositionalArgumentsFormatter(), 181 structlog.processors.StackInfoRenderer(), 182 structlog.processors.format_exc_info, 183 structlog.processors.JSONRenderer(), 184 ], 185 context_class=dict, 186 logger_factory=structlog.stdlib.LoggerFactory(), 187 wrapper_class=structlog.stdlib.BoundLogger, 188 cache_logger_on_first_use=True, 189 ) 190 191 # Create a logger 192 
return structlog.get_logger("airbyte") 193 194 # Create and configure file handler 195 file_handler.setFormatter( 196 logging.Formatter( 197 fmt="%(asctime)s - %(levelname)s - %(message)s", 198 datefmt="%Y-%m-%d %H:%M:%S", 199 ) 200 ) 201 202 logger.addHandler(file_handler) 203 return logger 204 205 206def get_global_stats_log_path() -> Path | None: 207 """Return the path to the performance log file.""" 208 if AIRBYTE_LOGGING_ROOT is None: 209 return None 210 211 folder = AIRBYTE_LOGGING_ROOT 212 try: 213 folder.mkdir(parents=True, exist_ok=True) 214 except Exception: 215 warn_once( 216 f"Failed to create logging directory at '{folder!s}'.", 217 with_stack=False, 218 ) 219 return None 220 221 return folder / "airbyte-stats.log" 222 223 224@lru_cache 225def get_global_stats_logger() -> structlog.BoundLogger: 226 """Create a stats logger for performance metrics.""" 227 logger = logging.getLogger("airbyte.stats") 228 logger.setLevel(logging.INFO) 229 logger.propagate = False 230 231 # Configure structlog 232 structlog.configure( 233 processors=[ 234 structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"), 235 structlog.stdlib.PositionalArgumentsFormatter(), 236 structlog.processors.JSONRenderer(), 237 ], 238 context_class=dict, 239 logger_factory=structlog.stdlib.LoggerFactory(), 240 wrapper_class=structlog.stdlib.BoundLogger, 241 cache_logger_on_first_use=True, 242 ) 243 244 logfile_path: Path | None = get_global_stats_log_path() 245 if AIRBYTE_LOGGING_ROOT is None or logfile_path is None: 246 # No temp directory available, so return no-op logger without handlers 247 return structlog.get_logger("airbyte.stats") 248 249 print(f"Writing PyAirbyte performance stats to file: {logfile_path!s}", file=sys.stderr) 250 251 # Remove any existing handlers 252 for handler in logger.handlers: 253 logger.removeHandler(handler) 254 255 folder = AIRBYTE_LOGGING_ROOT 256 try: 257 folder.mkdir(parents=True, exist_ok=True) 258 except Exception: 259 warn_once( 260 f"Failed to create 
logging directory at '{folder!s}'.", 261 with_stack=False, 262 ) 263 return structlog.get_logger("airbyte.stats") 264 265 file_handler = logging.FileHandler( 266 filename=logfile_path, 267 encoding="utf-8", 268 ) 269 270 # Create a formatter and set it for the handler 271 formatter = logging.Formatter("%(message)s") 272 file_handler.setFormatter(formatter) 273 274 # Add the file handler to the logger 275 logger.addHandler(file_handler) 276 277 # Create a logger 278 return structlog.get_logger("airbyte.stats") 279 280 281def new_passthrough_file_logger(connector_name: str) -> logging.Logger: 282 """Create a logger from logging module.""" 283 logger = logging.getLogger(f"airbyte.{connector_name}") 284 logger.setLevel(logging.INFO) 285 286 # Prevent logging to stderr by stopping propagation to the root logger 287 logger.propagate = False 288 289 if AIRBYTE_LOGGING_ROOT is None: 290 # No temp directory available, so return a basic logger 291 return logger 292 293 # Else, configure the logger to write to a file 294 295 # Remove any existing handlers 296 for handler in logger.handlers: 297 logger.removeHandler(handler) 298 299 folder = AIRBYTE_LOGGING_ROOT / connector_name 300 folder.mkdir(parents=True, exist_ok=True) 301 302 # Create a file handler 303 global_logger = get_global_file_logger() 304 logfile_path = folder / f"{connector_name}-log-{str(ulid.ULID())[2:11]}.log" 305 logfile_msg = f"Writing `{connector_name}` logs to file: {logfile_path!s}" 306 print(logfile_msg, file=sys.stderr) 307 if global_logger: 308 global_logger.info(logfile_msg) 309 310 file_handler = logging.FileHandler(logfile_path) 311 file_handler.setLevel(logging.INFO) 312 313 if AIRBYTE_STRUCTURED_LOGGING: 314 # Create a formatter and set it for the handler 315 formatter = logging.Formatter("%(message)s") 316 file_handler.setFormatter(formatter) 317 318 # Add the file handler to the logger 319 logger.addHandler(file_handler) 320 321 # Configure structlog 322 structlog.configure( 323 processors=[ 
324 structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"), 325 structlog.stdlib.add_log_level, 326 structlog.stdlib.PositionalArgumentsFormatter(), 327 structlog.processors.StackInfoRenderer(), 328 structlog.processors.format_exc_info, 329 structlog.processors.JSONRenderer(), 330 ], 331 context_class=dict, 332 logger_factory=structlog.stdlib.LoggerFactory(), 333 wrapper_class=structlog.stdlib.BoundLogger, 334 cache_logger_on_first_use=True, 335 ) 336 337 # Create a logger 338 return structlog.get_logger(f"airbyte.{connector_name}") 339 340 # Else, write logs in plain text 341 342 file_handler.setFormatter( 343 logging.Formatter( 344 fmt="%(asctime)s - %(levelname)s - %(message)s", 345 datefmt="%Y-%m-%d %H:%M:%S", 346 ) 347 ) 348 349 logger.addHandler(file_handler) 350 return logger
Whether to enable structured logging.
This value is read from the AIRBYTE_STRUCTURED_LOGGING environment variable. If the variable is
not set, the default value is False.
def warn_once(
    message: str,
    logger: logging.Logger | None = None,
    *,
    with_stack: int | bool,
) -> None:
    """Raise a `UserWarning` for `message` the first time it is seen; ignore repeats.

    The message is recorded in the module-level `_warned_messages` set, passed to
    `warnings.warn`, and then also sent to `logger.warning` when a logger is supplied.

    Args:
        message: The warning text; also the key used for de-duplication.
        logger: Optional logger that additionally receives the message.
        with_stack: `True` selects a stack level of 2, a non-zero integer is used as the
            stack level directly, and anything else selects a stack level of 0.
    """
    if message in _warned_messages:
        # Already reported once; stay silent.
        return

    if with_stack is True:
        depth = 2
    elif with_stack and isinstance(with_stack, int):
        depth = with_stack
    else:
        # Falsy values and unexpected types both map to stack level 0.
        depth = 0

    _warned_messages.add(message)
    warnings.warn(message, category=UserWarning, stacklevel=depth)

    if logger:
        logger.warning(message)
Emit a warning message only once.
This function is a wrapper around the warnings.warn function that also logs the warning message
to the given logger, if one is provided. The warning message is only emitted once per unique message.
The root directory for Airbyte logs.
This value can be overridden by setting the AIRBYTE_LOGGING_ROOT environment variable.
If not provided, PyAirbyte will use /tmp/airbyte/logs/ where /tmp/ is the OS's default
temporary directory. If the directory cannot be created, PyAirbyte will log a warning and
set this value to None.
@lru_cache
def get_global_file_logger() -> logging.Logger | None:
    """Return the global file logger for PyAirbyte, or `None` if file logging is unavailable.

    The "airbyte" logger is set to INFO, does not propagate to the root logger, and gets a
    single file handler that writes to a new file in a dated subfolder of
    `AIRBYTE_LOGGING_ROOT`. The result is cached by `lru_cache`.

    Returns:
        The configured logger, or the result of `structlog.get_logger("airbyte")` when
        `AIRBYTE_STRUCTURED_LOGGING` is enabled. `None` if `AIRBYTE_LOGGING_ROOT` is `None`
        or the dated folder cannot be created.
    """
    logger = logging.getLogger("airbyte")
    logger.setLevel(logging.INFO)
    logger.propagate = False

    if AIRBYTE_LOGGING_ROOT is None:
        # No temp directory available, so return None
        return None

    # Else, configure the logger to write to a file

    # Remove any existing handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and iterating the live list would skip every other handler.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    yyyy_mm_dd: str = ab_datetime_now().strftime("%Y-%m-%d")
    folder = AIRBYTE_LOGGING_ROOT / yyyy_mm_dd
    try:
        folder.mkdir(parents=True, exist_ok=True)
    except OSError:
        warn_once(
            f"Failed to create logging directory at '{folder!s}'.",
            with_stack=False,
        )
        return None

    logfile_path = folder / f"airbyte-log-{str(ulid.ULID())[2:11]}.log"
    print(f"Writing PyAirbyte logs to file: {logfile_path!s}", file=sys.stderr)

    file_handler = logging.FileHandler(
        filename=logfile_path,
        encoding="utf-8",
    )

    if AIRBYTE_STRUCTURED_LOGGING:
        # structlog renders the full JSON line, so the handler emits the message verbatim
        formatter = logging.Formatter("%(message)s")
        file_handler.setFormatter(formatter)

        # Add the file handler to the logger
        logger.addHandler(file_handler)

        # Configure structlog
        structlog.configure(
            processors=[
                structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
                structlog.stdlib.add_log_level,
                structlog.stdlib.PositionalArgumentsFormatter(),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.format_exc_info,
                structlog.processors.JSONRenderer(),
            ],
            context_class=dict,
            logger_factory=structlog.stdlib.LoggerFactory(),
            wrapper_class=structlog.stdlib.BoundLogger,
            cache_logger_on_first_use=True,
        )

        # Create a logger
        return structlog.get_logger("airbyte")

    # Else, write logs in plain text
    file_handler.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )

    logger.addHandler(file_handler)
    return logger
Return the global logger for PyAirbyte.
This logger is configured to write logs to a file in the log directory; it does not propagate to the root logger, so records are not echoed to the console.
def get_global_stats_log_path() -> Path | None:
    """Locate the performance stats log file, creating the log root if needed.

    Returns:
        `AIRBYTE_LOGGING_ROOT / "airbyte-stats.log"`, or `None` when `AIRBYTE_LOGGING_ROOT` is
        `None` or the directory cannot be created (a one-time warning is emitted in that case).
    """
    root = AIRBYTE_LOGGING_ROOT
    if root is not None:
        try:
            root.mkdir(parents=True, exist_ok=True)
        except Exception:
            warn_once(f"Failed to create logging directory at '{root!s}'.", with_stack=False)
        else:
            return root / "airbyte-stats.log"

    # Either there is no log root or it could not be created.
    return None
Return the path to the performance log file.
@lru_cache
def get_global_stats_logger() -> structlog.BoundLogger:
    """Create a stats logger for performance metrics.

    The "airbyte.stats" logger is set to INFO and does not propagate. When a stats log path is
    available, a single file handler writing to that path is attached. The result is cached
    by `lru_cache`.

    Returns:
        The result of `structlog.get_logger("airbyte.stats")`. If no stats log path is
        available, no file handler is attached to the underlying logger.
    """
    logger = logging.getLogger("airbyte.stats")
    logger.setLevel(logging.INFO)
    logger.propagate = False

    # Configure structlog
    structlog.configure(
        processors=[
            structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.JSONRenderer(),
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

    # `get_global_stats_log_path` returns None when no log root exists or it cannot be created,
    # and otherwise has already ensured the directory exists, so no second `mkdir` is needed.
    logfile_path: Path | None = get_global_stats_log_path()
    if logfile_path is None:
        # No log directory available, so return a logger without a file handler
        return structlog.get_logger("airbyte.stats")

    print(f"Writing PyAirbyte performance stats to file: {logfile_path!s}", file=sys.stderr)

    # Remove any existing handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and iterating the live list would skip every other handler.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    file_handler = logging.FileHandler(
        filename=logfile_path,
        encoding="utf-8",
    )

    # structlog renders the full JSON line, so the handler emits the message verbatim
    formatter = logging.Formatter("%(message)s")
    file_handler.setFormatter(formatter)

    # Add the file handler to the logger
    logger.addHandler(file_handler)

    # Create a logger
    return structlog.get_logger("airbyte.stats")
Create a stats logger for performance metrics.
def new_passthrough_file_logger(connector_name: str) -> logging.Logger:
    """Create a file logger for the named connector.

    The `airbyte.<connector_name>` logger is set to INFO and does not propagate. When a log
    root is available, existing handlers are replaced by a single handler that writes to a new
    file under `AIRBYTE_LOGGING_ROOT / connector_name`.

    Args:
        connector_name: Used for the logger name, the subfolder, and the log file prefix.

    Returns:
        The configured logger, or the result of `structlog.get_logger(...)` when
        `AIRBYTE_STRUCTURED_LOGGING` is enabled. If no log root is available or the connector
        folder cannot be created, the logger is returned without a file handler.
    """
    logger = logging.getLogger(f"airbyte.{connector_name}")
    logger.setLevel(logging.INFO)

    # Prevent logging to stderr by stopping propagation to the root logger
    logger.propagate = False

    if AIRBYTE_LOGGING_ROOT is None:
        # No temp directory available, so return a basic logger
        return logger

    # Else, configure the logger to write to a file

    # Remove any existing handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and iterating the live list would skip every other handler.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    folder = AIRBYTE_LOGGING_ROOT / connector_name
    try:
        folder.mkdir(parents=True, exist_ok=True)
    except OSError:
        # Logging is best-effort: fall back to a logger without a file handler instead of
        # failing the caller, as the other logger factories in this module do.
        warn_once(
            f"Failed to create logging directory at '{folder!s}'.",
            with_stack=False,
        )
        return logger

    # Create a file handler
    global_logger = get_global_file_logger()
    logfile_path = folder / f"{connector_name}-log-{str(ulid.ULID())[2:11]}.log"
    logfile_msg = f"Writing `{connector_name}` logs to file: {logfile_path!s}"
    print(logfile_msg, file=sys.stderr)
    if global_logger:
        global_logger.info(logfile_msg)

    # Explicit UTF-8 keeps the output independent of the platform's default encoding
    file_handler = logging.FileHandler(
        filename=logfile_path,
        encoding="utf-8",
    )
    file_handler.setLevel(logging.INFO)

    if AIRBYTE_STRUCTURED_LOGGING:
        # structlog renders the full JSON line, so the handler emits the message verbatim
        formatter = logging.Formatter("%(message)s")
        file_handler.setFormatter(formatter)

        # Add the file handler to the logger
        logger.addHandler(file_handler)

        # Configure structlog
        structlog.configure(
            processors=[
                structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
                structlog.stdlib.add_log_level,
                structlog.stdlib.PositionalArgumentsFormatter(),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.format_exc_info,
                structlog.processors.JSONRenderer(),
            ],
            context_class=dict,
            logger_factory=structlog.stdlib.LoggerFactory(),
            wrapper_class=structlog.stdlib.BoundLogger,
            cache_logger_on_first_use=True,
        )

        # Create a logger
        return structlog.get_logger(f"airbyte.{connector_name}")

    # Else, write logs in plain text

    file_handler.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )

    logger.addHandler(file_handler)
    return logger
Create a logger from the logging module.