(转)Python 如何仅用5000 行代码,实现强大的 logging 模块?


原文:https://juejin.cn/post/6945682794853072909

Python 的 logging 模块实现了灵活的日志系统。整个模块仅仅 3 个类,不到 5000 行代码的样子,学习它可以加深对程序日志的了解,本文分下面几个部分:

  • logging 简介

  • logging API 设计

  • 记录器对象 Logger

  • 日志记录对象 LogRecord

  • 处理器对象 Hander

  • 格式器对象 Formatter

  • 滚动日志文件处理器

  • 小结

  • 小技巧

logging 简介

本次代码使用的是 python 3.8.5 的版本,官方中文文档 3.8.8 。参考链接中官方中文文档非常详细,建议先看一遍了解日志使用。

我们主要研究日志如何输出到标准窗口这一主线;日志的配置,日志的线程安全及各种特别的Handler等支线可以先忽略。

logging API 设计

先看看日志使用:

import logging

logging.basicConfig(level=logging.INFO, format='%(levelname)-8s %(name)-10s %(asctime)s %(message)s')
lang = {"name": "python", "age":20}
logging.info('This is a info message %s', lang)
logging.debug('This is a debug message')
logging.warning('This is a warning message')

logger = logging.getLogger(__name__)
logger.warning('This is a warning')
复制代码

输出内容如下:

INFO     root       2021-03-04 00:03:53,473 This is a info message {'name': 'python', 'age': 20}
WARNING  root       2021-03-04 00:03:53,473 This is a warning message
WARNING  __main__   2021-03-04 00:03:53,473 This is a warning
复制代码

可以看到 logging 的使用非常方便,模块直接提供了一组API。

root = RootLogger(WARNING)  # 默认提供的logger
Logger.root = root
Logger.manager = Manager(Logger.root)

def debug(msg, *args, **kwargs): # info,warning等api类似
    if len(root.handlers) == 0:
        basicConfig()  # 默认配置
    root.debug(msg, *args, **kwargs)

def getLogger(name=None):
    if name:
        return Logger.manager.getLogger(name)  # 创建特定的logger
    else:
        return root  # 返回默认的logger
复制代码

这种API的提供方式,我们在requests中也有看到。api中很重要的设置config的方式:

def basicConfig(**kwargs):
    ...
    if handlers is None:
        filename = kwargs.pop("filename", None)
        mode = kwargs.pop("filemode", 'a')
        if filename:
            h = FileHandler(filename, mode)
        else:
            stream = kwargs.pop("stream", None)
            h = StreamHandler(stream)  # 默认的handler
        handlers = [h]
    dfs = kwargs.pop("datefmt", None)
    style = kwargs.pop("style", '%')
    fs = kwargs.pop("format", _STYLES[style][1])
    fmt = Formatter(fs, dfs, style)  # 生成formatter
    for h in handlers:
        if h.formatter is None:
            h.setFormatter(fmt)
        root.addHandler(h)  # 设置root的handler
    level = kwargs.pop("level", None)
    if level is not None:
        root.setLevel(level)  # 设置日志级别
复制代码

可以看到,日志的配置主要包括下面几项:

  • level 日志级别
  • format 信息格式化模版
  • filename 输出到文件
  • datefmt %Y-%m-%d %H:%M:%S,uuu 时间的格式模版
  • style [%,{,$] 格式样板

演示代码输出中,可以看到debug日志没有显示,是因为 debug < info:

CRITICAL = 50
FATAL = CRITICAL
ERROR = 40
WARNING = 30
WARN = WARNING
INFO = 20
DEBUG = 10
NOTSET = 0
复制代码

记录器对象 Logger

查看Logger之前,先看logger对象的管理类Manager

_loggerClass = Logger

class Manager(object):
    def __init__(self, rootnode):
        self.root = rootnode
        self.disable = 0
        self.loggerDict = {}  # 所有日志记录对象的字典
    ...
    def getLogger(self, name):
        rv = None
        if name in self.loggerDict:
            rv = self.loggerDict[name]  # 获取已经创建过的同名logger
            ...
        else:
            rv = (self.loggerClass or _loggerClass)(name)  # 创建新的logger
            rv.manager = self
            self.loggerDict[name] = rv
            ...
        return rv
复制代码

日志过滤器

class Filterer(object):

    def __init__(self):
        self.filters = []

    def addFilter(self, filter):
        self.filters.append(filter)

    def removeFilter(self, filter):
        self.filters.remove(filter)

    def filter(self, record):
        rv = True
        for f in self.filters:  # 过滤日志
            if hasattr(f, 'filter'):
                result = f.filter(record)
            else:
                result = f(record) # assume callable - will raise if not
            if not result:
                rv = False
                break
        return r
复制代码

核心的 Logger 实际上只是一个控制中心:

class Logger(Filterer):  # logger可以过滤日志
    def __init__(self, name, level=NOTSET):
        Filterer.__init__(self)
        self.name = name
        self.level = _checkLevel(level)
        self.parent = None  # 日志可以有层级
        self.propagate = True
        self.handlers = []  # 可以输出到多个handler
        self.disabled = False  # 可以关闭
        self._cache = {}

    def debug(self, msg, *args, **kwargs):  # 输出debug日志
        if self.isEnabledFor(DEBUG):
            self._log(DEBUG, msg, args, **kwargs)
复制代码

logger可以判断日志级别:

def isEnabledFor(self, level):
    if self.disabled:
        return False

    try:
        return self._cache[level]
    except KeyError:
        try:
            if self.manager.disable >= level:
                is_enabled = self._cache[level] = False
            else:
                is_enabled = self._cache[level] = (
                    level >= self.getEffectiveLevel()
                )
        return is_enabled

def getEffectiveLevel(self):
    logger = self
    while logger:
        if logger.level:
            return logger.level
        logger = logger.parent
    return NOTSET
复制代码

日志输出:

def _log(self, level, msg, args, exc_info=None, extra=None, stack_info=False,
         stacklevel=1):
    ...
    fn, lno, func = "(unknown file)", 0, "(unknown function)"
    ...
    # 生成日志记录
    record = self.makeRecord(self.name, level, fn, lno, msg, args,
                             exc_info, func, extra, sinfo)
    # 使用handler处理日志
    self.handle(record)
复制代码

日志记录的生产,就是创建一个LogRecord对象:

_logRecordFactory = LogRecord

def makeRecord(self, name, level, fn, lno, msg, args, exc_info,
               func=None, extra=None, sinfo=None):
    ...
    rv = _logRecordFactory(name, level, fn, lno, msg, args, exc_info, func,
                         sinfo)
    ...
    return rv
复制代码

使用logger对象的所有handler处理日志:

def handle(self, record):
    c = self
    found = 0
    while c:
        for hdlr in c.handlers:  # 使用所有的handler处理日志
            found = found + 1
            if record.levelno >= hdlr.level:
                hdlr.handle(record)
复制代码

root-logger 的handler是在config中配置的:

def basicConfig(**kwargs):
    ...
    root.addHandler(h)  # 设置root的handler
复制代码

日志记录对象 LogRecord

日志记录对象非常简单:

class LogRecord(object):
    def __init__(self, name, level, pathname, lineno,
                 msg, args, exc_info, func=None, sinfo=None, **kwargs):
        ct = time.time()
        self.name = name  # logger名称
        self.msg = msg  # 日志标识信息
        ...
        self.args = args  # 变量
        self.levelname = getLevelName(level)
        ...

    def getMessage(self):
        msg = str(self.msg)
        if self.args:
            msg = msg % self.args  # 格式化消息
        return msg
复制代码

处理器对象 Hander

顶级Handler定义了Handler的模版方法

class Handler(Filterer):  # 处理器也可以过滤日志
    def __init__(self, level=NOTSET):
        Filterer.__init__(self)
        self._name = None
        self.level = _checkLevel(level)  # handler也有日志级别
        self.formatter = None
        _addHandlerRef(self)
        self.createLock()

    def handle(self, record):  # 处理日志
        rv = self.filter(record)  # 过滤日志
        if rv:
            self.acquire()  # 申请锁
            try:
                self.emit(record)  # 提交记录,由不同子类实现 
            finally:
                self.release()  # 释放锁
        return rv
复制代码

默认的console流 StreamHandler

class StreamHandler(Handler):

    terminator = '\n'  # 自动换行

    def __init__(self, stream=None):
        Handler.__init__(self)
        if stream is None:
            stream = sys.stderr  # 默认使用stderr输出
        self.stream = stream

    def emit(self, record):
        try:
            msg = self.format(record)  # 格式化日志记录
            stream = self.stream
            stream.write(msg + self.terminator)  # 写日志
            self.flush()  # 刷新写缓存
        except Exception:
            ...

    def format(self, record):
        if self.formatter:
            fmt = self.formatter
        else:
            fmt = _defaultFormatter
        return fmt.format(record)  # 使用格式化器格式化日志记录
复制代码

为什么使用stderr,可以看下面的测试中的输出都是到console:

print("haha")
print("fatal error", file=sys.stderr)
sys.stderr.write("fatal error\n")
复制代码

格式器对象 Formatter

格式化器主要使用Formatter和Style实现

class Formatter(object):
    def __init__(self, fmt=None, datefmt=None, style='%', validate=True):
        self._style = _STYLES[style][0](fmt)
        self._fmt = self._style._fmt
        self.datefmt = datefmt

    def format(self, record):
        record.message = record.getMessage()
        s = self.formatMessage(record)
        return s

    def formatMessage(self, record):
        return self._style.format(record)  # 格式化
复制代码

Style类

class PercentStyle(object):

    default_format = '%(message)s'
    asctime_format = '%(asctime)s'
    asctime_search = '%(asctime)'
    validation_pattern = re.compile(r'%\(\w+\)[#0+ -]*(\*|\d+)?(\.(\*|\d+))?[diouxefgcrsa%]', re.I)

    def __init__(self, fmt):
        self._fmt = fmt or self.default_format

    def usesTime(self):
        return self._fmt.find(self.asctime_search) >= 0

    def validate(self):
        """Validate the input format, ensure it matches the correct style"""
        if not self.validation_pattern.search(self._fmt):
            raise ValueError("Invalid format '%s' for '%s' style" % (self._fmt, self.default_format[0]))

    def _format(self, record):
        return self._fmt % record.__dict__  # 格式化日志记录对象

    def format(self, record):
        try:
            return self._format(record)
        except KeyError as e:
            raise ValueError('Formatting field not found in record: %s' % e)
复制代码

滚动日志文件处理器

线上的日志持续输出到一个文件的话,会让文件巨大,即有加剧了丢失的风险,也难以处理。通常有按照大小滚动或者按照日期滚动的方法,这个功能非常重要。先看滚动日志处理器模版:

class BaseRotatingHandler(logging.FileHandler):
    def emit(self, record):
        try:
            if self.shouldRollover(record): # 判断是否需要滚动
                self.doRollover()  # 滚动日志
            logging.FileHandler.emit(self, record)  # 输出日志
        except Exception:
            self.handleError(record)

    def rotate(self, source, dest):
        if not callable(self.rotator):
            if os.path.exists(source):
                os.rename(source, dest)  # 重命名日志文件
        else:
            self.rotator(source, dest)
复制代码

按大小滚动 RotatingFileHandler

按照文件大小滚动的处理器:

class RotatingFileHandler(BaseRotatingHandler):

    def __init__(self, filename, mode='a', maxBytes=0, backupCount=0, encoding=None, delay=False):
        if maxBytes > 0:
            mode = 'a'
        BaseRotatingHandler.__init__(self, filename, mode, encoding, delay)
        self.maxBytes = maxBytes  # 单个文件大小上限
        self.backupCount = backupCount  # 日志备份数量

    def doRollover(self):  # 执行滚动
        if self.stream:
            self.stream.close()  # 关闭当前的流
            self.stream = None
        if self.backupCount > 0:
            for i in range(self.backupCount - 1, 0, -1):
                sfn = self.rotation_filename("%s.%d" % (self.baseFilename, i))
                dfn = self.rotation_filename("%s.%d" % (self.baseFilename,
                                                        i + 1))
                if os.path.exists(sfn):
                    if os.path.exists(dfn):
                        os.remove(dfn)
                    os.rename(sfn, dfn)
            dfn = self.rotation_filename(self.baseFilename + ".1")
            if os.path.exists(dfn):
                os.remove(dfn)
            self.rotate(self.baseFilename, dfn)  # 重命名文件
        if not self.delay:
            self.stream = self._open()  # 如果shouldRollover延迟,可以打开新的流

    def shouldRollover(self, record):  # 判断是否需要滚动
        if self.stream is None:  # 立即打开流
            self.stream = self._open()
        if self.maxBytes > 0:   
            msg = "%s\n" % self.format(record)
            self.stream.seek(0, 2)  #due to non-posix-compliant Windows feature
            if self.stream.tell() + len(msg) >= self.maxBytes:  # 判断大小
                return 1
        return 0
复制代码

文件大小滚动就是在记录日志时候判断文档是否超过上限,超过则重命名旧日志,生成新日志。

按照日期滚动 TimedRotatingFileHandler

按照日期滚动的处理器:

class TimedRotatingFileHandler(BaseRotatingHandler):
    def __init__(self, filename, when='h', interval=1, backupCount=0, encoding=None, delay=False, utc=False, atTime=None):
        BaseRotatingHandler.__init__(self, filename, 'a', encoding, delay)
        self.when = when.upper()
        self.backupCount = backupCount
        self.utc = utc
        self.atTime = atTime
        # 日期设置,支持多种方式
        if self.when == 'S':
            self.interval = 1 # one second
            self.suffix = "%Y-%m-%d_%H-%M-%S"
            self.extMatch = r"^\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2}(\.\w+)?$"
        ...

        self.extMatch = re.compile(self.extMatch, re.ASCII)
        self.interval = self.interval * interval # multiply by units requested
        filename = self.baseFilename
        if os.path.exists(filename):
            t = os.stat(filename)[ST_MTIME]  # 最后修改时间
        else:
            t = int(time.time())
        self.rolloverAt = self.computeRollover(t)  # 提前计算终止时间

    def computeRollover(self, currentTime):
        # 判断的方法还是很长很复杂的,先pass

    def shouldRollover(self, record):
        t = int(time.time())
        if t >= self.rolloverAt:  # 判断是否到期
            return 1
        return 0

    def doRollover(self):
        ...
        dfn = self.rotation_filename(self.baseFilename + "." +
                                     time.strftime(self.suffix, timeTuple))
        #  滚动日志文件
        if os.path.exists(dfn):
            os.remove(dfn)
        self.rotate(self.baseFilename, dfn)
        if self.backupCount > 0:
            for s in self.getFilesToDelete():
                os.remove(s)
        ...
        # 计算下一个时间点
        newRolloverAt = self.computeRollover(currentTime)
        ...
        self.rolloverAt = newRolloverAt
复制代码

日期滚动就是计算最后时间点,超过时间点则重新生成新的日志文件。

小结

logging的处理逻辑大概是这样的:

  • 创建Logger对象,提供API,用来接收应用程序日志
  • Logger对象包括多个Handler
  • 每个Handler有一个Formatter对象
  • 每条日志都会生成一个LogRecord对象
  • 使用不同的Handler对象将LogRecored对象提交到不同的流
  • 每个日志对象通过Formatter格式化输出
  • 可以使用按日期/文件大小的方式进行日志文件的滚动记录

小技巧

覆盖对象的 reduce 方法,让对象支持reduce函数:

class RootLogger(Logger):
    def __init__(self, level):
        Logger.__init__(self, "root", level)

    def __reduce__(self):
        return getLogger, ()
复制代码

线程锁的创建和释放:

_lock = threading.RLock()

def _acquireLock():
    if _lock:
        _lock.acquire()

def _releaseLock():
    if _lock:
        _lock.release()
复制代码

线程锁的使用:

def addHandler(self, hdlr):
    _acquireLock()
    try:
        self.handlers.append(hdlr)
    finally:
        _releaseLock()

def removeHandler(self, hdlr):
    _acquireLock()
    try:
        self.handlers.remove(hdlr)
    finally:
        _releaseLock()