airbyte.logs

PyAirbyte Logging features and related configuration.

By default, PyAirbyte main logs are written to a file in the AIRBYTE_LOGGING_ROOT directory, which defaults to a system-created temporary directory. PyAirbyte also maintains connector-specific log files within the same directory, under a subfolder with the name of the connector.

PyAirbyte supports structured JSON logging, which is disabled by default. To enable structured logging in JSON, set AIRBYTE_STRUCTURED_LOGGING to True.

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""PyAirbyte Logging features and related configuration.
  3
  4By default, PyAirbyte main logs are written to a file in the `AIRBYTE_LOGGING_ROOT` directory, which
  5defaults to a system-created temporary directory. PyAirbyte also maintains connector-specific log
  6files within the same directory, under a subfolder with the name of the connector.
  7
  8PyAirbyte supports structured JSON logging, which is disabled by default. To enable structured
  9logging in JSON, set `AIRBYTE_STRUCTURED_LOGGING` to `True`.
 10"""
 11
 12from __future__ import annotations
 13
 14import logging
 15import os
 16import platform
 17import tempfile
 18import warnings
 19from functools import lru_cache
 20from pathlib import Path
 21
 22import pendulum
 23import structlog
 24import ulid
 25
 26
 27def _str_to_bool(value: str) -> bool:
 28    """Convert a string value of an environment values to a boolean value."""
 29    return bool(value) and value.lower() not in {"", "0", "false", "f", "no", "n", "off"}
 30
 31
 32AIRBYTE_STRUCTURED_LOGGING: bool = _str_to_bool(
 33    os.getenv(
 34        key="AIRBYTE_STRUCTURED_LOGGING",
 35        default="false",
 36    )
 37)
 38"""Whether to enable structured logging.
 39
 40This value is read from the `AIRBYTE_STRUCTURED_LOGGING` environment variable. If the variable is
 41not set, the default value is `False`.
 42"""
 43
 44_warned_messages: set[str] = set()
 45
 46
 47def warn_once(
 48    message: str,
 49    logger: logging.Logger | None = None,
 50    *,
 51    with_stack: int | bool,
 52) -> None:
 53    """Emit a warning message only once.
 54
 55    This function is a wrapper around the `warnings.warn` function that logs the warning message
 56    to the global logger. The warning message is only emitted once per unique message.
 57    """
 58    if message in _warned_messages:
 59        return
 60
 61    if not with_stack:
 62        stacklevel = 0
 63    if with_stack is True:
 64        stacklevel = 2
 65
 66    _warned_messages.add(message)
 67    warnings.warn(
 68        message,
 69        category=UserWarning,
 70        stacklevel=stacklevel,
 71    )
 72
 73    if logger:
 74        logger.warning(message)
 75
 76
 77def _get_logging_root() -> Path | None:
 78    """Return the root directory for logs.
 79
 80    Returns `None` if no valid path can be found.
 81
 82    This is the directory where logs are stored.
 83    """
 84    if "AIRBYTE_LOGGING_ROOT" in os.environ:
 85        log_root = Path(os.environ["AIRBYTE_LOGGING_ROOT"])
 86    elif platform.system() == "Darwin" or platform.system() == "Linux":
 87        # Use /tmp on macOS and Linux
 88        log_root = Path("/tmp") / "airbyte" / "logs"
 89    else:
 90        # Use the default temp directory on Windows or any other OS
 91        log_root = Path(tempfile.gettempdir()) / "airbyte" / "logs"
 92
 93    try:
 94        # Attempt to create the log root directory if it does not exist
 95        log_root.mkdir(parents=True, exist_ok=True)
 96    except OSError:
 97        # Handle the error by returning None
 98        warn_once(
 99            (
100                f"Failed to create PyAirbyte logging directory at `{log_root}`. "
101                "You can override the default path by setting the `AIRBYTE_LOGGING_ROOT` "
102                "environment variable."
103            ),
104            with_stack=False,
105        )
106        return None
107    else:
108        return log_root
109
110
# Resolved once at import time by `_get_logging_root()`, which also creates the directory.
AIRBYTE_LOGGING_ROOT: Path | None = _get_logging_root()
"""The root directory for Airbyte logs.

This value can be overridden by setting the `AIRBYTE_LOGGING_ROOT` environment variable.

If not provided, PyAirbyte will use `/tmp/airbyte/logs/` on macOS and Linux, and an
`airbyte/logs/` folder under the OS's default temporary directory on any other platform.
If the directory cannot be created, PyAirbyte will emit a warning and set this value to `None`.
"""
120
121
@lru_cache
def get_global_file_logger() -> logging.Logger | None:
    """Return the global file logger for PyAirbyte, or `None` if no log file can be used.

    The `airbyte` logger is set to `INFO`, detached from the root logger, and given a single
    file handler writing to `<AIRBYTE_LOGGING_ROOT>/<YYYY-MM-DD>/airbyte-log-<id>.log`. The
    path of the log file is printed to stdout.

    When `AIRBYTE_STRUCTURED_LOGGING` is enabled, structlog is configured to render JSON and
    a structlog logger named `airbyte` is returned instead of the stdlib logger.

    The result is memoized by `lru_cache`, so repeated calls reuse the first result.

    Returns:
        The configured logger, or `None` when `AIRBYTE_LOGGING_ROOT` is `None` or the dated
        log folder cannot be created.
    """
    logger = logging.getLogger("airbyte")
    logger.setLevel(logging.INFO)
    logger.propagate = False

    if AIRBYTE_LOGGING_ROOT is None:
        # No temp directory available, so return None
        return None

    # Else, configure the logger to write to a file

    # Remove any existing handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and removing while iterating the live list skips every other handler.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    yyyy_mm_dd: str = pendulum.now().format("YYYY-MM-DD")
    folder = AIRBYTE_LOGGING_ROOT / yyyy_mm_dd
    try:
        folder.mkdir(parents=True, exist_ok=True)
    except Exception:
        warn_once(
            f"Failed to create logging directory at '{folder!s}'.",
            with_stack=False,
        )
        return None

    # A slice of a fresh ULID keeps file names unique across runs on the same day.
    logfile_path = folder / f"airbyte-log-{str(ulid.ULID())[2:11]}.log"
    print(f"Writing PyAirbyte logs to file: {logfile_path!s}")

    file_handler = logging.FileHandler(
        filename=logfile_path,
        encoding="utf-8",
    )

    if AIRBYTE_STRUCTURED_LOGGING:
        # structlog renders the full JSON line itself, so the handler writes the message as-is.
        formatter = logging.Formatter("%(message)s")
        file_handler.setFormatter(formatter)

        # Add the file handler to the logger
        logger.addHandler(file_handler)

        # Configure structlog
        structlog.configure(
            processors=[
                structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
                structlog.stdlib.add_log_level,
                structlog.stdlib.PositionalArgumentsFormatter(),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.format_exc_info,
                structlog.processors.JSONRenderer(),
            ],
            context_class=dict,
            logger_factory=structlog.stdlib.LoggerFactory(),
            wrapper_class=structlog.stdlib.BoundLogger,
            cache_logger_on_first_use=True,
        )

        # Create a logger
        return structlog.get_logger("airbyte")

    # Plain-text mode: timestamp, level and message on each line.
    file_handler.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )

    logger.addHandler(file_handler)
    return logger
198
199
def get_global_stats_log_path() -> Path | None:
    """Return the path of the performance stats log file.

    The file is named `airbyte-stats.log` and lives directly in `AIRBYTE_LOGGING_ROOT`.
    Returns `None` when no logging root is available or the directory cannot be created.
    """
    stats_dir = AIRBYTE_LOGGING_ROOT
    if stats_dir is None:
        return None

    try:
        # Make sure the directory exists before handing out a path inside it.
        stats_dir.mkdir(parents=True, exist_ok=True)
    except Exception:
        warn_once(
            f"Failed to create logging directory at '{stats_dir!s}'.",
            with_stack=False,
        )
        return None
    else:
        return stats_dir / "airbyte-stats.log"
216
217
@lru_cache
def get_global_stats_logger() -> structlog.BoundLogger:
    """Create a stats logger for performance metrics.

    The `airbyte.stats` logger is set to `INFO`, detached from the root logger and, when a
    stats log path is available, given a file handler that writes the rendered JSON lines to
    that file. The path of the stats file is printed to stdout.

    The result is memoized by `lru_cache`, so repeated calls reuse the first result.

    Returns:
        A structlog logger named `airbyte.stats`. When no log path is available, the
        underlying stdlib logger has no file handler attached.
    """
    logger = logging.getLogger("airbyte.stats")
    logger.setLevel(logging.INFO)
    logger.propagate = False

    # Configure structlog
    # NOTE(review): `structlog.configure` appears to apply process-wide, so this call would
    # replace a processor chain configured earlier by another logger factory - confirm intended.
    structlog.configure(
        processors=[
            structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.JSONRenderer(),
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

    # `get_global_stats_log_path` creates the log directory itself and returns `None` (after
    # warning) if that fails, so no second `mkdir` attempt is needed here.
    logfile_path: Path | None = get_global_stats_log_path()
    if AIRBYTE_LOGGING_ROOT is None or logfile_path is None:
        # No temp directory available, so return no-op logger without handlers
        return structlog.get_logger("airbyte.stats")

    print(f"Writing PyAirbyte performance stats to file: {logfile_path!s}")

    # Remove any existing handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and removing while iterating the live list skips every other handler.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    file_handler = logging.FileHandler(
        filename=logfile_path,
        encoding="utf-8",
    )

    # structlog renders the full JSON line itself, so the handler writes the message as-is.
    formatter = logging.Formatter("%(message)s")
    file_handler.setFormatter(formatter)

    # Add the file handler to the logger
    logger.addHandler(file_handler)

    # Create a logger
    return structlog.get_logger("airbyte.stats")
273
274
def new_passthrough_file_logger(connector_name: str) -> logging.Logger:
    """Create a per-connector logger that writes to its own log file.

    The `airbyte.<connector_name>` logger is set to `INFO`, detached from the root logger, and
    given a fresh file handler writing to
    `<AIRBYTE_LOGGING_ROOT>/<connector_name>/<connector_name>-log-<id>.log`. The path of the
    log file is printed to stdout and also written to the global file logger when available.

    When `AIRBYTE_STRUCTURED_LOGGING` is enabled, structlog is configured to render JSON and
    a structlog logger with the same name is returned instead of the stdlib logger.

    Args:
        connector_name: Name of the connector; used for the logger name, folder and file name.

    Returns:
        The configured logger. If `AIRBYTE_LOGGING_ROOT` is `None` or the connector's log
        folder cannot be created, the logger is returned without a file handler.
    """
    logger = logging.getLogger(f"airbyte.{connector_name}")
    logger.setLevel(logging.INFO)

    # Prevent logging to stderr by stopping propagation to the root logger
    logger.propagate = False

    if AIRBYTE_LOGGING_ROOT is None:
        # No temp directory available, so return a basic logger
        return logger

    # Else, configure the logger to write to a file

    # Remove any existing handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and removing while iterating the live list skips every other handler.
    # Close each one so the file opened by an earlier call for this connector is released.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)
        handler.close()

    folder = AIRBYTE_LOGGING_ROOT / connector_name
    try:
        folder.mkdir(parents=True, exist_ok=True)
    except OSError:
        # File logging is best-effort, matching the other logger factories in this module.
        warn_once(
            f"Failed to create logging directory at '{folder!s}'.",
            with_stack=False,
        )
        return logger

    # Create a file handler
    global_logger = get_global_file_logger()
    # A slice of a fresh ULID keeps file names unique across runs of the same connector.
    logfile_path = folder / f"{connector_name}-log-{str(ulid.ULID())[2:11]}.log"
    logfile_msg = f"Writing `{connector_name}` logs to file: {logfile_path!s}"
    print(logfile_msg)
    if global_logger:
        global_logger.info(logfile_msg)

    # Explicit UTF-8 so output does not depend on the platform's default encoding.
    file_handler = logging.FileHandler(
        filename=logfile_path,
        encoding="utf-8",
    )
    file_handler.setLevel(logging.INFO)

    if AIRBYTE_STRUCTURED_LOGGING:
        # structlog renders the full JSON line itself, so the handler writes the message as-is.
        formatter = logging.Formatter("%(message)s")
        file_handler.setFormatter(formatter)

        # Add the file handler to the logger
        logger.addHandler(file_handler)

        # Configure structlog
        structlog.configure(
            processors=[
                structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
                structlog.stdlib.add_log_level,
                structlog.stdlib.PositionalArgumentsFormatter(),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.format_exc_info,
                structlog.processors.JSONRenderer(),
            ],
            context_class=dict,
            logger_factory=structlog.stdlib.LoggerFactory(),
            wrapper_class=structlog.stdlib.BoundLogger,
            cache_logger_on_first_use=True,
        )

        # Create a logger
        return structlog.get_logger(f"airbyte.{connector_name}")

    # Else, write logs in plain text

    file_handler.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )

    logger.addHandler(file_handler)
    return logger
AIRBYTE_STRUCTURED_LOGGING: bool = False

Whether to enable structured logging.

This value is read from the AIRBYTE_STRUCTURED_LOGGING environment variable. If the variable is not set, the default value is False.

def warn_once(message: str, logger: logging.Logger | None = None, *, with_stack: int | bool) -> None:
def warn_once(
    message: str,
    logger: logging.Logger | None = None,
    *,
    with_stack: int | bool,
) -> None:
    """Emit a warning message only once.

    This function is a wrapper around the `warnings.warn` function. Each unique message is
    emitted at most once per process; repeated calls with the same message are ignored.

    Args:
        message: The warning text. It also serves as the de-duplication key.
        logger: Optional logger that additionally receives the message at `WARNING` level.
        with_stack: Controls the `stacklevel` passed to `warnings.warn`. `True` uses `2`
            (the caller of `warn_once`), a falsy value (`False` or `0`) uses `0`, and any
            other integer is passed through unchanged.
    """
    if message in _warned_messages:
        return

    # `True` must be checked by identity first, because `bool` is a subclass of `int`.
    if with_stack is True:
        stacklevel = 2
    elif not with_stack:
        stacklevel = 0
    else:
        # Explicit integer stack level. Previously this case left `stacklevel` unassigned
        # and raised `UnboundLocalError`.
        stacklevel = int(with_stack)

    # Record the message before warning so it is not retried if the warning filter raises.
    _warned_messages.add(message)
    warnings.warn(
        message,
        category=UserWarning,
        stacklevel=stacklevel,
    )

    if logger:
        logger.warning(message)

Emit a warning message only once.

This function is a wrapper around the warnings.warn function that also logs the warning message to the given logger, if one is provided. The warning message is only emitted once per unique message.

AIRBYTE_LOGGING_ROOT: pathlib.Path | None = PosixPath('/tmp/airbyte/logs')

The root directory for Airbyte logs.

This value can be overridden by setting the AIRBYTE_LOGGING_ROOT environment variable.

If not provided, PyAirbyte will use /tmp/airbyte/logs/ on macOS and Linux, and an airbyte/logs/ folder under the OS's default temporary directory on any other platform. If the directory cannot be created, PyAirbyte will emit a warning and set this value to None.

@lru_cache
def get_global_file_logger() -> logging.Logger | None:
@lru_cache
def get_global_file_logger() -> logging.Logger | None:
    """Return the global file logger for PyAirbyte, or `None` if no log file can be used.

    The `airbyte` logger is set to `INFO`, detached from the root logger, and given a single
    file handler writing to `<AIRBYTE_LOGGING_ROOT>/<YYYY-MM-DD>/airbyte-log-<id>.log`. The
    path of the log file is printed to stdout.

    When `AIRBYTE_STRUCTURED_LOGGING` is enabled, structlog is configured to render JSON and
    a structlog logger named `airbyte` is returned instead of the stdlib logger.

    The result is memoized by `lru_cache`, so repeated calls reuse the first result.

    Returns:
        The configured logger, or `None` when `AIRBYTE_LOGGING_ROOT` is `None` or the dated
        log folder cannot be created.
    """
    logger = logging.getLogger("airbyte")
    logger.setLevel(logging.INFO)
    logger.propagate = False

    if AIRBYTE_LOGGING_ROOT is None:
        # No temp directory available, so return None
        return None

    # Else, configure the logger to write to a file

    # Remove any existing handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and removing while iterating the live list skips every other handler.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    yyyy_mm_dd: str = pendulum.now().format("YYYY-MM-DD")
    folder = AIRBYTE_LOGGING_ROOT / yyyy_mm_dd
    try:
        folder.mkdir(parents=True, exist_ok=True)
    except Exception:
        warn_once(
            f"Failed to create logging directory at '{folder!s}'.",
            with_stack=False,
        )
        return None

    # A slice of a fresh ULID keeps file names unique across runs on the same day.
    logfile_path = folder / f"airbyte-log-{str(ulid.ULID())[2:11]}.log"
    print(f"Writing PyAirbyte logs to file: {logfile_path!s}")

    file_handler = logging.FileHandler(
        filename=logfile_path,
        encoding="utf-8",
    )

    if AIRBYTE_STRUCTURED_LOGGING:
        # structlog renders the full JSON line itself, so the handler writes the message as-is.
        formatter = logging.Formatter("%(message)s")
        file_handler.setFormatter(formatter)

        # Add the file handler to the logger
        logger.addHandler(file_handler)

        # Configure structlog
        structlog.configure(
            processors=[
                structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
                structlog.stdlib.add_log_level,
                structlog.stdlib.PositionalArgumentsFormatter(),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.format_exc_info,
                structlog.processors.JSONRenderer(),
            ],
            context_class=dict,
            logger_factory=structlog.stdlib.LoggerFactory(),
            wrapper_class=structlog.stdlib.BoundLogger,
            cache_logger_on_first_use=True,
        )

        # Create a logger
        return structlog.get_logger("airbyte")

    # Plain-text mode: timestamp, level and message on each line.
    file_handler.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )

    logger.addHandler(file_handler)
    return logger

Return the global logger for PyAirbyte.

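This logger is configured to write logs to a file in a dated subfolder of the log directory. It does not propagate records to the root logger; only the path of the log file is printed to the console.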

def get_global_stats_log_path() -> pathlib.Path | None:
def get_global_stats_log_path() -> Path | None:
    """Return the path of the performance stats log file.

    The file is named `airbyte-stats.log` and lives directly in `AIRBYTE_LOGGING_ROOT`.
    Returns `None` when no logging root is available or the directory cannot be created.
    """
    stats_dir = AIRBYTE_LOGGING_ROOT
    if stats_dir is None:
        return None

    try:
        # Make sure the directory exists before handing out a path inside it.
        stats_dir.mkdir(parents=True, exist_ok=True)
    except Exception:
        warn_once(
            f"Failed to create logging directory at '{stats_dir!s}'.",
            with_stack=False,
        )
        return None
    else:
        return stats_dir / "airbyte-stats.log"

Return the path to the performance log file.

@lru_cache
def get_global_stats_logger() -> structlog._generic.BoundLogger:
@lru_cache
def get_global_stats_logger() -> structlog.BoundLogger:
    """Create a stats logger for performance metrics.

    The `airbyte.stats` logger is set to `INFO`, detached from the root logger and, when a
    stats log path is available, given a file handler that writes the rendered JSON lines to
    that file. The path of the stats file is printed to stdout.

    The result is memoized by `lru_cache`, so repeated calls reuse the first result.

    Returns:
        A structlog logger named `airbyte.stats`. When no log path is available, the
        underlying stdlib logger has no file handler attached.
    """
    logger = logging.getLogger("airbyte.stats")
    logger.setLevel(logging.INFO)
    logger.propagate = False

    # Configure structlog
    # NOTE(review): `structlog.configure` appears to apply process-wide, so this call would
    # replace a processor chain configured earlier by another logger factory - confirm intended.
    structlog.configure(
        processors=[
            structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.JSONRenderer(),
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

    # `get_global_stats_log_path` creates the log directory itself and returns `None` (after
    # warning) if that fails, so no second `mkdir` attempt is needed here.
    logfile_path: Path | None = get_global_stats_log_path()
    if AIRBYTE_LOGGING_ROOT is None or logfile_path is None:
        # No temp directory available, so return no-op logger without handlers
        return structlog.get_logger("airbyte.stats")

    print(f"Writing PyAirbyte performance stats to file: {logfile_path!s}")

    # Remove any existing handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and removing while iterating the live list skips every other handler.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    file_handler = logging.FileHandler(
        filename=logfile_path,
        encoding="utf-8",
    )

    # structlog renders the full JSON line itself, so the handler writes the message as-is.
    formatter = logging.Formatter("%(message)s")
    file_handler.setFormatter(formatter)

    # Add the file handler to the logger
    logger.addHandler(file_handler)

    # Create a logger
    return structlog.get_logger("airbyte.stats")

Create a stats logger for performance metrics.

def new_passthrough_file_logger(connector_name: str) -> logging.Logger:
def new_passthrough_file_logger(connector_name: str) -> logging.Logger:
    """Create a per-connector logger that writes to its own log file.

    The `airbyte.<connector_name>` logger is set to `INFO`, detached from the root logger, and
    given a fresh file handler writing to
    `<AIRBYTE_LOGGING_ROOT>/<connector_name>/<connector_name>-log-<id>.log`. The path of the
    log file is printed to stdout and also written to the global file logger when available.

    When `AIRBYTE_STRUCTURED_LOGGING` is enabled, structlog is configured to render JSON and
    a structlog logger with the same name is returned instead of the stdlib logger.

    Args:
        connector_name: Name of the connector; used for the logger name, folder and file name.

    Returns:
        The configured logger. If `AIRBYTE_LOGGING_ROOT` is `None` or the connector's log
        folder cannot be created, the logger is returned without a file handler.
    """
    logger = logging.getLogger(f"airbyte.{connector_name}")
    logger.setLevel(logging.INFO)

    # Prevent logging to stderr by stopping propagation to the root logger
    logger.propagate = False

    if AIRBYTE_LOGGING_ROOT is None:
        # No temp directory available, so return a basic logger
        return logger

    # Else, configure the logger to write to a file

    # Remove any existing handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and removing while iterating the live list skips every other handler.
    # Close each one so the file opened by an earlier call for this connector is released.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)
        handler.close()

    folder = AIRBYTE_LOGGING_ROOT / connector_name
    try:
        folder.mkdir(parents=True, exist_ok=True)
    except OSError:
        # File logging is best-effort, matching the other logger factories in this module.
        warn_once(
            f"Failed to create logging directory at '{folder!s}'.",
            with_stack=False,
        )
        return logger

    # Create a file handler
    global_logger = get_global_file_logger()
    # A slice of a fresh ULID keeps file names unique across runs of the same connector.
    logfile_path = folder / f"{connector_name}-log-{str(ulid.ULID())[2:11]}.log"
    logfile_msg = f"Writing `{connector_name}` logs to file: {logfile_path!s}"
    print(logfile_msg)
    if global_logger:
        global_logger.info(logfile_msg)

    # Explicit UTF-8 so output does not depend on the platform's default encoding.
    file_handler = logging.FileHandler(
        filename=logfile_path,
        encoding="utf-8",
    )
    file_handler.setLevel(logging.INFO)

    if AIRBYTE_STRUCTURED_LOGGING:
        # structlog renders the full JSON line itself, so the handler writes the message as-is.
        formatter = logging.Formatter("%(message)s")
        file_handler.setFormatter(formatter)

        # Add the file handler to the logger
        logger.addHandler(file_handler)

        # Configure structlog
        structlog.configure(
            processors=[
                structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
                structlog.stdlib.add_log_level,
                structlog.stdlib.PositionalArgumentsFormatter(),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.format_exc_info,
                structlog.processors.JSONRenderer(),
            ],
            context_class=dict,
            logger_factory=structlog.stdlib.LoggerFactory(),
            wrapper_class=structlog.stdlib.BoundLogger,
            cache_logger_on_first_use=True,
        )

        # Create a logger
        return structlog.get_logger(f"airbyte.{connector_name}")

    # Else, write logs in plain text

    file_handler.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )

    logger.addHandler(file_handler)
    return logger

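Create a per-connector logger, based on the standard logging module, that writes to its own log file under the connector's subfolder of the logging root.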