airbyte.logs

PyAirbyte Logging features and related configuration.

By default, PyAirbyte main logs are written to a file in the AIRBYTE_LOGGING_ROOT directory, which defaults to a system-created temporary directory. PyAirbyte also maintains connector-specific log files within the same directory, under a subfolder with the name of the connector.

PyAirbyte supports structured JSON logging, which is disabled by default. To enable structured logging in JSON, set AIRBYTE_STRUCTURED_LOGGING to True.

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""PyAirbyte Logging features and related configuration.
  3
  4By default, PyAirbyte main logs are written to a file in the `AIRBYTE_LOGGING_ROOT` directory, which
  5defaults to a system-created temporary directory. PyAirbyte also maintains connector-specific log
  6files within the same directory, under a subfolder with the name of the connector.
  7
  8PyAirbyte supports structured JSON logging, which is disabled by default. To enable structured
  9logging in JSON, set `AIRBYTE_STRUCTURED_LOGGING` to `True`.
 10"""
 11
 12from __future__ import annotations
 13
 14import logging
 15import os
 16import platform
 17import sys
 18import tempfile
 19import warnings
 20from functools import lru_cache
 21from pathlib import Path
 22
 23import structlog
 24import ulid
 25
 26from airbyte_cdk.utils.datetime_helpers import ab_datetime_now
 27
 28
 29def _str_to_bool(value: str) -> bool:
 30    """Convert a string value of an environment values to a boolean value."""
 31    return bool(value) and value.lower() not in {"", "0", "false", "f", "no", "n", "off"}
 32
 33
 34AIRBYTE_STRUCTURED_LOGGING: bool = _str_to_bool(
 35    os.getenv(
 36        key="AIRBYTE_STRUCTURED_LOGGING",
 37        default="false",
 38    )
 39)
 40"""Whether to enable structured logging.
 41
 42This value is read from the `AIRBYTE_STRUCTURED_LOGGING` environment variable. If the variable is
 43not set, the default value is `False`.
 44"""
 45
 46_warned_messages: set[str] = set()
 47
 48
 49def warn_once(
 50    message: str,
 51    logger: logging.Logger | None = None,
 52    *,
 53    with_stack: int | bool,
 54) -> None:
 55    """Emit a warning message only once.
 56
 57    This function is a wrapper around the `warnings.warn` function that logs the warning message
 58    to the global logger. The warning message is only emitted once per unique message.
 59    """
 60    if message in _warned_messages:
 61        return
 62
 63    if not with_stack:
 64        stacklevel = 0
 65
 66    if with_stack is True:
 67        stacklevel = 2
 68
 69    _warned_messages.add(message)
 70    warnings.warn(
 71        message,
 72        category=UserWarning,
 73        stacklevel=stacklevel,
 74    )
 75
 76    if logger:
 77        logger.warning(message)
 78
 79
 80def _get_logging_root() -> Path | None:
 81    """Return the root directory for logs.
 82
 83    Returns `None` if no valid path can be found.
 84
 85    This is the directory where logs are stored.
 86    """
 87    if "AIRBYTE_LOGGING_ROOT" in os.environ:
 88        log_root = Path(os.environ["AIRBYTE_LOGGING_ROOT"])
 89    elif platform.system() == "Darwin" or platform.system() == "Linux":
 90        # Use /tmp on macOS and Linux
 91        log_root = Path("/tmp") / "airbyte" / "logs"
 92    else:
 93        # Use the default temp directory on Windows or any other OS
 94        log_root = Path(tempfile.gettempdir()) / "airbyte" / "logs"
 95
 96    try:
 97        # Attempt to create the log root directory if it does not exist
 98        log_root.mkdir(parents=True, exist_ok=True)
 99    except OSError:
100        # Handle the error by returning None
101        warn_once(
102            (
103                f"Failed to create PyAirbyte logging directory at `{log_root}`. "
104                "You can override the default path by setting the `AIRBYTE_LOGGING_ROOT` "
105                "environment variable."
106            ),
107            with_stack=False,
108        )
109        return None
110    else:
111        return log_root
112
113
AIRBYTE_LOGGING_ROOT: Path | None = _get_logging_root()
"""The root directory for Airbyte logs.

This value can be overridden by setting the `AIRBYTE_LOGGING_ROOT` environment variable.

If not provided, PyAirbyte uses `/tmp/airbyte/logs/` on macOS and Linux, and `airbyte/logs/` inside
the directory returned by `tempfile.gettempdir()` on any other platform. If the directory cannot
be created, PyAirbyte emits a warning and sets this value to `None`.
"""
123
124
@lru_cache
def get_global_file_logger() -> logging.Logger | None:
    """Return the global PyAirbyte file logger, or `None` when no log file can be set up.

    The `airbyte` logger is set to INFO, does not propagate to its parent loggers, and receives a
    single file handler that writes to a new `airbyte-log-<id>.log` file inside a
    `<AIRBYTE_LOGGING_ROOT>/<YYYY-MM-DD>/` folder. The log file path is announced on stderr.

    When `AIRBYTE_STRUCTURED_LOGGING` is enabled, structlog is configured to render JSON and a
    structlog logger named `airbyte` is returned. Otherwise the standard-library logger is
    returned with a timestamped plain-text format.

    The result is cached by `lru_cache`, so later calls return the first call's result.

    Returns:
        The configured logger, or `None` if `AIRBYTE_LOGGING_ROOT` is `None` or the dated log
        folder cannot be created.
    """
    logger = logging.getLogger("airbyte")
    logger.setLevel(logging.INFO)
    logger.propagate = False

    if AIRBYTE_LOGGING_ROOT is None:
        # No log directory available, so there is nothing to write to.
        return None

    # Remove any existing handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and looping over the live list would skip every other handler.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    yyyy_mm_dd: str = ab_datetime_now().strftime("%Y-%m-%d")
    folder = AIRBYTE_LOGGING_ROOT / yyyy_mm_dd
    try:
        folder.mkdir(parents=True, exist_ok=True)
    except Exception:
        warn_once(
            f"Failed to create logging directory at '{folder!s}'.",
            with_stack=False,
        )
        return None

    # A slice of a fresh ULID string keeps log file names distinct between runs.
    logfile_path = folder / f"airbyte-log-{str(ulid.ULID())[2:11]}.log"
    print(f"Writing PyAirbyte logs to file: {logfile_path!s}", file=sys.stderr)

    file_handler = logging.FileHandler(
        filename=logfile_path,
        encoding="utf-8",
    )

    if AIRBYTE_STRUCTURED_LOGGING:
        # structlog renders the full JSON line itself, so the handler writes the message as-is.
        file_handler.setFormatter(logging.Formatter("%(message)s"))
        logger.addHandler(file_handler)

        # NOTE(review): `structlog.configure` sets structlog's global configuration, so other
        # `structlog.configure` calls in this module replace it - confirm this is intended.
        structlog.configure(
            processors=[
                structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
                structlog.stdlib.add_log_level,
                structlog.stdlib.PositionalArgumentsFormatter(),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.format_exc_info,
                structlog.processors.JSONRenderer(),
            ],
            context_class=dict,
            logger_factory=structlog.stdlib.LoggerFactory(),
            wrapper_class=structlog.stdlib.BoundLogger,
            cache_logger_on_first_use=True,
        )

        return structlog.get_logger("airbyte")

    # Plain-text logging: timestamp, level and message on each line.
    file_handler.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )

    logger.addHandler(file_handler)
    return logger
201
202
def get_global_stats_log_path() -> Path | None:
    """Return the location of the performance stats log file.

    The file is `airbyte-stats.log` directly inside `AIRBYTE_LOGGING_ROOT`. The directory is
    created if needed.

    Returns:
        The stats log path, or `None` when there is no logging root or the directory cannot be
        created (a one-time warning is emitted in the latter case).
    """
    stats_dir = AIRBYTE_LOGGING_ROOT
    if stats_dir is None:
        return None

    try:
        stats_dir.mkdir(parents=True, exist_ok=True)
    except Exception:
        warn_once(
            f"Failed to create logging directory at '{stats_dir!s}'.",
            with_stack=False,
        )
        return None
    else:
        return stats_dir / "airbyte-stats.log"
219
220
@lru_cache
def get_global_stats_logger() -> structlog.BoundLogger:
    """Create the structlog logger used for performance metrics.

    The underlying `airbyte.stats` standard-library logger is set to INFO and does not propagate
    to its parent loggers. structlog is configured to render JSON lines. When a stats log path is
    available, a UTF-8 file handler for it is attached and the path is announced on stderr.

    The result is cached by `lru_cache`, so later calls return the first call's result.

    Returns:
        A structlog logger named `airbyte.stats`. If no stats log path is available, the
        underlying logger has no file handler attached.
    """
    logger = logging.getLogger("airbyte.stats")
    logger.setLevel(logging.INFO)
    logger.propagate = False

    # NOTE(review): `structlog.configure` sets structlog's global configuration, so it replaces
    # any configuration applied by other `structlog.configure` calls in this module - confirm
    # this is intended.
    structlog.configure(
        processors=[
            structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.JSONRenderer(),
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

    # `get_global_stats_log_path` creates the log directory itself and returns `None` when that
    # fails, so no second `mkdir` is needed here.
    logfile_path: Path | None = get_global_stats_log_path()
    if AIRBYTE_LOGGING_ROOT is None or logfile_path is None:
        # No log directory available, so return the logger without a file handler.
        return structlog.get_logger("airbyte.stats")

    print(f"Writing PyAirbyte performance stats to file: {logfile_path!s}", file=sys.stderr)

    # Remove any existing handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and looping over the live list would skip every other handler.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    file_handler = logging.FileHandler(
        filename=logfile_path,
        encoding="utf-8",
    )

    # structlog renders the full JSON line itself, so the handler writes the message as-is.
    file_handler.setFormatter(logging.Formatter("%(message)s"))
    logger.addHandler(file_handler)

    return structlog.get_logger("airbyte.stats")
276
277
def new_passthrough_file_logger(connector_name: str) -> logging.Logger:
    """Create a file-backed logger for a single connector.

    The `airbyte.<connector_name>` logger is set to INFO and does not propagate to its parent
    loggers. When a logging root is available, any handlers already attached to the logger are
    removed and a new UTF-8 file handler is attached, writing to a new
    `<connector_name>-log-<id>.log` file inside `<AIRBYTE_LOGGING_ROOT>/<connector_name>/`. The
    log file path is announced on stderr and in the global file logger, when that exists.

    Args:
        connector_name: Name of the connector. It is used for the logger name, the log folder and
            the log file name.

    Returns:
        The configured logger. When `AIRBYTE_STRUCTURED_LOGGING` is enabled this is a structlog
        logger rendering JSON. If there is no logging root, or the connector log folder cannot be
        created, the plain logger is returned without a file handler.
    """
    logger = logging.getLogger(f"airbyte.{connector_name}")
    logger.setLevel(logging.INFO)

    # Prevent logging to stderr by stopping propagation to the root logger
    logger.propagate = False

    if AIRBYTE_LOGGING_ROOT is None:
        # No log directory available, so return a basic logger
        return logger

    # Remove any existing handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and looping over the live list would skip every other handler.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    folder = AIRBYTE_LOGGING_ROOT / connector_name
    try:
        folder.mkdir(parents=True, exist_ok=True)
    except OSError:
        # Match the other loggers in this module: warn once and degrade instead of raising.
        warn_once(
            f"Failed to create logging directory at '{folder!s}'.",
            with_stack=False,
        )
        return logger

    global_logger = get_global_file_logger()
    # A slice of a fresh ULID string keeps log file names distinct between runs.
    logfile_path = folder / f"{connector_name}-log-{str(ulid.ULID())[2:11]}.log"
    logfile_msg = f"Writing `{connector_name}` logs to file: {logfile_path!s}"
    print(logfile_msg, file=sys.stderr)
    if global_logger:
        global_logger.info(logfile_msg)

    # Explicit UTF-8, as for the other log files written by this module.
    file_handler = logging.FileHandler(
        filename=logfile_path,
        encoding="utf-8",
    )
    file_handler.setLevel(logging.INFO)

    if AIRBYTE_STRUCTURED_LOGGING:
        # structlog renders the full JSON line itself, so the handler writes the message as-is.
        file_handler.setFormatter(logging.Formatter("%(message)s"))
        logger.addHandler(file_handler)

        # NOTE(review): `structlog.configure` sets structlog's global configuration, so other
        # `structlog.configure` calls in this module replace it - confirm this is intended.
        structlog.configure(
            processors=[
                structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
                structlog.stdlib.add_log_level,
                structlog.stdlib.PositionalArgumentsFormatter(),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.format_exc_info,
                structlog.processors.JSONRenderer(),
            ],
            context_class=dict,
            logger_factory=structlog.stdlib.LoggerFactory(),
            wrapper_class=structlog.stdlib.BoundLogger,
            cache_logger_on_first_use=True,
        )

        return structlog.get_logger(f"airbyte.{connector_name}")

    # Else, write logs in plain text: timestamp, level and message on each line.
    file_handler.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )

    logger.addHandler(file_handler)
    return logger
AIRBYTE_STRUCTURED_LOGGING: bool = False

Whether to enable structured logging.

This value is read from the AIRBYTE_STRUCTURED_LOGGING environment variable. If the variable is not set, the default value is False.

def warn_once( message: str, logger: logging.Logger | None = None, *, with_stack: int | bool) -> None:
def warn_once(
    message: str,
    logger: logging.Logger | None = None,
    *,
    with_stack: int | bool,
) -> None:
    """Emit a warning message only once per unique message text.

    The message is passed to `warnings.warn` as a `UserWarning` and, when `logger` is given, is
    also written to that logger at WARNING level. Repeated calls with a message that has already
    been emitted return without doing anything.

    Args:
        message: The warning text. It is also the de-duplication key.
        logger: Optional logger that additionally receives the message.
        with_stack: Controls the `stacklevel` passed to `warnings.warn`. `False` (or `0`) uses
            stack level 0, `True` uses stack level 2, and any other integer is used as the
            stack level directly.
    """
    if message in _warned_messages:
        return

    # `True` must be tested by identity first: `True == 1`, but it maps to stack level 2.
    if with_stack is True:
        stacklevel = 2
    elif not with_stack:
        stacklevel = 0
    else:
        # Explicit integer stack level. Previously this case left `stacklevel` unassigned and
        # raised `UnboundLocalError`.
        stacklevel = int(with_stack)

    _warned_messages.add(message)
    warnings.warn(
        message,
        category=UserWarning,
        stacklevel=stacklevel,
    )

    if logger:
        logger.warning(message)

Emit a warning message only once.

This function is a wrapper around the warnings.warn function that also logs the warning message to the given logger, if one is provided. The warning message is only emitted once per unique message.

AIRBYTE_LOGGING_ROOT: pathlib.Path | None = PosixPath('/tmp/airbyte/logs')

The root directory for Airbyte logs.

This value can be overridden by setting the AIRBYTE_LOGGING_ROOT environment variable.

If not provided, PyAirbyte will use /tmp/airbyte/logs/ on macOS and Linux, and an airbyte/logs/ folder inside the OS's default temporary directory on other platforms. If the directory cannot be created, PyAirbyte will emit a warning and set this value to None.

@lru_cache
def get_global_file_logger() -> logging.Logger | None:
@lru_cache
def get_global_file_logger() -> logging.Logger | None:
    """Return the global PyAirbyte file logger, or `None` when no log file can be set up.

    The `airbyte` logger is set to INFO, does not propagate to its parent loggers, and receives a
    single file handler that writes to a new `airbyte-log-<id>.log` file inside a
    `<AIRBYTE_LOGGING_ROOT>/<YYYY-MM-DD>/` folder. The log file path is announced on stderr.

    When `AIRBYTE_STRUCTURED_LOGGING` is enabled, structlog is configured to render JSON and a
    structlog logger named `airbyte` is returned. Otherwise the standard-library logger is
    returned with a timestamped plain-text format.

    The result is cached by `lru_cache`, so later calls return the first call's result.

    Returns:
        The configured logger, or `None` if `AIRBYTE_LOGGING_ROOT` is `None` or the dated log
        folder cannot be created.
    """
    logger = logging.getLogger("airbyte")
    logger.setLevel(logging.INFO)
    logger.propagate = False

    if AIRBYTE_LOGGING_ROOT is None:
        # No log directory available, so there is nothing to write to.
        return None

    # Remove any existing handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and looping over the live list would skip every other handler.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    yyyy_mm_dd: str = ab_datetime_now().strftime("%Y-%m-%d")
    folder = AIRBYTE_LOGGING_ROOT / yyyy_mm_dd
    try:
        folder.mkdir(parents=True, exist_ok=True)
    except Exception:
        warn_once(
            f"Failed to create logging directory at '{folder!s}'.",
            with_stack=False,
        )
        return None

    # A slice of a fresh ULID string keeps log file names distinct between runs.
    logfile_path = folder / f"airbyte-log-{str(ulid.ULID())[2:11]}.log"
    print(f"Writing PyAirbyte logs to file: {logfile_path!s}", file=sys.stderr)

    file_handler = logging.FileHandler(
        filename=logfile_path,
        encoding="utf-8",
    )

    if AIRBYTE_STRUCTURED_LOGGING:
        # structlog renders the full JSON line itself, so the handler writes the message as-is.
        file_handler.setFormatter(logging.Formatter("%(message)s"))
        logger.addHandler(file_handler)

        # NOTE(review): `structlog.configure` sets structlog's global configuration, so other
        # `structlog.configure` calls in this module replace it - confirm this is intended.
        structlog.configure(
            processors=[
                structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
                structlog.stdlib.add_log_level,
                structlog.stdlib.PositionalArgumentsFormatter(),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.format_exc_info,
                structlog.processors.JSONRenderer(),
            ],
            context_class=dict,
            logger_factory=structlog.stdlib.LoggerFactory(),
            wrapper_class=structlog.stdlib.BoundLogger,
            cache_logger_on_first_use=True,
        )

        return structlog.get_logger("airbyte")

    # Plain-text logging: timestamp, level and message on each line.
    file_handler.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )

    logger.addHandler(file_handler)
    return logger

Return the global logger for PyAirbyte.

This logger is configured to write logs to a file in the log directory. It does not propagate records to the root logger. Returns None if no log file can be created.

def get_global_stats_log_path() -> pathlib.Path | None:
def get_global_stats_log_path() -> Path | None:
    """Return the location of the performance stats log file.

    The file is `airbyte-stats.log` directly inside `AIRBYTE_LOGGING_ROOT`. The directory is
    created if needed.

    Returns:
        The stats log path, or `None` when there is no logging root or the directory cannot be
        created (a one-time warning is emitted in the latter case).
    """
    stats_dir = AIRBYTE_LOGGING_ROOT
    if stats_dir is None:
        return None

    try:
        stats_dir.mkdir(parents=True, exist_ok=True)
    except Exception:
        warn_once(
            f"Failed to create logging directory at '{stats_dir!s}'.",
            with_stack=False,
        )
        return None
    else:
        return stats_dir / "airbyte-stats.log"

Return the path to the performance log file.

@lru_cache
def get_global_stats_logger() -> structlog._generic.BoundLogger:
@lru_cache
def get_global_stats_logger() -> structlog.BoundLogger:
    """Create the structlog logger used for performance metrics.

    The underlying `airbyte.stats` standard-library logger is set to INFO and does not propagate
    to its parent loggers. structlog is configured to render JSON lines. When a stats log path is
    available, a UTF-8 file handler for it is attached and the path is announced on stderr.

    The result is cached by `lru_cache`, so later calls return the first call's result.

    Returns:
        A structlog logger named `airbyte.stats`. If no stats log path is available, the
        underlying logger has no file handler attached.
    """
    logger = logging.getLogger("airbyte.stats")
    logger.setLevel(logging.INFO)
    logger.propagate = False

    # NOTE(review): `structlog.configure` sets structlog's global configuration, so it replaces
    # any configuration applied by other `structlog.configure` calls in this module - confirm
    # this is intended.
    structlog.configure(
        processors=[
            structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.JSONRenderer(),
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

    # `get_global_stats_log_path` creates the log directory itself and returns `None` when that
    # fails, so no second `mkdir` is needed here.
    logfile_path: Path | None = get_global_stats_log_path()
    if AIRBYTE_LOGGING_ROOT is None or logfile_path is None:
        # No log directory available, so return the logger without a file handler.
        return structlog.get_logger("airbyte.stats")

    print(f"Writing PyAirbyte performance stats to file: {logfile_path!s}", file=sys.stderr)

    # Remove any existing handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and looping over the live list would skip every other handler.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    file_handler = logging.FileHandler(
        filename=logfile_path,
        encoding="utf-8",
    )

    # structlog renders the full JSON line itself, so the handler writes the message as-is.
    file_handler.setFormatter(logging.Formatter("%(message)s"))
    logger.addHandler(file_handler)

    return structlog.get_logger("airbyte.stats")

Create a stats logger for performance metrics.

def new_passthrough_file_logger(connector_name: str) -> logging.Logger:
def new_passthrough_file_logger(connector_name: str) -> logging.Logger:
    """Create a file-backed logger for a single connector.

    The `airbyte.<connector_name>` logger is set to INFO and does not propagate to its parent
    loggers. When a logging root is available, any handlers already attached to the logger are
    removed and a new UTF-8 file handler is attached, writing to a new
    `<connector_name>-log-<id>.log` file inside `<AIRBYTE_LOGGING_ROOT>/<connector_name>/`. The
    log file path is announced on stderr and in the global file logger, when that exists.

    Args:
        connector_name: Name of the connector. It is used for the logger name, the log folder and
            the log file name.

    Returns:
        The configured logger. When `AIRBYTE_STRUCTURED_LOGGING` is enabled this is a structlog
        logger rendering JSON. If there is no logging root, or the connector log folder cannot be
        created, the plain logger is returned without a file handler.
    """
    logger = logging.getLogger(f"airbyte.{connector_name}")
    logger.setLevel(logging.INFO)

    # Prevent logging to stderr by stopping propagation to the root logger
    logger.propagate = False

    if AIRBYTE_LOGGING_ROOT is None:
        # No log directory available, so return a basic logger
        return logger

    # Remove any existing handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and looping over the live list would skip every other handler.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    folder = AIRBYTE_LOGGING_ROOT / connector_name
    try:
        folder.mkdir(parents=True, exist_ok=True)
    except OSError:
        # Match the other loggers in this module: warn once and degrade instead of raising.
        warn_once(
            f"Failed to create logging directory at '{folder!s}'.",
            with_stack=False,
        )
        return logger

    global_logger = get_global_file_logger()
    # A slice of a fresh ULID string keeps log file names distinct between runs.
    logfile_path = folder / f"{connector_name}-log-{str(ulid.ULID())[2:11]}.log"
    logfile_msg = f"Writing `{connector_name}` logs to file: {logfile_path!s}"
    print(logfile_msg, file=sys.stderr)
    if global_logger:
        global_logger.info(logfile_msg)

    # Explicit UTF-8, as for the other log files written by this module.
    file_handler = logging.FileHandler(
        filename=logfile_path,
        encoding="utf-8",
    )
    file_handler.setLevel(logging.INFO)

    if AIRBYTE_STRUCTURED_LOGGING:
        # structlog renders the full JSON line itself, so the handler writes the message as-is.
        file_handler.setFormatter(logging.Formatter("%(message)s"))
        logger.addHandler(file_handler)

        # NOTE(review): `structlog.configure` sets structlog's global configuration, so other
        # `structlog.configure` calls in this module replace it - confirm this is intended.
        structlog.configure(
            processors=[
                structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
                structlog.stdlib.add_log_level,
                structlog.stdlib.PositionalArgumentsFormatter(),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.format_exc_info,
                structlog.processors.JSONRenderer(),
            ],
            context_class=dict,
            logger_factory=structlog.stdlib.LoggerFactory(),
            wrapper_class=structlog.stdlib.BoundLogger,
            cache_logger_on_first_use=True,
        )

        return structlog.get_logger(f"airbyte.{connector_name}")

    # Else, write logs in plain text: timestamp, level and message on each line.
    file_handler.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )

    logger.addHandler(file_handler)
    return logger

Create a connector-specific file logger using the logging module.