数据核对小工具-升级优化版


背景介绍:

前面有两篇随笔,介绍了比较两个Excel中指定列异同的工具及实现代码

最近工作较以往不同

以往比较的是客户的Excel 与 自研系统导出的Excel 中指定列的异同,两个Excel的格式是不同的

现在我需要比较的是客户核对过的生产环境的数据  与 测试环境用变更后的全新逻辑新跑出的数据  进行比对

两个Excel格式相同

看似问题简化了

但是这次我需要比较的不止是一列,而是多列

于是对以前的工具进行了升级

顺道进行了封装

使其可扩展性更强

先直接贴一下代码,备注还没来得及写(有问题可留言,我有时间时会回复的)

目前需要核对的列我先只写了两列,大家可以继续扩展。

import xlrd
import xlwt
import xlutils
import time
from datetime import datetime
from xlrd import xldate_as_tuple
from xlutils import copy


class Solution:

    def __init__(self, ori_path, tar_path, dict_res = {}, success = 0, fail = 0, other = 0):

        self.ori_path = ori_path
        self.tar_path = tar_path
        self.dict_res = {}
        self.success = 0 
        self.fail = 0
        self.other = 0

    def readExcel(self, index_ori, index_tar):

        wb_ori = xlrd.open_workbook(self.ori_path)
        wb_tar = xlrd.open_workbook(self.tar_path)

        sheet_ori = wb_ori.sheet_by_index(index_ori)
        sheet_tar = wb_tar.sheet_by_index(index_tar)

        return sheet_ori,sheet_tar

    def getDict(self, sheet, check_col1_ori, check_col2_ori):
        dict_check = {}

        for row_ori in range(1,sheet.nrows):
            proId_ori = sheet.cell_value(row_ori, 1)
            proCode_ori = sheet.cell_value(row_ori, 5)
            sdate_ori= sheet.cell_value(row_ori, 8)
            edate_ori = sheet.cell_value(row_ori,9)
            key_ori_list = [proId_ori, proCode_ori, sdate_ori, edate_ori]
            key = '--'.join(key_ori_list)

            chkcol1_ori = str(sheet.cell_value(row_ori,check_col1_ori))
            chkcol2_ori = sheet.cell_value(row_ori,check_col2_ori)
            val_ori_list = [chkcol1_ori,chkcol2_ori]
            # print(val_ori_list)
            val_ori = '--'.join(val_ori_list)

            dict_check[key] = val_ori

        return dict_check

    def check(self, dict_ori, dict_tar):
        try:
            for key in dict_ori.keys():
                if key in dict_tar.keys():
                    print(dict_ori.get(key))
                    print(dict_tar.get(key))
                    if dict_ori.get(key) == dict_tar.get(key):
                        self.success += 1
                        res_str = '核对无误'
                        self.dict_res[key] = res_str
                    else:
                        self.fail += 1
                        # diff = dict_ori.get(key) - dict_tar.get(key)
                        # # print(diff)
                        # dict_ori[key] = diff
                        value_ori = dict_ori.get(key)
                        thisIncome_ori = float(value_ori.split('--')[0])
                        # print('*'*100)
                        # print(type(thisIncome_ori))
                        businessType_ori = value_ori.split('--')[1]
                        
                        value_tar = dict_tar.get(key)
                        thisIncome_tar = float(value_tar.split('--')[0])
                        # print('*'*100)
                        # print(type(thisIncome_tar))
                        businessType_tar = value_tar.split('--')[1]
                        
                        # diff_thisIncome = str(round((thisIncome_ori - thisIncome_tar), 2))
                        # diff_businessType = businessType_ori + 'VS' +businessType_tar
                        # diff_list = [diff_thisIncome, diff_businessType]
                        # diff = '--'.join(diff_list)
                        # # print(diff)
                        if thisIncome_ori != thisIncome_tar and businessType_ori == businessType_tar:
                            diff_thisIncome = str(round((thisIncome_ori - thisIncome_tar), 2))
                            diff_businessType = '业务类型核对一致'
                        elif thisIncome_ori == thisIncome_tar and businessType_ori != businessType_tar:
                            diff_thisIncome = '本期收益核对一致'
                            diff_businessType = '业务类型不一致:' + businessType_ori + 'VS' +businessType_tar
                        else:
                            diff_thisIncome = str(round((thisIncome_ori - thisIncome_tar), 2))
                            diff_businessType = '业务类型不一致:' + businessType_ori + 'VS' +businessType_tar

                        diff_list = [diff_thisIncome, diff_businessType]
                        diff = '--'.join(diff_list)

                        self.dict_res[key] = diff
                else:
                    self.other += 1
                    res_str = '待核对文件数据缺失,请检查'
                    self.dict_res[key] = res_str

            self.num = self.success + self.fail + self.other
            now = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
            print('{}完成核对! 共核对:{}条, 其中核对无误:{}, 有差异:{}, 其他数据异常:{}'.format(now, self.num, self.success, self.fail, self.other))

            return self.dict_res
        except Exception as error:
            print(str(error))
        

    def writeExcel(self, dict_res, wb_res_path):
        wb_res = xlwt.Workbook()
        sheet_res = wb_res.add_sheet('result')
        title_list = ['序号','金融产品编码','产品名称','本期起息日','本期结息日','本期收益_核对结果','业务类型_核对结果']
        for i in range(0,len(title_list)):
            sheet_res.write(0,i,title_list[i])


        # print(dict_res)
        if dict_res:
            row_res = 1
            for key, value in dict_res.items():

                product_id = key.split('--')[0]
                product_code = key.split('--')[1]
                value_start_date = key.split('--')[2]
                value_end_date = key.split('--')[3]
                sheet_res.write(row_res, 0, row_res)
                sheet_res.write(row_res, 1, product_id)
                sheet_res.write(row_res, 2, product_code)
                sheet_res.write(row_res, 3, value_start_date)
                sheet_res.write(row_res, 4, value_end_date)

                check_res = value
                if check_res == '核对无误' or check_res == '待核对文件数据缺失,请检查':
                    sheet_res.write(row_res, 5, check_res)
                    sheet_res.write(row_res, 6, check_res)
                else:
                    diff_thisIncome = value.split('--')[0]
                    diff_businessType = value.split('--')[1]
                    sheet_res.write(row_res, 5 ,diff_thisIncome)
                    sheet_res.write(row_res, 6, diff_businessType)

                row_res +=1
        else:
            return '待写入数据异常'
        # wwb_res.save('result_this_income_20200713.xlsx')
        wb_res.save(wb_res_path)



if __name__ == '__main__':
    ori_path = 'ori20200713001.xlsx'
    tar_path = 'tar20200713002.xlsx'
    case = Solution(ori_path,tar_path)
    sheet_ori, sheet_tar = case.readExcel(0,0)
    print(sheet_ori)
    print(sheet_tar)
    # for i in range(0,sheet_ori.nrows):
    #     print (sheet_ori.row_values(i))
    # for j in range(0,sheet_tar.nrows):
    #     print (sheet_tar.row_values(j))
    dict_ori = case.getDict(sheet_ori, 12, 2)
    print(dict_ori)
    dict_tar = case.getDict(sheet_tar, 12, 2)
    print(dict_tar)
    res5 = case.check(dict_ori, dict_tar)
    print(res5)
    wb_res_path1= 'result_thisIncome&businessType_20200714.xlsx'
    case.writeExcel(res5,wb_res_path1)

demo跑出来的数据如下:

希望真正跑8000+数据的时候 不会出现太多问题。