airbyte.logs

PyAirbyte Logging features and related configuration.

By default, PyAirbyte main logs are written to a file in the AIRBYTE_LOGGING_ROOT directory, which defaults to a system-created temporary directory. PyAirbyte also maintains connector-specific log files within the same directory, under a subfolder with the name of the connector.

PyAirbyte supports structured JSON logging, which is disabled by default. To enable structured logging in JSON, set AIRBYTE_STRUCTURED_LOGGING to True.

  1# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
  2"""PyAirbyte Logging features and related configuration.
  3
  4By default, PyAirbyte main logs are written to a file in the `AIRBYTE_LOGGING_ROOT` directory, which
  5defaults to a system-created temporary directory. PyAirbyte also maintains connector-specific log
  6files within the same directory, under a subfolder with the name of the connector.
  7
  8PyAirbyte supports structured JSON logging, which is disabled by default. To enable structured
  9logging in JSON, set `AIRBYTE_STRUCTURED_LOGGING` to `True`.
 10"""
 11
 12from __future__ import annotations
 13
 14import logging
 15import os
 16import platform
 17import tempfile
 18import warnings
 19from functools import lru_cache
 20from pathlib import Path
 21
 22import structlog
 23import ulid
 24
 25from airbyte_cdk.utils.datetime_helpers import ab_datetime_now
 26
 27
 28def _str_to_bool(value: str) -> bool:
 29    """Convert a string value of an environment values to a boolean value."""
 30    return bool(value) and value.lower() not in {"", "0", "false", "f", "no", "n", "off"}
 31
 32
 33AIRBYTE_STRUCTURED_LOGGING: bool = _str_to_bool(
 34    os.getenv(
 35        key="AIRBYTE_STRUCTURED_LOGGING",
 36        default="false",
 37    )
 38)
 39"""Whether to enable structured logging.
 40
 41This value is read from the `AIRBYTE_STRUCTURED_LOGGING` environment variable. If the variable is
 42not set, the default value is `False`.
 43"""
 44
 45_warned_messages: set[str] = set()
 46
 47
 48def warn_once(
 49    message: str,
 50    logger: logging.Logger | None = None,
 51    *,
 52    with_stack: int | bool,
 53) -> None:
 54    """Emit a warning message only once.
 55
 56    This function is a wrapper around the `warnings.warn` function that logs the warning message
 57    to the global logger. The warning message is only emitted once per unique message.
 58    """
 59    if message in _warned_messages:
 60        return
 61
 62    if not with_stack:
 63        stacklevel = 0
 64
 65    if with_stack is True:
 66        stacklevel = 2
 67
 68    _warned_messages.add(message)
 69    warnings.warn(
 70        message,
 71        category=UserWarning,
 72        stacklevel=stacklevel,
 73    )
 74
 75    if logger:
 76        logger.warning(message)
 77
 78
 79def _get_logging_root() -> Path | None:
 80    """Return the root directory for logs.
 81
 82    Returns `None` if no valid path can be found.
 83
 84    This is the directory where logs are stored.
 85    """
 86    if "AIRBYTE_LOGGING_ROOT" in os.environ:
 87        log_root = Path(os.environ["AIRBYTE_LOGGING_ROOT"])
 88    elif platform.system() == "Darwin" or platform.system() == "Linux":
 89        # Use /tmp on macOS and Linux
 90        log_root = Path("/tmp") / "airbyte" / "logs"
 91    else:
 92        # Use the default temp directory on Windows or any other OS
 93        log_root = Path(tempfile.gettempdir()) / "airbyte" / "logs"
 94
 95    try:
 96        # Attempt to create the log root directory if it does not exist
 97        log_root.mkdir(parents=True, exist_ok=True)
 98    except OSError:
 99        # Handle the error by returning None
100        warn_once(
101            (
102                f"Failed to create PyAirbyte logging directory at `{log_root}`. "
103                "You can override the default path by setting the `AIRBYTE_LOGGING_ROOT` "
104                "environment variable."
105            ),
106            with_stack=False,
107        )
108        return None
109    else:
110        return log_root
111
112
# Computed once at import time by `_get_logging_root()`, which also tries to create the
# directory. Without the environment override, the default is `/tmp/airbyte/logs` on macOS
# and Linux, and `airbyte/logs` under `tempfile.gettempdir()` on other platforms.
AIRBYTE_LOGGING_ROOT: Path | None = _get_logging_root()
"""The root directory for Airbyte logs.

This value can be overridden by setting the `AIRBYTE_LOGGING_ROOT` environment variable.

If not provided, PyAirbyte will use `/tmp/airbyte/logs/` where `/tmp/` is the OS's default
temporary directory. If the directory cannot be created, PyAirbyte will log a warning and
set this value to `None`.
"""
122
123
@lru_cache
def get_global_file_logger() -> logging.Logger | None:
    """Return the global file logger for PyAirbyte.

    The `airbyte` logger is set to INFO level, detached from the root logger
    (`propagate = False`) and given a single file handler that writes to a new
    `airbyte-log-*.log` file inside a per-day subfolder of `AIRBYTE_LOGGING_ROOT`.
    No console handler is attached here. The result is cached by `lru_cache`, so
    repeated calls return the same object.

    When `AIRBYTE_STRUCTURED_LOGGING` is enabled, structlog is configured to render
    JSON and the structlog logger named `airbyte` is returned instead of the
    standard-library logger.

    Returns:
        The configured logger, or `None` when no logging root is available or the
        dated log folder cannot be created.
    """
    logger = logging.getLogger("airbyte")
    logger.setLevel(logging.INFO)
    logger.propagate = False

    if AIRBYTE_LOGGING_ROOT is None:
        # No temp directory available, so return None
        return None

    # Else, configure the logger to write to a file

    # Remove any existing handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and removing from the live list mid-iteration skips entries.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    yyyy_mm_dd: str = ab_datetime_now().strftime("%Y-%m-%d")
    folder = AIRBYTE_LOGGING_ROOT / yyyy_mm_dd
    try:
        folder.mkdir(parents=True, exist_ok=True)
    except Exception:
        warn_once(
            f"Failed to create logging directory at '{folder!s}'.",
            with_stack=False,
        )
        return None

    # A slice of a fresh ULID keeps file names distinct between runs
    logfile_path = folder / f"airbyte-log-{str(ulid.ULID())[2:11]}.log"
    print(f"Writing PyAirbyte logs to file: {logfile_path!s}")

    file_handler = logging.FileHandler(
        filename=logfile_path,
        encoding="utf-8",
    )

    if AIRBYTE_STRUCTURED_LOGGING:
        # structlog renders the full JSON line itself, so the handler only passes
        # the message through unchanged
        formatter = logging.Formatter("%(message)s")
        file_handler.setFormatter(formatter)

        # Add the file handler to the logger
        logger.addHandler(file_handler)

        # Configure structlog
        structlog.configure(
            processors=[
                structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
                structlog.stdlib.add_log_level,
                structlog.stdlib.PositionalArgumentsFormatter(),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.format_exc_info,
                structlog.processors.JSONRenderer(),
            ],
            context_class=dict,
            logger_factory=structlog.stdlib.LoggerFactory(),
            wrapper_class=structlog.stdlib.BoundLogger,
            cache_logger_on_first_use=True,
        )

        # Create a logger
        return structlog.get_logger("airbyte")

    # Plain-text mode: timestamp, level and message on each line
    file_handler.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )

    logger.addHandler(file_handler)
    return logger
200
201
def get_global_stats_log_path() -> Path | None:
    """Return the location of the `airbyte-stats.log` performance log file.

    The file lives directly inside `AIRBYTE_LOGGING_ROOT`. `None` is returned when
    no logging root is configured, or when the directory cannot be created (a
    one-time warning is issued in that case).
    """
    stats_dir = AIRBYTE_LOGGING_ROOT
    if stats_dir is None:
        return None

    try:
        stats_dir.mkdir(parents=True, exist_ok=True)
    except Exception:
        warn_once(
            f"Failed to create logging directory at '{stats_dir!s}'.",
            with_stack=False,
        )
        return None
    else:
        return stats_dir / "airbyte-stats.log"
218
219
@lru_cache
def get_global_stats_logger() -> structlog.BoundLogger:
    """Create the stats logger used for performance metrics.

    Configures structlog (globally) to render timestamped JSON, and sets up the
    standard-library logger `airbyte.stats` at INFO level without propagation to
    the root logger. When a stats log path is available, a UTF-8 file handler
    writing to that path replaces any handlers already attached to the logger.
    The result is cached by `lru_cache`.

    Returns:
        The structlog logger named `airbyte.stats`. If `get_global_stats_log_path()`
        returns `None`, no file handler is attached by this function.
    """
    logger = logging.getLogger("airbyte.stats")
    logger.setLevel(logging.INFO)
    logger.propagate = False

    # Configure structlog
    structlog.configure(
        processors=[
            structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.JSONRenderer(),
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

    # `get_global_stats_log_path()` returns None both when there is no logging root
    # and when the directory cannot be created, and it has already created the
    # directory on success, so no further check or `mkdir` is needed here.
    logfile_path: Path | None = get_global_stats_log_path()
    if logfile_path is None:
        # No log directory available, so return a logger without a file handler
        return structlog.get_logger("airbyte.stats")

    print(f"Writing PyAirbyte performance stats to file: {logfile_path!s}")

    # Remove any existing handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and removing from the live list mid-iteration skips entries.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    file_handler = logging.FileHandler(
        filename=logfile_path,
        encoding="utf-8",
    )

    # structlog renders the full JSON line itself, so the handler only passes the
    # message through unchanged
    formatter = logging.Formatter("%(message)s")
    file_handler.setFormatter(formatter)

    # Add the file handler to the logger
    logger.addHandler(file_handler)

    # Create a logger
    return structlog.get_logger("airbyte.stats")
275
276
def new_passthrough_file_logger(connector_name: str) -> logging.Logger:
    """Create a file logger dedicated to a single connector.

    The logger is named `airbyte.<connector_name>`, set to INFO level and detached
    from the root logger. When a logging root is available, any handlers already
    attached are replaced by one UTF-8 file handler that writes to a new
    `<connector_name>-log-*.log` file under `AIRBYTE_LOGGING_ROOT/<connector_name>/`.
    The chosen file path is printed and, if the global file logger exists, logged
    there as well.

    When `AIRBYTE_STRUCTURED_LOGGING` is enabled, structlog is configured to render
    JSON and the structlog logger of the same name is returned instead.

    Args:
        connector_name: Connector name; used in the logger name, the subfolder name
            and the log file name.

    Returns:
        The configured logger. If there is no logging root, or the connector folder
        cannot be created, the logger is returned without a file handler.
    """
    logger = logging.getLogger(f"airbyte.{connector_name}")
    logger.setLevel(logging.INFO)

    # Prevent logging to stderr by stopping propagation to the root logger
    logger.propagate = False

    if AIRBYTE_LOGGING_ROOT is None:
        # No temp directory available, so return a basic logger
        return logger

    # Else, configure the logger to write to a file

    # Remove any existing handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and removing from the live list mid-iteration skips entries.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    folder = AIRBYTE_LOGGING_ROOT / connector_name
    try:
        folder.mkdir(parents=True, exist_ok=True)
    except OSError:
        # Degrade the same way as the "no logging root" case instead of crashing
        # the caller because a log folder could not be created.
        warn_once(
            f"Failed to create logging directory at '{folder!s}'.",
            with_stack=False,
        )
        return logger

    # Create a file handler
    global_logger = get_global_file_logger()
    logfile_path = folder / f"{connector_name}-log-{str(ulid.ULID())[2:11]}.log"
    logfile_msg = f"Writing `{connector_name}` logs to file: {logfile_path!s}"
    print(logfile_msg)
    if global_logger:
        global_logger.info(logfile_msg)

    # Explicit UTF-8, matching the other file handlers in this module, so
    # non-ASCII connector output does not depend on the platform default encoding.
    file_handler = logging.FileHandler(
        filename=logfile_path,
        encoding="utf-8",
    )
    file_handler.setLevel(logging.INFO)

    if AIRBYTE_STRUCTURED_LOGGING:
        # structlog renders the full JSON line itself, so the handler only passes
        # the message through unchanged
        formatter = logging.Formatter("%(message)s")
        file_handler.setFormatter(formatter)

        # Add the file handler to the logger
        logger.addHandler(file_handler)

        # Configure structlog
        structlog.configure(
            processors=[
                structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
                structlog.stdlib.add_log_level,
                structlog.stdlib.PositionalArgumentsFormatter(),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.format_exc_info,
                structlog.processors.JSONRenderer(),
            ],
            context_class=dict,
            logger_factory=structlog.stdlib.LoggerFactory(),
            wrapper_class=structlog.stdlib.BoundLogger,
            cache_logger_on_first_use=True,
        )

        # Create a logger
        return structlog.get_logger(f"airbyte.{connector_name}")

    # Else, write logs in plain text

    file_handler.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )

    logger.addHandler(file_handler)
    return logger
AIRBYTE_STRUCTURED_LOGGING: bool = False

Whether to enable structured logging.

This value is read from the AIRBYTE_STRUCTURED_LOGGING environment variable. If the variable is not set, the default value is False.

def warn_once( message: str, logger: logging.Logger | None = None, *, with_stack: int | bool) -> None:
def warn_once(
    message: str,
    logger: logging.Logger | None = None,
    *,
    with_stack: int | bool,
) -> None:
    """Emit a warning message only once per unique message.

    The message is issued through `warnings.warn` as a `UserWarning` and, when a
    logger is supplied, also written to that logger at WARNING level. Calling this
    function again with a message that was already emitted does nothing.

    Args:
        message: The warning text; it is also the de-duplication key.
        logger: Optional logger that additionally receives the message.
        with_stack: Selects the `stacklevel` passed to `warnings.warn`. `True` maps
            to `2`, a falsy value (`False` or `0`) maps to `0`, and any other
            integer is passed through unchanged.
    """
    if message in _warned_messages:
        return

    if with_stack is True:
        stacklevel = 2
    elif not with_stack:
        stacklevel = 0
    else:
        # Any other integer is an explicit stack level. Without this branch
        # `stacklevel` would be unbound and the call would raise UnboundLocalError.
        stacklevel = int(with_stack)

    _warned_messages.add(message)
    warnings.warn(
        message,
        category=UserWarning,
        stacklevel=stacklevel,
    )

    if logger:
        logger.warning(message)

Emit a warning message only once.

This function is a wrapper around the warnings.warn function that logs the warning message to the global logger. The warning message is only emitted once per unique message.

AIRBYTE_LOGGING_ROOT: pathlib.Path | None = PosixPath('/tmp/airbyte/logs')

The root directory for Airbyte logs.

This value can be overridden by setting the AIRBYTE_LOGGING_ROOT environment variable.

If not provided, PyAirbyte will use /tmp/airbyte/logs/ on macOS and Linux, or airbyte/logs/ under the OS's default temporary directory on other platforms. If the directory cannot be created, PyAirbyte will emit a warning and set this value to None.

@lru_cache
def get_global_file_logger() -> logging.Logger | None:
@lru_cache
def get_global_file_logger() -> logging.Logger | None:
    """Return the global file logger for PyAirbyte.

    The `airbyte` logger is set to INFO level, detached from the root logger
    (`propagate = False`) and given a single file handler that writes to a new
    `airbyte-log-*.log` file inside a per-day subfolder of `AIRBYTE_LOGGING_ROOT`.
    No console handler is attached here. The result is cached by `lru_cache`, so
    repeated calls return the same object.

    When `AIRBYTE_STRUCTURED_LOGGING` is enabled, structlog is configured to render
    JSON and the structlog logger named `airbyte` is returned instead of the
    standard-library logger.

    Returns:
        The configured logger, or `None` when no logging root is available or the
        dated log folder cannot be created.
    """
    logger = logging.getLogger("airbyte")
    logger.setLevel(logging.INFO)
    logger.propagate = False

    if AIRBYTE_LOGGING_ROOT is None:
        # No temp directory available, so return None
        return None

    # Else, configure the logger to write to a file

    # Remove any existing handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and removing from the live list mid-iteration skips entries.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    yyyy_mm_dd: str = ab_datetime_now().strftime("%Y-%m-%d")
    folder = AIRBYTE_LOGGING_ROOT / yyyy_mm_dd
    try:
        folder.mkdir(parents=True, exist_ok=True)
    except Exception:
        warn_once(
            f"Failed to create logging directory at '{folder!s}'.",
            with_stack=False,
        )
        return None

    # A slice of a fresh ULID keeps file names distinct between runs
    logfile_path = folder / f"airbyte-log-{str(ulid.ULID())[2:11]}.log"
    print(f"Writing PyAirbyte logs to file: {logfile_path!s}")

    file_handler = logging.FileHandler(
        filename=logfile_path,
        encoding="utf-8",
    )

    if AIRBYTE_STRUCTURED_LOGGING:
        # structlog renders the full JSON line itself, so the handler only passes
        # the message through unchanged
        formatter = logging.Formatter("%(message)s")
        file_handler.setFormatter(formatter)

        # Add the file handler to the logger
        logger.addHandler(file_handler)

        # Configure structlog
        structlog.configure(
            processors=[
                structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
                structlog.stdlib.add_log_level,
                structlog.stdlib.PositionalArgumentsFormatter(),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.format_exc_info,
                structlog.processors.JSONRenderer(),
            ],
            context_class=dict,
            logger_factory=structlog.stdlib.LoggerFactory(),
            wrapper_class=structlog.stdlib.BoundLogger,
            cache_logger_on_first_use=True,
        )

        # Create a logger
        return structlog.get_logger("airbyte")

    # Plain-text mode: timestamp, level and message on each line
    file_handler.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )

    logger.addHandler(file_handler)
    return logger

Return the global logger for PyAirbyte.

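This logger is configured to write logs to a file in the log directory; it does not propagate to the root logger, and no console handler is attached. It returns None when no log directory is available.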

def get_global_stats_log_path() -> pathlib.Path | None:
def get_global_stats_log_path() -> Path | None:
    """Return the location of the `airbyte-stats.log` performance log file.

    The file lives directly inside `AIRBYTE_LOGGING_ROOT`. `None` is returned when
    no logging root is configured, or when the directory cannot be created (a
    one-time warning is issued in that case).
    """
    stats_dir = AIRBYTE_LOGGING_ROOT
    if stats_dir is None:
        return None

    try:
        stats_dir.mkdir(parents=True, exist_ok=True)
    except Exception:
        warn_once(
            f"Failed to create logging directory at '{stats_dir!s}'.",
            with_stack=False,
        )
        return None
    else:
        return stats_dir / "airbyte-stats.log"

Return the path to the performance log file.

@lru_cache
def get_global_stats_logger() -> structlog._generic.BoundLogger:
@lru_cache
def get_global_stats_logger() -> structlog.BoundLogger:
    """Create the stats logger used for performance metrics.

    Configures structlog (globally) to render timestamped JSON, and sets up the
    standard-library logger `airbyte.stats` at INFO level without propagation to
    the root logger. When a stats log path is available, a UTF-8 file handler
    writing to that path replaces any handlers already attached to the logger.
    The result is cached by `lru_cache`.

    Returns:
        The structlog logger named `airbyte.stats`. If `get_global_stats_log_path()`
        returns `None`, no file handler is attached by this function.
    """
    logger = logging.getLogger("airbyte.stats")
    logger.setLevel(logging.INFO)
    logger.propagate = False

    # Configure structlog
    structlog.configure(
        processors=[
            structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
            structlog.stdlib.PositionalArgumentsFormatter(),
            structlog.processors.JSONRenderer(),
        ],
        context_class=dict,
        logger_factory=structlog.stdlib.LoggerFactory(),
        wrapper_class=structlog.stdlib.BoundLogger,
        cache_logger_on_first_use=True,
    )

    # `get_global_stats_log_path()` returns None both when there is no logging root
    # and when the directory cannot be created, and it has already created the
    # directory on success, so no further check or `mkdir` is needed here.
    logfile_path: Path | None = get_global_stats_log_path()
    if logfile_path is None:
        # No log directory available, so return a logger without a file handler
        return structlog.get_logger("airbyte.stats")

    print(f"Writing PyAirbyte performance stats to file: {logfile_path!s}")

    # Remove any existing handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and removing from the live list mid-iteration skips entries.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    file_handler = logging.FileHandler(
        filename=logfile_path,
        encoding="utf-8",
    )

    # structlog renders the full JSON line itself, so the handler only passes the
    # message through unchanged
    formatter = logging.Formatter("%(message)s")
    file_handler.setFormatter(formatter)

    # Add the file handler to the logger
    logger.addHandler(file_handler)

    # Create a logger
    return structlog.get_logger("airbyte.stats")

Create a stats logger for performance metrics.

def new_passthrough_file_logger(connector_name: str) -> logging.Logger:
def new_passthrough_file_logger(connector_name: str) -> logging.Logger:
    """Create a file logger dedicated to a single connector.

    The logger is named `airbyte.<connector_name>`, set to INFO level and detached
    from the root logger. When a logging root is available, any handlers already
    attached are replaced by one UTF-8 file handler that writes to a new
    `<connector_name>-log-*.log` file under `AIRBYTE_LOGGING_ROOT/<connector_name>/`.
    The chosen file path is printed and, if the global file logger exists, logged
    there as well.

    When `AIRBYTE_STRUCTURED_LOGGING` is enabled, structlog is configured to render
    JSON and the structlog logger of the same name is returned instead.

    Args:
        connector_name: Connector name; used in the logger name, the subfolder name
            and the log file name.

    Returns:
        The configured logger. If there is no logging root, or the connector folder
        cannot be created, the logger is returned without a file handler.
    """
    logger = logging.getLogger(f"airbyte.{connector_name}")
    logger.setLevel(logging.INFO)

    # Prevent logging to stderr by stopping propagation to the root logger
    logger.propagate = False

    if AIRBYTE_LOGGING_ROOT is None:
        # No temp directory available, so return a basic logger
        return logger

    # Else, configure the logger to write to a file

    # Remove any existing handlers. Iterate over a copy: `removeHandler` mutates
    # `logger.handlers`, and removing from the live list mid-iteration skips entries.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)

    folder = AIRBYTE_LOGGING_ROOT / connector_name
    try:
        folder.mkdir(parents=True, exist_ok=True)
    except OSError:
        # Degrade the same way as the "no logging root" case instead of crashing
        # the caller because a log folder could not be created.
        warn_once(
            f"Failed to create logging directory at '{folder!s}'.",
            with_stack=False,
        )
        return logger

    # Create a file handler
    global_logger = get_global_file_logger()
    logfile_path = folder / f"{connector_name}-log-{str(ulid.ULID())[2:11]}.log"
    logfile_msg = f"Writing `{connector_name}` logs to file: {logfile_path!s}"
    print(logfile_msg)
    if global_logger:
        global_logger.info(logfile_msg)

    # Explicit UTF-8, matching the other file handlers in this module, so
    # non-ASCII connector output does not depend on the platform default encoding.
    file_handler = logging.FileHandler(
        filename=logfile_path,
        encoding="utf-8",
    )
    file_handler.setLevel(logging.INFO)

    if AIRBYTE_STRUCTURED_LOGGING:
        # structlog renders the full JSON line itself, so the handler only passes
        # the message through unchanged
        formatter = logging.Formatter("%(message)s")
        file_handler.setFormatter(formatter)

        # Add the file handler to the logger
        logger.addHandler(file_handler)

        # Configure structlog
        structlog.configure(
            processors=[
                structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
                structlog.stdlib.add_log_level,
                structlog.stdlib.PositionalArgumentsFormatter(),
                structlog.processors.StackInfoRenderer(),
                structlog.processors.format_exc_info,
                structlog.processors.JSONRenderer(),
            ],
            context_class=dict,
            logger_factory=structlog.stdlib.LoggerFactory(),
            wrapper_class=structlog.stdlib.BoundLogger,
            cache_logger_on_first_use=True,
        )

        # Create a logger
        return structlog.get_logger(f"airbyte.{connector_name}")

    # Else, write logs in plain text

    file_handler.setFormatter(
        logging.Formatter(
            fmt="%(asctime)s - %(levelname)s - %(message)s",
            datefmt="%Y-%m-%d %H:%M:%S",
        )
    )

    logger.addHandler(file_handler)
    return logger

Create a per-connector file logger using the standard logging module.