测试开发,Python + Flask 实现接口接收 Disk 信息
今天分享的内容基于上一篇《测试开发:Python + Flask 实现接口接收内存信息》,进一步介绍如何使用 Python + Flask 实现接收 Disk 的信息。
原理:
通过 Python 调用 Shell 命令获取 Disk 的相关信息,对输出结果进行处理后,再使用 Requests 库向后端定义好的接口推送数据。
Part1:收集端
# Collector: samples disk usage of the "home" mount via `df -h` once an hour
# and pushes the figures, together with a short host name, to the receiver.
import os
import requests
import json
import time

url = "http://10.8.31.61:5555/GetDiskResource"
mem_data = {}
mem_cmd = [
    "df -h |grep home |awk -F' ' '{print $2}'",
    "df -h |grep home |awk -F' ' '{print $3}'",
    "df -h |grep home |awk -F' ' '{print $4}'"
]

# Shell pipeline that yields the host-name fragment sent as "hostname".
_HOSTNAME_CMD = "hostname |awk -F'.' '{print $1}' |awk -F'-' '{print $2}'"

# Maps the awk column marker found in a command to the payload key it fills.
_FIELD_BY_MARKER = {"$2": "total", "$3": "used", "$4": "available"}

# Seconds to wait for the receiver before giving up on one push.
_POST_TIMEOUT = 10


def _run(cmd):
    """Run *cmd* through the shell and return its output without newlines."""
    # The context manager closes the pipe even if read() raises.
    with os.popen(cmd) as pipe:
        return pipe.read().replace("\n", "")


def exec_cmd():
    """Refresh ``mem_data`` with total/used/available sizes and the hostname.

    Every command in ``mem_cmd`` is executed; its output is stored under the
    key selected by the awk column marker it contains. The hostname is
    collected unconditionally afterwards: the previous implementation only
    set it in an ``else`` branch that no command in ``mem_cmd`` could reach,
    so the payload never carried a ``hostname`` key.
    """
    for cmd in mem_cmd:
        print(cmd)
        output = _run(cmd)
        for marker, field in _FIELD_BY_MARKER.items():
            if marker in cmd:
                mem_data[field] = output
                break
    mem_data['hostname'] = _run(_HOSTNAME_CMD)


def httpPost(datas):
    """POST *datas* as JSON to ``url`` and print the response body.

    Raises:
        requests.exceptions.RequestException: if the request fails or does
            not complete within ``_POST_TIMEOUT`` seconds.
    """
    header = {"Content-Type": "application/json"}
    resp_content = requests.post(
        url=url,
        data=json.dumps(datas),
        headers=header,
        timeout=_POST_TIMEOUT,
    )
    print(resp_content.text)


if __name__ == '__main__':
    while True:
        exec_cmd()
        try:
            httpPost(mem_data)
        except requests.exceptions.RequestException as exc:
            # A receiver outage must not stop the collector; retry next cycle.
            print("push failed: " + str(exc))
        time.sleep(3600)
Part2:接收端
# Disk route handling -------------------------------------------------------
@resource.route('/GetDiskResource', methods=['POST'])
def GetDiskResource():
    """Store one disk-usage report posted by a Linux collector.

    Expects a JSON object with string fields ``hostname``, ``total``,
    ``used`` and ``available`` and inserts them into ``disk_info`` together
    with the server's local time.

    The values are interpolated into the SQL text because only the
    single-string form of ``execute_sql`` is visible here, so each value is
    first checked against a conservative character whitelist (letters,
    digits and ``-_.,``). Anything else is rejected instead of reaching the
    database.

    Returns:
        On success, the JSON string ``{'code': 200, 'message': 'success',
        'status': '10000'}``. On a malformed payload, a ``(body, 400)``
        tuple whose body is a JSON string with ``code`` 400.
    """
    query = request.get_json(silent=True)
    if not isinstance(query, dict):
        return json.dumps({'code': 400, 'message': 'invalid JSON body'}), 400

    values = []
    for name in ("hostname", "total", "used", "available"):
        value = query.get(name)
        is_safe = isinstance(value, str) and all(
            ch.isalnum() or ch in "-_.," for ch in value
        )
        if not is_safe:
            return json.dumps(
                {'code': 400, 'message': 'invalid field: ' + name}
            ), 400
        values.append(value)

    createtime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    values.append(createtime)

    sql = (
        "insert into disk_info (hostname,total,used,available,create_time) VALUES ("
        + ",".join("'" + item + "'" for item in values)
        + ")"
    )
    print(sql)
    db = conndb()
    db.execute_sql(sql)
    data = {'code': 200, 'message': 'success', 'status': '10000'}
    return json.dumps(data)
Part3:展示端
这部分主要分为以下两块内容:
第一块是页面请求
<!-- Disk information page: searchable, paginated table of disk usage reports. -->
<template>
  <div>
    <div class="crumbs">
      <el-breadcrumb separator="/">
        <el-breadcrumb-item>
          <i class="el-icon-lx-cascades"></i> 磁盘信息
        </el-breadcrumb-item>
      </el-breadcrumb>
    </div>
    <div class="container">
      <div class="handle-box">
        <el-input v-model="query.hostname" placeholder="环境" class="handle-input mr10" clearable @clear="clear_name"></el-input>
        <el-button type="primary" icon="el-icon-search" @click="handleSearch">搜索</el-button>
      </div>
      <el-table :data="tableData" border class="table" ref="multipleTable" header-cell-class-name="table-header">
        <el-table-column prop="id" label="ID" width="55" align="center"></el-table-column>
        <el-table-column prop="hostname" label="环境"></el-table-column>
        <el-table-column prop="total" label="总共"></el-table-column>
        <el-table-column prop="used" label="使用掉"></el-table-column>
        <el-table-column prop="available" label="可用">
          <template #default="scope">
            <!-- availableplus already returns the tag type ('success' / 'danger') -->
            <el-tag :type="availableplus(scope.row.available)">{{ scope.row.available }}</el-tag>
          </template>
        </el-table-column>
        <el-table-column prop="create_time" width="160" label="创建时间"></el-table-column>
      </el-table>
      <el-pagination
        @size-change="handleSizeChange"
        @current-change="handleCurrentChange"
        :current-page="query.pageIndex"
        :page-sizes="[5, 10, 20, 30]"
        :page-size="query.pageSize"
        layout="total, sizes, prev, pager, next, jumper"
        :total="parseInt(pageTotal)">
      </el-pagination>
    </div>
  </div>
</template>

<script>
import server from '../api/request.js'

// Multipliers that convert a `df -h` size suffix to gigabytes.
const UNIT_TO_GB = {
  K: 1 / (1024 * 1024),
  M: 1 / 1024,
  G: 1,
  T: 1024,
  P: 1024 * 1024
}

// Free space below this many gigabytes is highlighted as 'danger'.
const LOW_SPACE_GB = 15

export default {
  name: 'InterfaceMem',
  data () {
    return {
      query: {
        hostname: '',
        pageIndex: 1,
        pageSize: 10
      },
      tableData: [],
      pageTotal: 0
    }
  },
  created () {
    this.getDiskData()
  },
  methods: {
    // Fetch the current page from the backend using the active query.
    getDiskData () {
      server({url: '/getDiskList', data: this.query, method: 'post'})
        .then(response => {
          this.tableData = response.listdata || []
          // Fall back to 0 so an empty result does not report 10 rows.
          this.pageTotal = Number(response.pageTotal) || 0
        })
    },
    // Search button: restart from page 1 so a narrower result set is not
    // requested at an offset beyond its last row.
    handleSearch () {
      this.$set(this.query, 'pageIndex', 1)
      this.getDiskData()
    },
    // Page-size selector changed.
    handleSizeChange (val) {
      this.$set(this.query, 'pageSize', val)
      this.getDiskData()
    },
    // Page number changed.
    handleCurrentChange (val) {
      this.$set(this.query, 'pageIndex', val)
      this.getDiskData()
    },
    // Clear icon of the search box: drop the filter and reload.
    clear_name () {
      this.query.hostname = ''
      this.getDiskData()
    },
    // Map an "available" value such as "500M", "12G" or "1.5T" to an el-tag
    // type. The value is normalised to gigabytes first; the previous version
    // only stripped "G", so any other unit parsed as NaN and was always
    // shown as 'success'. A value without a unit is treated as gigabytes and
    // an unparsable value stays 'success', as before.
    availableplus (rows) {
      const text = String(rows == null ? '' : rows)
      const match = /^\s*([\d.,]+)\s*([KMGTP])?/i.exec(text)
      if (!match) {
        return 'success'
      }
      const amount = parseFloat(match[1].replace(',', '.'))
      const unit = (match[2] || 'G').toUpperCase()
      const gigabytes = amount * UNIT_TO_GB[unit]
      return gigabytes < LOW_SPACE_GB ? 'danger' : 'success'
    }
  }
}
</script>

<style scoped>
.handle-box {
  margin-bottom: 20px;
}
.handle-select {
  width: 120px;
}
.handle-input {
  width: 300px;
  display: inline-block;
}
.table {
  width: 100%;
  font-size: 14px;
}
.red {
  color: #ff0000;
}
.mr10 {
  margin-right: 10px;
}
.table-td-thumb {
  display: block;
  margin: auto;
  width: 40px;
  height: 40px;
}
</style>
第二块是后端请求处理
@resource.route('/getDiskList', methods=['POST'])
def getDiskList():
    """Return one page of ``disk_info`` rows for the front-end list view.

    Expects a JSON object with ``hostname`` (substring filter, empty for no
    filter), ``pageIndex`` (1-based) and ``pageSize``.

    Fixes over the previous version:
      * the filtered query read from ``mem_info`` instead of ``disk_info``;
      * paging values are coerced to ``int`` and the hostname filter is
        checked against a character whitelist before being placed in the
        SQL text, so request data can no longer alter the statement;
      * the count query no longer carries a meaningless ``order by``.

    Returns:
        A JSON string ``{'listdata': [...], 'pageTotal': n, 'code': 200}``.
        For a malformed request, a ``(body, 400)`` tuple whose body has the
        same keys with an empty list and ``code`` 400.
    """
    query = request.get_json()
    print(query)

    rejected = json.dumps({'listdata': [], 'pageTotal': 0, 'code': 400}), 400
    try:
        raw_hostname = query["hostname"]
        page_index = int(query["pageIndex"])
        page_size = int(query["pageSize"])
    except (KeyError, TypeError, ValueError):
        return rejected

    hostname = "" if raw_hostname is None else str(raw_hostname)
    # Only letters, digits and "-_." may reach the LIKE pattern.
    if not all(ch.isalnum() or ch in "-_." for ch in hostname):
        return rejected

    # Never hand a negative offset or row count to LIMIT.
    page_size = max(page_size, 0)
    offset = max((page_index - 1) * page_size, 0)

    columns = "id,hostname,total,used,available,create_time"
    where = ""
    if hostname != "":
        where = " where hostname like '%" + hostname + "%'"

    sql1 = (
        "select " + columns + " from disk_info" + where
        + " order by id DESC limit " + str(offset) + "," + str(page_size)
    )
    sql2 = "select " + columns + " from disk_info"
    count_sql = "select count(*) from disk_info" + where
    colume_sql = "select id from disk_info"

    db = conndb()
    listdata = db.get_data(sql1, sql2)
    db = conndb()
    result = db.get_data(count_sql, colume_sql)
    print(result)
    # The count is read back under the 'id' key; presumably get_data labels
    # result columns using the second query - confirm against conndb.
    pageTotal = result[0]['id']
    print(listdata)
    print(pageTotal)
    data = {'listdata': listdata, "pageTotal": pageTotal, "code": 200}
    return json.dumps(data)
Part4:页面展示
最后
送给大家一句话,共勉:当我们能力不足的时候,首先要做的是内修!当我们能力足够强大的时候,就可以外寻了!
现在就有这么一个机会,我邀请你点击进入我们的软件测试学习摸鱼群,大家可以一起探讨交流软件测试,共同学习软件测试技术、面试等软件测试方方面面,还会有学习文档资源,收获更多测试技巧,我们一起进阶Python自动化测试/测试开发,走向高薪之路。