python+requests+excel+unittest+ddt接口自动化数据驱动并生成html报告


1.环境准备

  • python3.6
  • requests
  • xlrd

2.实现功能

  • 封装requests请求方法
  • 在excel维护测试数据
  • 用unittest+ddt数据驱动模式执行
  • 引入个性化测试报告
  • 实现数据依赖,响应数据的提取,请求数据的更新

3.项目结构

4.文件路径配置

#!/usr/bin/env python
# _*_ coding:utf-8 _*_
import os,sys,requests,json
#去掉文件名,返回目录
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
sys.path.append(BASE_DIR)
# 配置文件,os.path.join(path1[, path2[, ...]])将多个路径组合后返回
TEST_CONFIG =  os.path.join(BASE_DIR,"database","config.ini")
# 测试用例模板文件
SOURCE_FILE = os.path.join(BASE_DIR,"database","DemoAPITestCase.xlsx")
#提取字段文件
TEST_JSON =  os.path.join(BASE_DIR,"database","user.json")
# excel测试用例结果文件
TARGET_FILE = os.path.join(BASE_DIR,"report","excelReport","DemoAPITestCase.xlsx")
# 测试用例报告
TEST_REPORT = os.path.join(BASE_DIR,"report")
# 测试用例程序文件
TEST_CASE = os.path.join(BASE_DIR,"testcase")

5.随机数据的封装

备注:在实际接口测试中部分字段要做去重,比如新增用户信息,身份证字段做了去重,所有每次提交时身份证字段用随机数据代替

#!/user/bin/env python
#-*- coding:utf-8-*-
#获取随机数
import string,random,datetime
class RandomNumber:
    #这是一个随机生成邮箱,身份证号,手机号的类
    def mobile(self):
        """随机生成手机号"""
        mobile_head = ['134', '135', '188', '185', '172', '152']
        mobile_tail=''.join(random.sample(string.digits,8))
        mobile=random.choice(mobile_head)+mobile_tail
        return mobile
    def email(self):
        """随机生成邮箱"""
        h_email=['@qq.com','@163.com','@199.com']
        email_head = str(random.randint(111111, 999999))
        user_email=email_head+random.choice(h_email)
        return user_email
    def idCard(self):
        """随机生成身份证号"""
        icard_head = str(random.randint(111111,999999))
        icard_tail = str(random.randint(11111,99999))
        icard = icard_head+ str(1993020)+icard_tail
        return icard
    def randomNumber(self):
        """随机随机数"""
        randomData=str(random.randint(11111111, 99999999))
        return randomData
    def getRandomDate(self,random_name):
        if random_name=="mobile":
            randoms =self.mobile()
        elif random_name=="email":
            randoms =self.email()
        elif random_name=="idCard":
            randoms=self.idCard()
        else:
            randoms =self.randomNumber()
        return randoms

6.操作dict的数据

备注:通过操作dict数据实现接口返回数据的提取,接口请求头和请求body的更新

from lib.operationJson import operetionJson
from lib.getRandomData import RandomNumber
import re
class openrationDict:
    """
    这是一个解析dict参数的类,
    可用于多参数的指定key,指定key集合解析key,更新指定key的值
    """
    def __init__(self):
        self.oj=operetionJson()
        self.rn=RandomNumber()
    def get_value(self,my_dict,key):
        """
        这是一个递归函数
        :param my_dict: 传入的字典
        :param key: 字典中的key
        :return:返回字典中某个key对应的值
        """
        try:
            if isinstance(my_dict,dict):
                if my_dict.get(key)or my_dict.get(key)==0 or my_dict.get(key)==''\
                        and my_dict.get(key) is False:
                    return my_dict.get(key)
                for my_dict_key in my_dict:
                    if self.get_value(my_dict.get(my_dict_key),key)or\
                        self.get_value(my_dict.get(my_dict_key),key)is False:
                        return self.get_value(my_dict.get(my_dict_key),key)
            if isinstance(my_dict,list):
                for my_dict_arr in my_dict:
                    if self.get_value(my_dict_arr,key)\
                        or self.get_value(my_dict_arr,key)is False:
                        return self.get_value(my_dict_arr,key)
        except Exception as el:
            print(el)
    def get_response_date(self,res,extract_data):
        """
        通过响应数据中的key,获取value并写入json文件
        :param res: 接口返回数据
        :param extract_data: 要提取的数据
        :return:返回一个字典
        """
        if isinstance(extract_data,str):
            extract_data=eval(extract_data)
        for key,extract_content in extract_data .items():
            global write_dict
            key_list=[]
            value_list=[]
            if key=="extractBody":
                for key ,value in extract_content.items():
                    key_list.append(value)
                    value_list.append(self.get_value(res,key))
                    print("获取提取字段:{0},提取值:{1}".format(key, self.get_value(res,key)))
                    write_dict = dict(zip(key_list, value_list))
                    print("写入.json文件成功,写入字段:{0},写入值:{1}".format(value, self.get_value(res, key)))
                self.oj.write_data(write_dict)#写入json文件
    def update_request_data(self,api_dict):
        """

        :param api_dict:原有请求入参
        :return:更新的的入参
        """
        extract_dict = re.findall(r"[$]{(.+?)}", str(api_dict))
        if len(extract_dict) > 0:
            for key in range(len(extract_dict)):
                extract_dict_key = extract_dict[key]
                old = "${" + extract_dict_key + "}"
                if "random" in extract_dict_key:
                    random_id = extract_dict_key.split("_")[1]
                    new = self.rn.getRandomDate(random_id)
                    api_dict = eval(str(api_dict).replace(old, str(new)))
                else:
                    new = self.oj.get_data(extract_dict_key)
                    api_dict = eval(str(api_dict).replace(old, str(new)))
            return api_dict

7.操作json文件

备注:通过操作json数据实现从json数据的读取和写入

#!/user/bin/env python
#-*- coding:utf-8-*-
import json,os,sys
from config import setting
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
class operetionJson:
    def read_data(self):
        self.file_path = setting.TEST_JSON
        try:
            with open(self.file_path, "r", encoding='utf-8') as json_file:
                data = json.load(json_file)
                json_file.close()
                return data
        except Exception as error:
            return"读取json文件异常"
    def get_data(self,id):
        """
        根据关键字获取json文件中数据
        :param id:json文件中对应的key
        :return: json文件中key对应的value
        """

        try:
            self.data = self.read_data()
            return self.data[id]
        except Exception as error:
            return "根据关键字:{}获取json文件中数据value失败,请检查json文件中是否存在key={}的字段".format(id,id)
    def write_data(self,data):
        """
        将字典写入json文件中
        :param data:
        :return:
        """
        try:
            json_data=self.read_data()
            json_data.update(data)
            with open(self.file_path,'w') as fp:
                fp.write(json.dumps(json_data))
        except Exception as  error:
            return "文件写入失败,报错信息为:{}".format(error)

8.操作excel文件

备注:通过操作excel文件,获取测试数据

#!/usr/bin/env python
# _*_ coding:utf-8 _*_
import xlrd,json,re
from config import setting
from lib.getRandomData import RandomNumber
class ReadExcel():
    """读取excel文件数据"""
    def __init__(self,fileName=None, sheet_id=None):
        self.rn=RandomNumber()
        if fileName:
            self.fileName=fileName
            self.sheet_id=sheet_id
        else:
            sheet_id=0
            fileName=setting.SOURCE_FILE
        self.data = xlrd.open_workbook(fileName)
        self.table = self.data.sheet_by_index(sheet_id)
        # 获取总行数、总列数
        self.nrows = self.table.nrows
        self.ncols = self.table.ncols
    def read_data(self):
        if self.nrows > 1:
            # 获取第一行的内容,列表格式
            keys = self.table.row_values(0)
            listApiData = []
            # 获取每一行的内容,列表格式
            for col in range(1, self.nrows):
                values = self.table.row_values(col)
                # keys,values组合转换为字典
                api_dict = dict(zip(keys, values))
                # print(api_dict)
                if api_dict['is_run']=="yes":
                    listApiData.append(api_dict)
                    # print(api_dict)
            return listApiData
        else:
            print("表格是空数据!")
            return None

9.封装requests方法

#!/usr/bin/env python
# _*_ coding:utf-8 _*_
import os,sys,json,requests
sys.path.append(os.path.dirname(os.path.dirname(__file__)))
class RunMethod:
    def __init__(self):
        # 实例化一个叫做session类,让session发送get,或者post等请求
        self.send = requests.session()
    def PostMethod(self,url,body,header):
        res = self.send.post(url=url, json=body, headers=header,verify=False)
        return res
    def GetMethod(self,url,body,header):
        res = self.send.get(url=url, params=body, headers=header,verify=False)
        return res
    def PutMethod(self,url,body,header):
        res = requests.put(url=url, data=body, headers=header,verify=False)
        return res
    def DeleteMethod(self,url,body,header):
        res = requests.delete(url=url, params=body,headers=header, verify=False)
        return res
    def run_main(self,method,url,body,headers):
        try:
            if method=="post":
                res= self.PostMethod(url,body,headers)
            elif method=="get":
                res=self.GetMethod(url,body,headers)
            elif method=="put":
                res=self.PutMethod(url,body,headers)
            else:
                res=self.DeleteMethod(url,body,headers)
            return res
        except Exception as  error:
            return error

10.测试数据的维护

11.unittest+ddt编写测试用例

 1 #!/usr/bin/env python
 2 # _*_ coding:utf-8 _*_
 3 import re,sys,warnings,urllib3,os
 4 sys.path.append(os.path.dirname(os.path.dirname(__file__)))
 5 import unittest,requests,ddt,json
 6 from lib.readExcel import ReadExcel
 7 from lib.run_method import RunMethod
 8 from lib.operationDict import openrationDict
 9 from lib.operationJson import operetionJson
10 testData = ReadExcel().read_data()
11 @ddt.ddt
12 class Demo_API(unittest.TestCase):
13     def setUp(self):
14         self.op_dict=openrationDict()
15         self.op_json=operetionJson()
16     @ddt.data(*testData)
17     def test_api(self,data):
18         #通过warnings库来忽略掉相关告警,去掉提示信息ResourceWarning...
19         warnings.simplefilter("ignore", ResourceWarning)
20         if re.findall(r"[$]{(.+?)}", str(data)):#判断请求数据中有要更新的数据
21             data=self.op_dict.update_request_data(data)
22         self.url=data['url']
23         self.case_name=data['CaseName']
24         self.method=data['method']
25         self.header=data["header"]
26         self.body=eval(data['body'])
27         self.extract_data=data['getResponseData']#提取字段
28         self.readData_code =data["code"]  # 获取excel表格数据的状态码
29         if self.header=="":
30             self.header=None
31         else:
32             self.header=eval(self.header)
33         requests.packages.urllib3.disable_warnings()#解决Python3控制台输出InsecureRequestWarning的问题
34         print("******* 正在执行用例 ->{0} *********".format(self.case_name))
35         print("请求方式: {0},请求URL: {1}".format(self.method, self.url))
36         print("请求数据类型为:{0} 请求header为:{1}".format(type(self.header), self.header))
37         print("请求数据类型为:{0} 请求body为:{1}".format(type(self.body), self.body))
38         self.re = RunMethod().run_main(self.method,self.url,self.body,self.header)# 发送请求
39         if self.re.status_code == 200:#网页正常访问,要执行的操作
40             print("页面返回信息:%s" % json.dumps(self.re.json(), ensure_ascii=False, indent=4))
41             self.assertIn(self.readData_code,self.re.text,"返回实际结果是->:%s" % self.re.json())
42             if self.extract_data:
43                 self.op_dict.get_response_date(self.re,self.extract_data)
44         else:
45              self.assertEqual(self.re.status_code, 200,"返回状态码status_code:{}".format(str(self.re.status_code)))
46 if __name__=='__main__':
47     unittest.main()

12.run执行层

 1 import unittest,sys
 2 from config import setting
 3 sys.path.append(setting.BASE_DIR)#将模块的路径通过sys.path.append(路径)添加到主程序中
 4 from BeautifulReport import BeautifulReport
 5 if __name__ == '__main__':
 6     """
 7     通过该类defaultTestLoader下面的discover()方法
 8     可自动更具测试目录start_dir匹配查找测试用例文件(test*.py),
 9     并将查找到的测试用例组装到测试套件
10     """
11     test_suite = unittest.defaultTestLoader.discover('testcase', pattern='test*.py')
12     result = BeautifulReport(test_suite)
13     result.report(filename='测试报告', description='接口自动化测试报告', log_path='report')

13.查看测试报告