Python异常检测以及报警实现


# -*- encoding: utf-8 -*- ''' @File    :   launcher.py @Time    :   2021/11/16 17:55:00 @Author  :   peng.wu @Version :   1.0 @Desc    :   3sigma异常值检测 '''
# here put the import lib import numpy as np import pandas as pd from matplotlib import pyplot as plt import seaborn as sns import requests from DingDingBot import DDBOT
def load_data(cols:list, filter: str):     """ 加载数据 """     df = pd.read_csv("data\detail.csv", encoding="utf-8")     p_col = ["stat_day", "stat_hours", "app_channel", "pv", "ids_uv", "users_uv"]     df.columns = p_col     df = df[df["app_channel"] == filter]     return df[cols]
def sigma_stat(N: int):     """ N is k sigma """
    # 预处理数据     cols = ["stat_day", "stat_hours", "pv"]     init_data = load_data(cols, "灵锡")     init_data["stat_hours"] = init_data["stat_hours"].map(str)     init_data["date"] = init_data["stat_day"].str.cat(init_data["stat_hours"], sep=" ")         # 模型初始化     data_y = init_data["pv"]     data_x = init_data["date"]
    ymean = np.mean(data_y)     ystd = np.std(data_y)     threshold1 = ymean - N * ystd     threshold2 = ymean + N * ystd     print("pv平均值:{0}, 标准差:{1}, sigma-min:{2}, sigma-max:{3}".format(ymean, ystd, threshold1, threshold2))
    # 将异常值保存     outlier = []     outlier_x = []     for i in range(0, len(data_y)):         if (data_y.iloc[i] < threshold1 or data_y.iloc[i] > threshold2):             outlier.append(data_y.iloc[i])             outlier_x.append(data_x.iloc[i])         else:             continue
        result = zip(outlier_x, outlier)     print("\n异常数据如下:\n")     for item in result:         print(item)
    return data_y
def quantile_stat(data_x, data_y):     """ 分位数异常检测 """
    # sns.boxplot(data_y) 箱线图     # 计算上四分位数和下四分位数     q1 = data_y.quantile(0.25)     q3 = data_y.quantile(0.75)
    low_outlier = q1 - 1.5 * (q3 - q1)     high_outlier = q3 + 1.5 * (q3 - q1)
    # 检测异常点     q_outlier = []     q_outlier_x = []     for i in range(0, len(data_y)):         if (data_y.iloc[i] < lower_outlier or data_y.iloc[i] > high_outlier):             q_outlier.append(data_y.iloc[i])             q_outlier_x.append(data_x.iloc[i])         else:             continue
def send_msg():     """ 钉钉异常检测报警 """     pass    
if __name__ == '__main__':     pass

相关