python筛选关键字---error


初始版本

过滤错误日志,将错误日志写入过滤日志文件中;

import os
import json
import logging
import re

name = '日志文件名'
path = '日志路径'
# 拼接要过滤日志文件的路径
file_path = os.path.join(path, name)
# 拼接要输出日志文件的路径
LogPath = os.path.join(path, "输出日志名")
# 日志输出等级(DEBUG,INFO,ERROR...)如果需要修改日志格式,加format='%(message)s'就可以改成没有info:root
logging.basicConfig(filename=LogPath, level=logging.DEBUG) 
# 清空日志文件内容
file = open(LogPath, 'w').close()

# 输入json格式自定义类型
class Employee(object):
    def __init__(self, SYS_NO, O_TIME, ACTION, LOG_TYPE, RESULT, SPENT_TIME, ERROR_MSG):
        self.SYS_NO = SYS_NO
        self.O_TIME = O_TIME
        self.ACTION = ACTION
        self.LOG_TYPE = LOG_TYPE
        self.RESULT = RESULT
        self.SPENT_TIME = SPENT_TIME
        self.ERROR_MSG = ERROR_MSG

    def obj_json(self, obj_instance):
        return {
            'SYS_NO': obj_instance.SYS_NO,
            'O_TIME': obj_instance.O_TIME,
            'ACTION': obj_instance.ACTION,
            'LOG_TYPE': obj_instance.LOG_TYPE,
            'RESULT': obj_instance.RESULT,
            'SPENT_TIME': obj_instance.SPENT_TIME,
            'ERROR_MSG': obj_instance.ERROR_MSG,
        }

# 读取日志文件,然后写入err_log文件
def read_log(url, keyword):
    with open(file_path, 'r+', encoding='utf-8') as f:
        count = 0
        for line in f:   # 如果需要读取行号数字,改成 for (num,line) in enumerate(f):  这里就将行号获取到:num,需要展示的话,在自定义json中加入NUM对应的字段
            if keyword in line:    
                # 输出日志切割,多符号切分
                test1 = re.split(',| ', line)
                time = test1[0] + ' ' + test1[1] + ':' + test1[2]
                sy_no = None
                act = None
                log_type = test1[3]
                res = test1[4]
                spent_time = None
                # error_msg = test1[5] + '' +test1[6]+ '' +test1[7]+ '' +test1[8]
                # 判断字段长度,防止报错,也可以try...except...
                if len(test1) <= 8:
                    error_msg = os.path.join(test1[5], test1[6], test1[7])
                else:
                    error_msg = os.path.join(test1[5], test1[6], test1[7], test1[8])
                # 调用函数,传入对应字段
                emp = Employee(sy_no, time, act, log_type, res, spent_time, error_msg)
                # 序列化转换为json格式
                data = json.dumps(emp, default=emp.obj_json, ensure_ascii=False)
                print(data)
                # 日志输出
                logging.info(data)

            count += line.count(keyword)
    return count


sum = 0
num = read_log(file_path, 'ERROR')
sum += num
print('关键字总个数: ' + str(sum))
 

updata 版本

新增了对文件实时监测筛选错误日志打印;

新增对上报接口http请求调用;

import os
import json
import logging
import re
import requests
import time

name = '日志文件名'
path = '日志路径'
file_path = os.path.join(path, name)
LogPath = os.path.join(path, "输出日志名")
logging.basicConfig(filename=LogPath, level=logging.DEBUG,format='%(message)s')
# 向http发送请求地址
url_post = 'http请求接口地址'

file = open(LogPath, 'w').close()

last_data_err_time = ''

# json格式自定义类型
class Employee(object):
    def __init__(self, SYS_NO, O_TIME, ACTION, LOG_TYPE, RESULT, SPENT_TIME, ERROR_MSG):
        self.SYS_NO = SYS_NO
        self.O_TIME = O_TIME
        self.ACTION = ACTION
        self.LOG_TYPE = LOG_TYPE
        self.RESULT = RESULT
        self.SPENT_TIME = SPENT_TIME
        self.ERROR_MSG = ERROR_MSG

    def obj_json(self, obj_instance):
        return {
            'SYS_NO': obj_instance.SYS_NO,
            'O_TIME': obj_instance.O_TIME,
            'ACTION': obj_instance.ACTION,
            'LOG_TYPE': obj_instance.LOG_TYPE,
            'RESULT': obj_instance.RESULT,
            'SPENT_TIME': obj_instance.SPENT_TIME,
            'ERROR_MSG': obj_instance.ERROR_MSG,
        }


# 读取日志文件,然后写入log文件
def read_log(url, keyword):
    with open(file_path, 'r+', encoding='utf-8') as f:
        count = 0
        for line in f:
            if keyword in line:
                # 输出日志切割
                test1 = re.split(',| ', line)
                time = test1[0] + ' ' + test1[1] + ':' + test1[2]
                sy_no = None
                act = None
                log_type = test1[3]
                res = test1[4]
                spent_time = None
                # error_msg = test1[5] + '' +test1[6]+ '' +test1[7]+ '' +test1[8]
                # 判断字段长度,防止报错,也可以try...except...
                if len(test1) <= 8:
                    error_msg = os.path.join(test1[5], test1[6], test1[7])
                elif len(test1) <= 9:
                    error_msg = os.path.join(test1[5], test1[6], test1[7], test1[8])
                elif len(test1) <= 10:
                    error_msg = os.path.join(test1[5], test1[6], test1[7], test1[8],test1[9])
                else:
                    error_msg = os.path.join(test1[5], test1[6], test1[7], test1[8],test1[9],test1[10])
                # 调用函数,传入对应字段
                emp = Employee(sy_no, time, act, log_type, res, spent_time, error_msg)
                # 序列化转换为json格式
                data = json.dumps(emp, default=emp.obj_json, ensure_ascii=False)
                # print(data)
                # res_data = requests.post(url_post, data=json.dumps(data),timeout=2)
                # print(res_data)
                # 日志输出
                logging.info(data)

            count += line.count(keyword)
            # print(last_data_err_time)
    return count

# 获取错误日志中最后一行数据
def last_data(file_path,LogPath):
    res_err_data = read_log(file_path, 'ERROR')
    with open(LogPath,'r+') as f_out:
        out_lines = f_out.readlines()
        last_err = out_lines[-1]

        # 判断当前日志是否存在,如果存在则不报送
        global last_data_err_time
        if len(last_data_err_time) == 0:
            last_data_err_time = json.loads(last_err)["O_TIME"]
            return last_err
        elif len(last_data_err_time) != 0:
            if last_data_err_time != json.loads(last_err)["O_TIME"]:
                last_data_err_time = json.loads(last_err)["O_TIME"]
                res_data = requests.post(url_post, data=json.dumps(last_err),timeout=2)
                return last_err
            else:
                return '暂无新报错'

while 1:
    time.sleep(600)
    res = last_data(file_path,LogPath)
    print(res)