Pytest25--重温面向过程编程之函数模型


面向过程编程之函数模型

import configparser, pymysql, requests, pandas, os
#读入口配置文件函数
def read_entry(): #无参,返回本次测试用的接口服务器和数据库的入口名
    conf=configparser.ConfigParser()
    conf.read('entry.ini')
    which_server=conf.get('entry', 'which_server')
    which_db=conf.get('entry', 'which_db')
    return which_server, which_db
#读接口服务器配置文件函数
def read_server_conf(): #无参,返回http://IP:端口号
    conf=configparser.ConfigParser()
    conf.read('server.conf', encoding='utf-8')
    which_server=read_entry()[0]
    ip=conf.get(which_server, 'ip')
    port=conf.get(which_server, 'port')
    host='http://%s:%s'%(ip,port)
    return host
#读数据库配置文件函数
def read_db_conf(): #无参,返回数据库信息字典
    conf=configparser.ConfigParser()
    conf.read('db.conf')
    which_db=read_entry()[1]
    host=conf.get(which_db, 'host')
    db=conf.get(which_db, 'db')
    user=conf.get(which_db, 'user')
    passwd=conf.get(which_db, 'passwd')
    dbinfo={'host':host,'db':db,'user':user,'passwd':passwd}
    return dbinfo
#修改入口名函数
def update_entry(): #无参,无返回值
    is_update=input('是否修改入口名(y/Y表示是,其他表示否):')
    if is_update in{'y','Y'}:
        new_server=input('新接口服务器入口名:')
        new_db=input('新数据库服务器入口名:')
        if {new_server,new_db}.issubset({'debug','formal','smoke','regress'}):
            old_server,old_db=read_entry()
            if new_server!=old_server and new_db!=old_db:
                conf=configparser.ConfigParser()
                conf.read('entry.ini')
                conf.set('entry', 'which_server', new_server)
                conf.set('entry', 'which_db', new_db)
                file=open('entry.ini', 'w') #w不能省略
                conf.write(file)
                file.close()
                print('成功将入口名(%s,%s)修改为(%s,%s)'%(old_server,old_db,new_server,new_db))
            else:
                print('入口名(%s,%s)未发生改变'%(old_server,old_db))
        else:
            print('入口名错误,只能输入debug、smoke、formal、regress之一')
    else:
        print('取消修改入口名')
#连接数据库函数
def conn_db(): #无参,返回数据库连接对象、游标
    dbinfo=read_db_conf()
    conn=pymysql.connect(**dbinfo)
    cursor=conn.cursor()
    return conn,cursor
#读sql语句文件的函数
def read_sqls(*sqlfiles): #有参(0、1、多个),返回sql语句列表,sqlsfiles=(('login.sql')),sqlsfiles=('login.sql')
    if not sqlfiles: #表示sqlfiles为空
        sqlfiles=[i for i in os.listdir() if i.endswith('.sql')]
    # print(sqlfiles) #调试
    sqls=[] #存sql语句的列表
    for file in sqlfiles: #file为文件名
        data=open(file,'r') #data表示文件中所有行
        for sql in data: #sql是一行
            if sql.strip() and not sql.startswith('--'):
                sqls.append(sql.strip())
    return sqls
#初始化数据库函数
def init_db(*sqlfiles): #有参(用于传给read_sqls),无返回值,sqlfiles=('login.sql')
    conn,cursor=conn_db()
    # sqls=read_sqls(sqlfiles) #不能省略*,read_sqls(('login.sql'))
    sqls=read_sqls(*sqlfiles) #read_sqls('login.sql')
    # print(sqls) #调试
    for sql in sqls:
        cursor.execute(sql)
    conn.commit()
    conn.close()
#验库函数
def check_db(case_info,args,check_sql,db_expect_rows): #用例信息,检查的数据/参数、执行检查的sql语句、数据库预期行数
    conn,cursor=conn_db()
    cursor.execute(check_sql)
    db_actual_rows=cursor.fetchone()[0]
    if db_actual_rows==db_expect_rows:
        print(case_info+'==落库检查通过')
    else:
        print(case_info+'==落库检查失败==检查的数据:%s==预期行数:%s==实际行数:%s'%(args,db_expect_rows,db_actual_rows))
#读含多个参数列的用例的函数(原来的cols_to_dict函数)
def read_cases(xlsfile, prefixs, dict_indexs, columns=None, col_type=None): #把多个列组装成字典
    data=pandas.read_excel(xlsfile, usecols=columns, dtype=col_type, keep_default_na=False)
    if type(prefixs) in(list,tuple) and type(dict_indexs) in(list,tuple):
        prefixs_and_indexs=zip(prefixs,dict_indexs)
    elif type(prefixs)==str and type(dict_indexs)==int:
        prefixs_and_indexs=((prefixs,dict_indexs),) #二维元组
    else:
        print('prefixs的类型只能是列表或元组或字符串,dict_indexs的类型只能是列表或元组或整数')
        return [] #返回空列表
    for prefix, dict_index in prefixs_and_indexs:
        cols=data.filter(regex='^'+prefix, axis=1) #过滤出前缀开头的列
        col_names=cols.columns.values #以前缀prefix开头的列名
        col_names_new=[i[len(prefix):] for i in col_names]#真正的参数名
        col_values=cols.values.tolist() #前缀开头的多行数据列表
        cols=[] #新的存字典的列表
        for value in col_values:
            col_dict=dict(zip(col_names_new, value))
            cols.append(col_dict)
        data.drop(col_names, axis=1, inplace=True)#drop删列存回data
        data.insert(dict_index, prefix, cols) #把cols列表的每个元素作为一行插入到data的dict_index列,列名为prefix
    cases=data.values.tolist()
    return cases
#读带{:}参数的用例的函数
def read_dict_cases(xlsfile,columns=None): #参数:1个excel文件名,columns用于存储列名,处理好{:}的数据为字典,返回用例列表
    data=pandas.read_excel(xlsfile, usecols=columns)
    cases=data.values.tolist()
    for case in cases:
        for i in range(len(case)):
            if str(case[i]).startswith('{') and str(case[i]).endswith('}') and ':' in str(case[i]):
                case[i]=eval(case[i])
    return cases
#发送请求
def send_request(method, url, args): #方法、地址、参数;返回返回值类型、实际结果
    send="requests.%s('%s',%s)"%(method, url, args)
    # print(send) #调试
    res=eval(send)
    # print(res.headers['Content-Type']) #响应/返回值类型
    if 'text' in res.headers['Content-Type']:
        res_type='text' #返回值类型
        actual=res.text #实际结果,类型:text/html; charset=gbk
    elif 'json' in res.headers['Content-Type']:
        res_type='json'
        actual=res.json() #实际结果,类型:application/json;charset=utf8
    else:
        pass
    return res_type, actual
#比对响应结果函数
def check(case_info, res_type, actual, expect):
    passed=False #预置变量,表示测试不通过
    if res_type=='text' and expect in actual:
        passed=True
    elif res_type=='json' and expect==actual:
        passed=True
    else: pass
    if passed:
        print(case_info + '==比对响应结果通过')
    else:
        print(case_info + '==比对响应结果失败==预期:%s==实际:%s' % (expect, actual))
#运行测试的函数
def run_test(sqlfile, xlsfile, prefixs, dict_indexs, is_checkdb, columns=None):
    init_db(sqlfile)
    cases=read_cases(xlsfile, prefixs, dict_indexs, columns)
    for case in cases:
        if not is_checkdb:
            case_id,case_name,api_path,method,args,expect=case
        else:
            case_id,case_name,api_path,method,args, expect,check_sql,db_expect_rows = case
        case_info=case_id+':'+case_name
        url = read_server_conf() + api_path
        res_type, actual = send_request(method, url, args)
        check(case_info, res_type, actual, expect)
        if is_checkdb:
            check_db(case_info, args, check_sql, db_expect_rows)
#测试登录接口的函数
def test_login():
    run_test('login.sql', 'login_xin.xlsx', 'arg_', 4, False)
#测试注册接口的函数
def test_signup():
    run_test('signup.sql','signup_xin.xlsx', ['arg_','expect_'], [4, 5], True)
if __name__=='__main__':
    # print(read_entry())
    # print(read_server_conf())
    # print(read_db_conf())
    # update_entry()
    # conn,cursor=conn_db()
    # print(os.listdir())
    # sqlfiles=[i for i in os.listdir() if i.endswith('.sql')]
    # print(sqlfiles)
    # print(read_sqls())
    # print(read_sqls('login.sql'))
    # print(read_sqls('login.sql','signup.sql'))
    # init_db()
    # init_db('login.sql')
    # init_db('login.sql', 'signup.sql')
    # check_db('用例信息',{'a':2,'b':3}, 'select count(*) from user',4)
    # print(read_cases('login_xin.xlsx','arg_',4,col_type={'arg_password':str}))
    # print(read_cases('signup_xin.xlsx',['arg_','expect_'],[4,5],col_type={'arg_password':str,'arg_confirm':str}))
    # print(send_request('post','http://192.168.150.213/exam/login/',{'username':'admin','password':'123456'}))
    # print(send_request('post', 'http://192.168.150.213/exam/signup/', {'username': 'admin', 'password': '123456','confirm':'123456','name':'管理员'}))
    # check('用例信息', 'text', '登录成功了', '登录成功')
    # check('用例信息', 'text', '登路成功了', '登录成功')
    # check('用例信息', 'json', {'a':1,'b':2}, {'b':2,'a':1})
    # check('用例信息', 'json', {'a':1},{'a':1,'b':2})
    # test_login()
    test_signup()

相关