airbyte.logs

PyAirbyte Logging features and related configuration.

By default, PyAirbyte main logs are written to a file in the AIRBYTE_LOGGING_ROOT directory, which defaults to a system-created temporary directory. PyAirbyte also maintains connector-specific log files within the same directory, under a subfolder with the name of the connector.

PyAirbyte supports structured JSON logging, which is disabled by default. To enable structured logging in JSON, set AIRBYTE_STRUCTURED_LOGGING to True.

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""PyAirbyte Logging features and related configuration.
  3
  4By default, PyAirbyte main logs are written to a file in the `AIRBYTE_LOGGING_ROOT` directory, which
  5defaults to a system-created temporary directory. PyAirbyte also maintains connector-specific log
  6files within the same directory, under a subfolder with the name of the connector.
  7
  8PyAirbyte supports structured JSON logging, which is disabled by default. To enable structured
  9logging in JSON, set `AIRBYTE_STRUCTURED_LOGGING` to `True`.
 10"""
 11
 12from __future__ import annotations
 13
 14import logging
 15import os
 16import platform
 17import sys
 18import tempfile
 19import warnings
 20from functools import lru_cache
 21from pathlib import Path
 22
 23import structlog
 24import ulid
 25
 26from airbyte_cdk.utils.datetime_helpers import ab_datetime_now
 27
 28
 29def _str_to_bool(value: str) -> bool:
 30    """Convert a string value of an environment values to a boolean value."""
 31    return bool(value) and value.lower() not in {"", "0", "false", "f", "no", "n", "off"}
 32
 33
 34AIRBYTE_STRUCTURED_LOGGING: bool = _str_to_bool(
 35    os.getenv(
 36        key="AIRBYTE_STRUCTURED_LOGGING",
 37        default="false",
 38    )
 39)
 40"""Whether to enable structured logging.
 41
 42This value is read from the `AIRBYTE_STRUCTURED_LOGGING` environment variable. If the variable is
 43not set, the default value is `False`.
 44"""
 45
 46_warned_messages: set[str] = set()
 47
 48
 49def warn_once(
 50    message: str,
 51    logger: logging.Logger | None = None,
 52    *,
 53    with_stack: int | bool,
 54) -> None:
 55    """Emit a warning message only once.
 56
 57    This function is a wrapper around the `warnings.warn` function that logs the warning message
 58    to the global logger. The warning message is only emitted once per unique message.
 59    """
 60    if message in _warned_messages:
 61        return
 62
 63    if not with_stack:
 64        stacklevel = 0
 65    elif with_stack is True:
 66        stacklevel = 2
 67    elif isinstance(with_stack, int):
 68        stacklevel = with_stack
 69    else:
 70        stacklevel = 0
 71
 72    _warned_messages.add(message)
 73    warnings.warn(
 74        message,
 75        category=UserWarning,
 76        stacklevel=stacklevel,
 77    )
 78
 79    if logger:
 80        logger.warning(message)
 81
 82
 83def _get_logging_root() -> Path | None:
 84    """Return the root directory for logs.
 85
 86    Returns `None` if no valid path can be found.
 87
 88    This is the directory where logs are stored.
 89    """
 90    if "AIRBYTE_LOGGING_ROOT" in os.environ:
 91        log_root = Path(os.environ["AIRBYTE_LOGGING_ROOT"])
 92    elif platform.system() == "Darwin" or platform.system() == "Linux":
 93        # Use /tmp on macOS and Linux
 94        log_root = Path("/tmp") / "airbyte" / "logs"
 95    else:
 96        # Use the default temp directory on Windows or any other OS
 97        log_root = Path(tempfile.gettempdir()) / "airbyte" / "logs"
 98
 99    try:
100        # Attempt to create the log root directory if it does not exist
101        log_root.mkdir(parents=True, exist_ok=True)
102    except OSError:
103        # Handle the error by returning None
104        warn_once(
105            (
106                f"Failed to create PyAirbyte logging directory at `{log_root}`. "
107                "You can override the default path by setting the `AIRBYTE_LOGGING_ROOT` "
108                "environment variable."
109            ),
110            with_stack=False,
111        )
112        return None
113    else:
114        return log_root
115
116
# Resolved once, when this module is imported, by calling `_get_logging_root()`.
# The value is `None` when the log directory could not be created.
AIRBYTE_LOGGING_ROOT: Path | None = _get_logging_root()
"""The root directory for Airbyte logs.

This value can be overridden by setting the `AIRBYTE_LOGGING_ROOT` environment variable.

If not provided, PyAirbyte will use `/tmp/airbyte/logs/` where `/tmp/` is the OS's default
temporary directory. If the directory cannot be created, PyAirbyte will log a warning and
set this value to `None`.
"""
126
127
@lru_cache
def get_global_file_logger() -> logging.Logger | None:
    """Return the global file logger for PyAirbyte, or `None` if file logging is unavailable.

    The stdlib logger named "airbyte" is set to `INFO`, detached from the root logger
    (`propagate = False`), and given a single file handler. The handler writes to a new
    `airbyte-log-<id>.log` file in a per-day subfolder of `AIRBYTE_LOGGING_ROOT`. No console
    handler is attached; the only console output is a one-line notice on stderr naming the file.

    When `AIRBYTE_STRUCTURED_LOGGING` is enabled, structlog is configured to render JSON and
    the structlog logger for "airbyte" is returned instead of the stdlib logger.

    The result is memoized with `lru_cache`, so the setup runs only on the first call.

    Returns:
        The configured logger, or `None` when `AIRBYTE_LOGGING_ROOT` is `None` or when the
        log folder or log file cannot be created.
    """
    logger = logging.getLogger("airbyte")
    logger.setLevel(logging.INFO)
    logger.propagate = False

    if AIRBYTE_LOGGING_ROOT is None:
        # No log directory available, so there is nowhere to write to.
        return None

    # Drop any previously attached handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and removing while iterating the live list skips entries.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    yyyy_mm_dd: str = ab_datetime_now().strftime("%Y-%m-%d")
    folder = AIRBYTE_LOGGING_ROOT / yyyy_mm_dd
    try:
        folder.mkdir(parents=True, exist_ok=True)
    except OSError:
        warn_once(
            f"Failed to create logging directory at '{folder!s}'.",
            with_stack=False,
        )
        return None

    logfile_path = folder / f"airbyte-log-{str(ulid.ULID())[2:11]}.log"
    try:
        # Opening the file can fail (e.g. permissions); degrade to "no file logger" instead
        # of raising, matching the directory-creation failure path above.
        file_handler = logging.FileHandler(
            filename=logfile_path,
            encoding="utf-8",
        )
    except OSError:
        warn_once(
            f"Failed to open log file at '{logfile_path!s}'.",
            with_stack=False,
        )
        return None

    # Announce the file only once it has actually been opened.
    print(f"Writing PyAirbyte logs to file: {logfile_path!s}", file=sys.stderr)

    if AIRBYTE_STRUCTURED_LOGGING:
        # structlog renders the full JSON line itself, so the handler emits the message as-is.
        file_handler.setFormatter(logging.Formatter("%(message)s"))
        logger.addHandler(file_handler)

        structlog.configure(
            processors=[
                structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
                structlog.stdlib.add_log_level,
                structlog.stdlib.PositionalArgumentsFormatter(),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.format_exc_info,
                structlog.processors.JSONRenderer(),
            ],
            context_class=dict,
            logger_factory=structlog.stdlib.LoggerFactory(),
            wrapper_class=structlog.stdlib.BoundLogger,
            cache_logger_on_first_use=True,
        )

        return structlog.get_logger("airbyte")

    # Plain-text mode: timestamp, level, and message on each line.
    file_handler.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )

    logger.addHandler(file_handler)
    return logger
204
205
def get_global_stats_log_path() -> Path | None:
    """Return the location of the performance stats log file.

    Returns:
        `AIRBYTE_LOGGING_ROOT / "airbyte-stats.log"`, or `None` when no logging root is
        configured or the directory cannot be created (a one-time warning is emitted).
    """
    stats_dir = AIRBYTE_LOGGING_ROOT
    if stats_dir is None:
        return None

    try:
        stats_dir.mkdir(parents=True, exist_ok=True)
    except Exception:
        warn_once(f"Failed to create logging directory at '{stats_dir!s}'.", with_stack=False)
        return None
    else:
        return stats_dir / "airbyte-stats.log"
222
223
@lru_cache
def get_global_stats_logger() -> structlog.BoundLogger:
    """Create the stats logger used for performance metrics.

    The stdlib logger named "airbyte.stats" is set to `INFO` and detached from the root logger
    (`propagate = False`). structlog is configured to render JSON. When a stats log path is
    available, a UTF-8 file handler for that path is attached to the stdlib logger.

    The result is memoized with `lru_cache`, so the setup runs only on the first call.

    Returns:
        The structlog logger for "airbyte.stats". If no stats log path is available or the
        file cannot be opened, the logger is returned without a file handler attached.
    """
    logger = logging.getLogger("airbyte.stats")
    logger.setLevel(logging.INFO)
    logger.propagate = False

    structlog.configure(
        processors=[
            structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.JSONRenderer(),
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

    # `get_global_stats_log_path()` already creates the directory and returns `None` both when
    # `AIRBYTE_LOGGING_ROOT` is `None` and when the directory cannot be created.
    logfile_path: Path | None = get_global_stats_log_path()
    if logfile_path is None:
        return structlog.get_logger("airbyte.stats")

    # Drop any previously attached handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and removing while iterating the live list skips entries.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    try:
        # Opening the file can fail (e.g. permissions); fall back to a handler-less logger.
        file_handler = logging.FileHandler(
            filename=logfile_path,
            encoding="utf-8",
        )
    except OSError:
        warn_once(
            f"Failed to open log file at '{logfile_path!s}'.",
            with_stack=False,
        )
        return structlog.get_logger("airbyte.stats")

    # Announce the file only once it has actually been opened.
    print(f"Writing PyAirbyte performance stats to file: {logfile_path!s}", file=sys.stderr)

    # structlog renders the full JSON line itself, so the handler emits the message as-is.
    file_handler.setFormatter(logging.Formatter("%(message)s"))
    logger.addHandler(file_handler)

    return structlog.get_logger("airbyte.stats")
279
280
def new_passthrough_file_logger(connector_name: str) -> logging.Logger:
    """Create a per-connector logger that writes to its own log file.

    The stdlib logger named `airbyte.<connector_name>` is set to `INFO` and detached from the
    root logger (`propagate = False`) so records do not reach stderr. Each call replaces the
    logger's handlers with one handler for a new `<connector_name>-log-<id>.log` file under
    `AIRBYTE_LOGGING_ROOT / <connector_name>`. The file path is announced on stderr and, when
    available, on the global file logger.

    When `AIRBYTE_STRUCTURED_LOGGING` is enabled, structlog is configured to render JSON and
    the structlog logger for the same name is returned instead of the stdlib logger.

    Args:
        connector_name: Name of the connector; used for the logger name, folder, and file name.

    Returns:
        The configured logger. If `AIRBYTE_LOGGING_ROOT` is `None`, or the folder or file
        cannot be created, the stdlib logger is returned without a file handler.
    """
    logger = logging.getLogger(f"airbyte.{connector_name}")
    logger.setLevel(logging.INFO)

    # Prevent logging to stderr by stopping propagation to the root logger
    logger.propagate = False

    if AIRBYTE_LOGGING_ROOT is None:
        # No log directory available, so return a basic logger
        return logger

    # Drop any previously attached handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and removing while iterating the live list skips entries.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    folder = AIRBYTE_LOGGING_ROOT / connector_name
    try:
        folder.mkdir(parents=True, exist_ok=True)
    except OSError:
        # Degrade to a handler-less logger, as the other loggers in this module do.
        warn_once(
            f"Failed to create logging directory at '{folder!s}'.",
            with_stack=False,
        )
        return logger

    logfile_path = folder / f"{connector_name}-log-{str(ulid.ULID())[2:11]}.log"
    try:
        # Explicit UTF-8 keeps the file encoding independent of the platform default and
        # consistent with the other file handlers in this module.
        file_handler = logging.FileHandler(
            filename=logfile_path,
            encoding="utf-8",
        )
    except OSError:
        warn_once(
            f"Failed to open log file at '{logfile_path!s}'.",
            with_stack=False,
        )
        return logger
    file_handler.setLevel(logging.INFO)

    # Announce the file on stderr and, when available, in the global log file.
    global_logger = get_global_file_logger()
    logfile_msg = f"Writing `{connector_name}` logs to file: {logfile_path!s}"
    print(logfile_msg, file=sys.stderr)
    if global_logger:
        global_logger.info(logfile_msg)

    if AIRBYTE_STRUCTURED_LOGGING:
        # structlog renders the full JSON line itself, so the handler emits the message as-is.
        file_handler.setFormatter(logging.Formatter("%(message)s"))
        logger.addHandler(file_handler)

        structlog.configure(
            processors=[
                structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
                structlog.stdlib.add_log_level,
                structlog.stdlib.PositionalArgumentsFormatter(),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.format_exc_info,
                structlog.processors.JSONRenderer(),
            ],
            context_class=dict,
            logger_factory=structlog.stdlib.LoggerFactory(),
            wrapper_class=structlog.stdlib.BoundLogger,
            cache_logger_on_first_use=True,
        )

        return structlog.get_logger(f"airbyte.{connector_name}")

    # Else, write logs in plain text: timestamp, level, and message on each line.
    file_handler.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )

    logger.addHandler(file_handler)
    return logger
AIRBYTE_STRUCTURED_LOGGING: bool = False

Whether to enable structured logging.

This value is read from the AIRBYTE_STRUCTURED_LOGGING environment variable. If the variable is not set, the default value is False.

def warn_once( message: str, logger: logging.Logger | None = None, *, with_stack: int | bool) -> None:
def warn_once(
    message: str,
    logger: logging.Logger | None = None,
    *,
    with_stack: int | bool,
) -> None:
    """Emit a `UserWarning` for `message` the first time it is seen, and never again.

    Args:
        message: The warning text. It also serves as the de-duplication key.
        logger: Optional logger that additionally receives the message via `logger.warning`.
        with_stack: Selects the `stacklevel` passed to `warnings.warn`. Falsy values map to
            `0`, `True` maps to `2`, and any other integer is passed through unchanged.
    """
    if message in _warned_messages:
        return

    # `True` is matched by identity first because `bool` is a subclass of `int`.
    if with_stack is True:
        stacklevel = 2
    elif with_stack and isinstance(with_stack, int):
        stacklevel = with_stack
    else:
        stacklevel = 0

    _warned_messages.add(message)
    warnings.warn(message, category=UserWarning, stacklevel=stacklevel)

    if logger:
        logger.warning(message)

Emit a warning message only once.

This function is a wrapper around the warnings.warn function that also logs the warning message to the given logger, if one is provided. The warning message is only emitted once per unique message.

AIRBYTE_LOGGING_ROOT: pathlib.Path | None = PosixPath('/tmp/airbyte/logs')

The root directory for Airbyte logs.

This value can be overridden by setting the AIRBYTE_LOGGING_ROOT environment variable.

If not provided, PyAirbyte will use /tmp/airbyte/logs/ on macOS and Linux, and an airbyte/logs/ folder under the OS's default temporary directory on other platforms. If the directory cannot be created, PyAirbyte will emit a warning and set this value to None.

@lru_cache
def get_global_file_logger() -> logging.Logger | None:
@lru_cache
def get_global_file_logger() -> logging.Logger | None:
    """Return the global file logger for PyAirbyte, or `None` if file logging is unavailable.

    The stdlib logger named "airbyte" is set to `INFO`, detached from the root logger
    (`propagate = False`), and given a single file handler. The handler writes to a new
    `airbyte-log-<id>.log` file in a per-day subfolder of `AIRBYTE_LOGGING_ROOT`. No console
    handler is attached; the only console output is a one-line notice on stderr naming the file.

    When `AIRBYTE_STRUCTURED_LOGGING` is enabled, structlog is configured to render JSON and
    the structlog logger for "airbyte" is returned instead of the stdlib logger.

    The result is memoized with `lru_cache`, so the setup runs only on the first call.

    Returns:
        The configured logger, or `None` when `AIRBYTE_LOGGING_ROOT` is `None` or when the
        log folder or log file cannot be created.
    """
    logger = logging.getLogger("airbyte")
    logger.setLevel(logging.INFO)
    logger.propagate = False

    if AIRBYTE_LOGGING_ROOT is None:
        # No log directory available, so there is nowhere to write to.
        return None

    # Drop any previously attached handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and removing while iterating the live list skips entries.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    yyyy_mm_dd: str = ab_datetime_now().strftime("%Y-%m-%d")
    folder = AIRBYTE_LOGGING_ROOT / yyyy_mm_dd
    try:
        folder.mkdir(parents=True, exist_ok=True)
    except OSError:
        warn_once(
            f"Failed to create logging directory at '{folder!s}'.",
            with_stack=False,
        )
        return None

    logfile_path = folder / f"airbyte-log-{str(ulid.ULID())[2:11]}.log"
    try:
        # Opening the file can fail (e.g. permissions); degrade to "no file logger" instead
        # of raising, matching the directory-creation failure path above.
        file_handler = logging.FileHandler(
            filename=logfile_path,
            encoding="utf-8",
        )
    except OSError:
        warn_once(
            f"Failed to open log file at '{logfile_path!s}'.",
            with_stack=False,
        )
        return None

    # Announce the file only once it has actually been opened.
    print(f"Writing PyAirbyte logs to file: {logfile_path!s}", file=sys.stderr)

    if AIRBYTE_STRUCTURED_LOGGING:
        # structlog renders the full JSON line itself, so the handler emits the message as-is.
        file_handler.setFormatter(logging.Formatter("%(message)s"))
        logger.addHandler(file_handler)

        structlog.configure(
            processors=[
                structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
                structlog.stdlib.add_log_level,
                structlog.stdlib.PositionalArgumentsFormatter(),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.format_exc_info,
                structlog.processors.JSONRenderer(),
            ],
            context_class=dict,
            logger_factory=structlog.stdlib.LoggerFactory(),
            wrapper_class=structlog.stdlib.BoundLogger,
            cache_logger_on_first_use=True,
        )

        return structlog.get_logger("airbyte")

    # Plain-text mode: timestamp, level, and message on each line.
    file_handler.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )

    logger.addHandler(file_handler)
    return logger

Return the global logger for PyAirbyte.

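This logger is configured to write logs to a file in the log directory. It does not propagate to the root logger, so nothing is written to the console apart from a one-time notice on stderr naming the log file.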

def get_global_stats_log_path() -> pathlib.Path | None:
def get_global_stats_log_path() -> Path | None:
    """Return the location of the performance stats log file.

    Returns:
        `AIRBYTE_LOGGING_ROOT / "airbyte-stats.log"`, or `None` when no logging root is
        configured or the directory cannot be created (a one-time warning is emitted).
    """
    stats_dir = AIRBYTE_LOGGING_ROOT
    if stats_dir is None:
        return None

    try:
        stats_dir.mkdir(parents=True, exist_ok=True)
    except Exception:
        warn_once(f"Failed to create logging directory at '{stats_dir!s}'.", with_stack=False)
        return None
    else:
        return stats_dir / "airbyte-stats.log"

Return the path to the performance log file.

@lru_cache
def get_global_stats_logger() -> structlog._generic.BoundLogger:
@lru_cache
def get_global_stats_logger() -> structlog.BoundLogger:
    """Create the stats logger used for performance metrics.

    The stdlib logger named "airbyte.stats" is set to `INFO` and detached from the root logger
    (`propagate = False`). structlog is configured to render JSON. When a stats log path is
    available, a UTF-8 file handler for that path is attached to the stdlib logger.

    The result is memoized with `lru_cache`, so the setup runs only on the first call.

    Returns:
        The structlog logger for "airbyte.stats". If no stats log path is available or the
        file cannot be opened, the logger is returned without a file handler attached.
    """
    logger = logging.getLogger("airbyte.stats")
    logger.setLevel(logging.INFO)
    logger.propagate = False

    structlog.configure(
        processors=[
            structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.JSONRenderer(),
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

    # `get_global_stats_log_path()` already creates the directory and returns `None` both when
    # `AIRBYTE_LOGGING_ROOT` is `None` and when the directory cannot be created.
    logfile_path: Path | None = get_global_stats_log_path()
    if logfile_path is None:
        return structlog.get_logger("airbyte.stats")

    # Drop any previously attached handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and removing while iterating the live list skips entries.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    try:
        # Opening the file can fail (e.g. permissions); fall back to a handler-less logger.
        file_handler = logging.FileHandler(
            filename=logfile_path,
            encoding="utf-8",
        )
    except OSError:
        warn_once(
            f"Failed to open log file at '{logfile_path!s}'.",
            with_stack=False,
        )
        return structlog.get_logger("airbyte.stats")

    # Announce the file only once it has actually been opened.
    print(f"Writing PyAirbyte performance stats to file: {logfile_path!s}", file=sys.stderr)

    # structlog renders the full JSON line itself, so the handler emits the message as-is.
    file_handler.setFormatter(logging.Formatter("%(message)s"))
    logger.addHandler(file_handler)

    return structlog.get_logger("airbyte.stats")

Create a stats logger for performance metrics.

def new_passthrough_file_logger(connector_name: str) -> logging.Logger:
def new_passthrough_file_logger(connector_name: str) -> logging.Logger:
    """Create a per-connector logger that writes to its own log file.

    The stdlib logger named `airbyte.<connector_name>` is set to `INFO` and detached from the
    root logger (`propagate = False`) so records do not reach stderr. Each call replaces the
    logger's handlers with one handler for a new `<connector_name>-log-<id>.log` file under
    `AIRBYTE_LOGGING_ROOT / <connector_name>`. The file path is announced on stderr and, when
    available, on the global file logger.

    When `AIRBYTE_STRUCTURED_LOGGING` is enabled, structlog is configured to render JSON and
    the structlog logger for the same name is returned instead of the stdlib logger.

    Args:
        connector_name: Name of the connector; used for the logger name, folder, and file name.

    Returns:
        The configured logger. If `AIRBYTE_LOGGING_ROOT` is `None`, or the folder or file
        cannot be created, the stdlib logger is returned without a file handler.
    """
    logger = logging.getLogger(f"airbyte.{connector_name}")
    logger.setLevel(logging.INFO)

    # Prevent logging to stderr by stopping propagation to the root logger
    logger.propagate = False

    if AIRBYTE_LOGGING_ROOT is None:
        # No log directory available, so return a basic logger
        return logger

    # Drop any previously attached handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and removing while iterating the live list skips entries.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    folder = AIRBYTE_LOGGING_ROOT / connector_name
    try:
        folder.mkdir(parents=True, exist_ok=True)
    except OSError:
        # Degrade to a handler-less logger, as the other loggers in this module do.
        warn_once(
            f"Failed to create logging directory at '{folder!s}'.",
            with_stack=False,
        )
        return logger

    logfile_path = folder / f"{connector_name}-log-{str(ulid.ULID())[2:11]}.log"
    try:
        # Explicit UTF-8 keeps the file encoding independent of the platform default and
        # consistent with the other file handlers in this module.
        file_handler = logging.FileHandler(
            filename=logfile_path,
            encoding="utf-8",
        )
    except OSError:
        warn_once(
            f"Failed to open log file at '{logfile_path!s}'.",
            with_stack=False,
        )
        return logger
    file_handler.setLevel(logging.INFO)

    # Announce the file on stderr and, when available, in the global log file.
    global_logger = get_global_file_logger()
    logfile_msg = f"Writing `{connector_name}` logs to file: {logfile_path!s}"
    print(logfile_msg, file=sys.stderr)
    if global_logger:
        global_logger.info(logfile_msg)

    if AIRBYTE_STRUCTURED_LOGGING:
        # structlog renders the full JSON line itself, so the handler emits the message as-is.
        file_handler.setFormatter(logging.Formatter("%(message)s"))
        logger.addHandler(file_handler)

        structlog.configure(
            processors=[
                structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
                structlog.stdlib.add_log_level,
                structlog.stdlib.PositionalArgumentsFormatter(),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.format_exc_info,
                structlog.processors.JSONRenderer(),
            ],
            context_class=dict,
            logger_factory=structlog.stdlib.LoggerFactory(),
            wrapper_class=structlog.stdlib.BoundLogger,
            cache_logger_on_first_use=True,
        )

        return structlog.get_logger(f"airbyte.{connector_name}")

    # Else, write logs in plain text: timestamp, level, and message on each line.
    file_handler.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )

    logger.addHandler(file_handler)
    return logger

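Create a per-connector logger, using the standard logging module, that writes to its own log file in a subfolder of the logging root named after the connector.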