数据核对小工具-升级优化版
背景介绍:
前面有两篇随笔,介绍了比较两个Excel中指定列异同的工具及实现代码
最近工作较以往不同
以往比较的是客户的Excel 与 自研系统导出的Excel 中指定列的异同,两个Excel的格式是不同的
现在我需要比较的是客户核对过的生产环境的数据 与 测试环境用变更后的全新逻辑新跑出的数据 进行比对
两个Excel格式相同
看似问题简化了
但是这次我需要比较的不止是一列,而是多列
于是对以前的工具进行了升级
顺道进行了封装
使其可扩展性更强
先直接贴一下代码,备注还没来得及写(有问题可留言,我有时间时会回复的)
目前需要核对的列我先只写了两列,大家可以继续扩展。
import xlrd import xlwt import xlutils import time from datetime import datetime from xlrd import xldate_as_tuple from xlutils import copy class Solution: def __init__(self, ori_path, tar_path, dict_res = {}, success = 0, fail = 0, other = 0): self.ori_path = ori_path self.tar_path = tar_path self.dict_res = {} self.success = 0 self.fail = 0 self.other = 0 def readExcel(self, index_ori, index_tar): wb_ori = xlrd.open_workbook(self.ori_path) wb_tar = xlrd.open_workbook(self.tar_path) sheet_ori = wb_ori.sheet_by_index(index_ori) sheet_tar = wb_tar.sheet_by_index(index_tar) return sheet_ori,sheet_tar def getDict(self, sheet, check_col1_ori, check_col2_ori): dict_check = {} for row_ori in range(1,sheet.nrows): proId_ori = sheet.cell_value(row_ori, 1) proCode_ori = sheet.cell_value(row_ori, 5) sdate_ori= sheet.cell_value(row_ori, 8) edate_ori = sheet.cell_value(row_ori,9) key_ori_list = [proId_ori, proCode_ori, sdate_ori, edate_ori] key = '--'.join(key_ori_list) chkcol1_ori = str(sheet.cell_value(row_ori,check_col1_ori)) chkcol2_ori = sheet.cell_value(row_ori,check_col2_ori) val_ori_list = [chkcol1_ori,chkcol2_ori] # print(val_ori_list) val_ori = '--'.join(val_ori_list) dict_check[key] = val_ori return dict_check def check(self, dict_ori, dict_tar): try: for key in dict_ori.keys(): if key in dict_tar.keys(): print(dict_ori.get(key)) print(dict_tar.get(key)) if dict_ori.get(key) == dict_tar.get(key): self.success += 1 res_str = '核对无误' self.dict_res[key] = res_str else: self.fail += 1 # diff = dict_ori.get(key) - dict_tar.get(key) # # print(diff) # dict_ori[key] = diff value_ori = dict_ori.get(key) thisIncome_ori = float(value_ori.split('--')[0]) # print('*'*100) # print(type(thisIncome_ori)) businessType_ori = value_ori.split('--')[1] value_tar = dict_tar.get(key) thisIncome_tar = float(value_tar.split('--')[0]) # print('*'*100) # print(type(thisIncome_tar)) businessType_tar = value_tar.split('--')[1] # diff_thisIncome = str(round((thisIncome_ori - thisIncome_tar), 2)) # diff_businessType = businessType_ori + 'VS' +businessType_tar # diff_list = [diff_thisIncome, diff_businessType] # diff = '--'.join(diff_list) # # print(diff) if thisIncome_ori != thisIncome_tar and businessType_ori == businessType_tar: diff_thisIncome = str(round((thisIncome_ori - thisIncome_tar), 2)) diff_businessType = '业务类型核对一致' elif thisIncome_ori == thisIncome_tar and businessType_ori != businessType_tar: diff_thisIncome = '本期收益核对一致' diff_businessType = '业务类型不一致:' + businessType_ori + 'VS' +businessType_tar else: diff_thisIncome = str(round((thisIncome_ori - thisIncome_tar), 2)) diff_businessType = '业务类型不一致:' + businessType_ori + 'VS' +businessType_tar diff_list = [diff_thisIncome, diff_businessType] diff = '--'.join(diff_list) self.dict_res[key] = diff else: self.other += 1 res_str = '待核对文件数据缺失,请检查' self.dict_res[key] = res_str self.num = self.success + self.fail + self.other now = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime()) print('{}完成核对! 共核对:{}条, 其中核对无误:{}, 有差异:{}, 其他数据异常:{}'.format(now, self.num, self.success, self.fail, self.other)) return self.dict_res except Exception as error: print(str(error)) def writeExcel(self, dict_res, wb_res_path): wb_res = xlwt.Workbook() sheet_res = wb_res.add_sheet('result') title_list = ['序号','金融产品编码','产品名称','本期起息日','本期结息日','本期收益_核对结果','业务类型_核对结果'] for i in range(0,len(title_list)): sheet_res.write(0,i,title_list[i]) # print(dict_res) if dict_res: row_res = 1 for key, value in dict_res.items(): product_id = key.split('--')[0] product_code = key.split('--')[1] value_start_date = key.split('--')[2] value_end_date = key.split('--')[3] sheet_res.write(row_res, 0, row_res) sheet_res.write(row_res, 1, product_id) sheet_res.write(row_res, 2, product_code) sheet_res.write(row_res, 3, value_start_date) sheet_res.write(row_res, 4, value_end_date) check_res = value if check_res == '核对无误' or check_res == '待核对文件数据缺失,请检查': sheet_res.write(row_res, 5, check_res) sheet_res.write(row_res, 6, check_res) else: diff_thisIncome = value.split('--')[0] diff_businessType = value.split('--')[1] sheet_res.write(row_res, 5 ,diff_thisIncome) sheet_res.write(row_res, 6, diff_businessType) row_res +=1 else: return '待写入数据异常' # wwb_res.save('result_this_income_20200713.xlsx') wb_res.save(wb_res_path) if __name__ == '__main__': ori_path = 'ori20200713001.xlsx' tar_path = 'tar20200713002.xlsx' case = Solution(ori_path,tar_path) sheet_ori, sheet_tar = case.readExcel(0,0) print(sheet_ori) print(sheet_tar) # for i in range(0,sheet_ori.nrows): # print (sheet_ori.row_values(i)) # for j in range(0,sheet_tar.nrows): # print (sheet_tar.row_values(j)) dict_ori = case.getDict(sheet_ori, 12, 2) print(dict_ori) dict_tar = case.getDict(sheet_tar, 12, 2) print(dict_tar) res5 = case.check(dict_ori, dict_tar) print(res5) wb_res_path1= 'result_thisIncome&businessType_20200714.xlsx' case.writeExcel(res5,wb_res_path1)
demo跑出来的数据如下:
希望真正跑8000+数据的时候 不会出现太多问题。