Locust压测工具
一、实操示例:
1. 内部产品进行接口压力测试。
通过task的装饰器,tag传参控制压测哪个接口。
代码:
# coding=utf-8 import os import base64 import sys import time import queue import random import uuid from locust import HttpUser, SequentialTaskSet, task, between, tag class OcrTestTasks(SequentialTaskSet): def on_start(self): self.start_time = time.time() self.login_data = self.user.user_q.get() login_res = self.client.post("/api/file-engine/login", json=self.login_data, catch_response=True).json() time.sleep(0.1) self.user.user_q.put(self.login_data) # print(login_res) Authorization = "Bearer " + login_res["data"]["token"] self.header = { "Authorization": Authorization } page_info = self.client.get("/api/file-engine/showfiles", headers=self.header, params={"page_num": 1, "page_size": 100}).json() pages = int(page_info["data"]["pages"]) pages = 1 if pages == 0 else pages page_list = self.client.get("/api/file-engine/showfiles", headers=self.header, params={"page_num": pages, "page_size": 100}).json() user_files = [] for item in page_list["data"]["list"]: user_files.append(str(item["id"])) self.user_files = user_files # scene_id page_scene_info = self.client.get("/api/file-engine/showscenes", headers=self.header, params={"page_num": 1, "page_size": 100}).json() pages_scene = int(page_scene_info["data"]["pages"]) pages_scene = 1 if pages_scene == 0 else pages_scene page_scene_list = self.client.get("/api/file-engine/showscenes", headers=self.header, params={"page_num": pages_scene, "page_size": 100}).json() self.scene_ids = [i["id"] for i in page_scene_list["data"]["list"]] # 获取所有场景id # print(self.scene_ids) @task @tag("creat_task") def creat_task(self): files_path = [r"/mnt/locust/test_data/file_data/21.pdf", r"/mnt/locust/test_data/file_data/20170106_161827_银华永益分级债券型证券投资基金更新招募说明书(2017年第1号).doc", r"/mnt/locust/test_data/file_data/购销演示合同-扫描2.pdf"] scene_id = random.choice(self.scene_ids) print(scene_id) file_random = random.choice(files_path) files = {"file": (os.path.basename(file_random), open(file_random, 'rb').read())} creat_task_data = { "scene_id": str(scene_id) } with self.client.post("/api/file-engine/open/task", headers=self.header, files=files, data=creat_task_data, catch_response=True) as creat_task_res: if creat_task_res.json().get("code") == 200: creat_task_res.success() else: creat_task_res.failure("Failed!") print(creat_task_res.json()) @task @tag("file") def upload_file(self): files_path = [r"/mnt/locust/test_data/file_data/21.pdf", r"/mnt/locust/test_data/file_data/20170106_161827_银华永益分级债券型证券投资基金更新招募说明书(2017年第1号).doc", r"/mnt/locust/test_data/file_data/购销演示合同-扫描2.pdf"] # files_path = [r"D:\测试base\要素提取\21.pdf", r"D:\测试base\要素提取\20170106_161827_银华永益分级债券型证券投资基金更新招募说明书(2017年第1号).doc", r"D:\测试base\要素提取\购销演示合同-扫描2.pdf"] creat_data = [] for i in files_path: f = open(i, 'rb').read() files = {"file": (os.path.basename(i), f)} # 上传资源 with self.client.post("/api/file-engine/uploadfiles", headers=self.header, files=files, catch_response=True) as upload_file_res: if upload_file_res.json().get("code") == 200: creat_data.append(upload_file_res.json()["data"]) upload_file_res.success() else: upload_file_res.failure('Failed!') # 批量创建文件 with self.client.post("/api/file-engine/createfiles", headers=self.header, json=creat_data, catch_response=True) as response: if response.json().get("code") == 200: response.success() else: response.failure('Failed!') print(response.json()) @task @tag("scene") def scene_manage(self): data = { "name": str(uuid.uuid1()), "file_list": self.user_files, "mode_type": 1, } with self.client.post("/api/file-engine/createscene", headers=self.header, json=data, catch_response=True) as res_scene: if res_scene.json().get("code") == 200: res_scene.success() else: res_scene.failure("Failed!") print("user:", self.login_data) print(res_scene.json()) @task @tag("batch") def batch_test(self): scene_id = random.choice(self.scene_ids) data = { "name": str(uuid.uuid1()), "file_list": self.user_files, "scene_id": scene_id, } with self.client.post("/api/file-engine/task/create", headers=self.header, json=data, catch_response=True) as res_batch: if res_batch.json().get("code") == 200: res_batch.success() else: res_batch.failure("Failed!") print(res_batch.json()) print("user:", self.login_data) def on_stop(self): end_time = time.time() print("总时间:》", end_time-self.start_time) def creat_user_list(): num = 10 username_base = "test_api" #pwd = "1234" pwd = "IpdOSTwrdslLrHHFJMQcseIFwXR8V6Tbgedqp2zWgjPaIiHeOcsAn5HoT9NuvFbmq/aZ7pfByEGc5vdVAn5YxWumcfLHVYTxWq/kM7h/0Tyt80FKLsvIRU9ovuyluObwWgu9ZVchgkCatc5dq98glt2zP1AiKLzO/fDN/AzICkc=" user_data_li = [] for i in range(1, num+1): data = { "password": pwd, "username": username_base + str(i) } user_data_li.append(data) return user_data_li class UserRun(HttpUser): tasks = [OcrTestTasks] #host = "http://120.53.222.77:30081" host = "http://10.100.121.12:30080/" wait_time = between(5, 6) user_li = creat_user_list() user_q = queue.Queue(maxsize=len(user_li)) for i in user_li: user_q.put(i) if __name__ == '__main__': # web模式运行服务 tag = ["all", "file", "scene", "batch", "creat_task"] # os.system("locust -f locustfile_yaosu.py -T {}".format("all")) os.system(f"locust -f locustfile_yaosu.py -T {tag[4]}") # no_web模式 # host = "http://10.100.121.10:8506" u = 1 # 设置虚拟用户数。 r = 1000 # 设置每秒启动虚拟用户数 t = "5s" # 设置运行时间 # csv_base = "ocr_test" # 设置csv文件前缀 # os.system(f"locust -f locustfile.py -T {scene_type} --headless -u {u} -r {r} -t {t} --csv={csv_base}")
通过容器部署运行Locust服务:
docker pull locustio/locust:1.6.0
自动部署脚本:
#!/usr/bin/env bash run_container() { echo "run docker image" DOCKER_IMAGE="locustio/locust:latest" DEV_MOUNT="-v $PWD:/mnt/locust " container_name="yaosu_api_test" # 删除旧容器 docker rm -f ${container_name} > /dev/null 2>&1 # locust启动参数配置 # tag = ["all", "file", "scene", "batch", "creat_task"] 指定压测某个场景 locust_tag="creat_task" locustfile="locustfile_yaosu_api_task.py" docker run --name ${container_name} -u root -p 8089:8089 -d -it ${DEV_MOUNT} $DOCKER_IMAGE -f /mnt/locust/${locustfile} -T ${locust_tag} if [ $? -eq 0 ]; then echo "run docker image success" else echo "run docker image failed!" exit 1 fi } run_container
2. 模拟全流程进行稳定性压测
代码:
# coding=utf-8 import os import base64 import sys import time import queue import random import uuid from locust import HttpUser, SequentialTaskSet, task, between, tag import requests class OcrTestTasks(SequentialTaskSet): def on_start(self): self.start_time = time.time() self.login_data = self.user.user_q.get() login_res = self.client.post("/api/file-engine/login", json=self.login_data, catch_response=True).json() time.sleep(0.1) self.user.user_q.put(self.login_data) print(login_res) Authorization = "Bearer " + login_res["data"]["token"] self.header = { "Authorization": Authorization } def show_files(self): # 查看文件id, 并在后最后一页随机获取3个 page_info = self.client.get("/api/file-engine/showfiles", headers=self.header, params={"page_num": 1, "page_size": 100}, catch_response=True).json() pages = int(page_info["data"]["pages"]) page_list = self.client.get("/api/file-engine/showfiles", headers=self.header, params={"page_num": pages, "page_size": 100}, catch_response=True).json() user_files_ids = [] user_files_ids = [str(item['id']) for item in page_list["data"]["list"] if item["status_str"] == "完成"] #print(user_files_ids) #for item in page_list["data"]["list"]: # user_files_ids.append(str(item["id"])) # 从文件id列表随机获取三个文件id if len(user_files_ids) > 2: self.user_files = random.sample(user_files_ids, 3) else: if len(user_files_ids) == 0: time.sleep(30) self.show_files() else: self.user_files = user_files_ids def show_scenes(self): # 查看场景id page_scene_info = self.client.get("/api/file-engine/showscenes", headers=self.header, params={"page_num": 1, "page_size": 100}, catch_response=True).json() pages_scene = int(page_scene_info["data"]["pages"]) #print(pages_scene) page_scene_list = self.client.get("/api/file-engine/showscenes", headers=self.header, params={"page_num": pages_scene, "page_size": 100}, catch_response=True).json() self.scene_ids = [i["id"] for i in page_scene_list["data"]["list"] if i["status"]==2] # 获取所有场景id if not self.scene_ids: time.sleep(30) self.show_scenes() def upload_file(self): files_path = [r"/mnt/locust/test_data/file_data/21.pdf", r"/mnt/locust/test_data/file_data/20170106_161827_银华永益分级债券型证券投资基金更新招募说明书(2017年第1号).doc", r"/mnt/locust/test_data/file_data/购销演示合同-扫描2.pdf"] # files_path = [r"D:\测试base\文擎\test-data\要素提取\21.pdf", r"D:\测试base\文擎\test-data\要素提取\20170106_161827_银华永益分级债券型证券投资基金更新招募说明书(2017年第1号).doc", r"D:\测试base\文擎\test-data\要素提取\购销演示合同-扫描2.pdf"] creat_data = [] for i in files_path: f = open(i, 'rb').read() files = {"file": (os.path.basename(i), f)} # 上传资源 with self.client.post("/api/file-engine/uploadfiles", headers=self.header, files=files, catch_response=True) as upload_file_res: if upload_file_res.json().get("code") == 200: #print(upload_file_res.json()) creat_data.append(upload_file_res.json()["data"]) upload_file_res.success() else: upload_file_res.failure('Failed!') # 批量创建文件 #print("createfiles_def") creat_data_dic = { "ocr_mode_type": "table_print", "resources": creat_data } with self.client.post("/api/file-engine/createfiles", headers=self.header, json=creat_data_dic, catch_response=True) as response: if response.json().get("code") == 200: response.success() else: response.failure('Failed!') print(response.json()) def scene_manage(self): self.show_files() #print(self.user_files) data = { "name": str(uuid.uuid1()), "file_list": self.user_files, "mode_type_id": 1, } with self.client.post("/api/file-engine/createscene", headers=self.header, json=data, catch_response=True) as res_scene: if res_scene.json().get("code") == 200: #print(res_scene.json()) res_scene.success() else: res_scene.failure("Failed!") print("user:", self.login_data) print(res_scene.json()) # time.sleep(60) def batch_test(self): self.show_scenes() scene_id = random.choice(self.scene_ids) data = { "name": str(uuid.uuid1()), "file_list": self.user_files, "scene_id": scene_id, } with self.client.post("/api/file-engine/task/create", headers=self.header, json=data, catch_response=True) as res_batch: if res_batch.json().get("code") == 200: res_batch.success() else: res_batch.failure("Failed!") # print(res_batch.json()) @task def run_stability_test(self): self.upload_file() time.sleep(120) self.scene_manage() time.sleep(120) self.batch_test() def on_stop(self): end_time = time.time() print("总时间:》", end_time-self.start_time) def creat_user_list(): num = 10 username_base = "test_api" #pwd = "1234" #pwd = "ffYqYU5URcBjV0dV4PcsB+W7WpCZnQQ3kw/sNcbbx4cycNMlGtRDeAMPR/xWh/dVve5Jhp18RUGN7XO/O0rDmjtFGQsleSSBOoLeePC/bKlnTEfiwJgmsWRMimVYEr0/LcxHJcgsWVWxQ7Yw4KNH+/OI0VU4lD8f4kGN38rQKYk=" # http://172.27.133.37:31180/ pwd = "IpdOSTwrdslLrHHFJMQcseIFwXR8V6Tbgedqp2zWgjPaIiHeOcsAn5HoT9NuvFbmq/aZ7pfByEGc5vdVAn5YxWumcfLHVYTxWq/kM7h/0Tyt80FKLsvIRU9ovuyluObwWgu9ZVchgkCatc5dq98glt2zP1AiKLzO/fDN/AzICkc=" user_data_li = [] for i in range(1, num+1): data = { "password": pwd, "username": username_base + str(i) } user_data_li.append(data) return user_data_li class UserRun(HttpUser): tasks = [OcrTestTasks] #host = "http://120.53.222.77:30081" host = "http://10.100.121.160:30080" wait_time = between(599, 600) user_li = creat_user_list() user_q = queue.Queue(maxsize=len(user_li)) for i in user_li: user_q.put(i) if __name__ == '__main__': # web模式运行服务 tag = ["all", "file", "scene", "batch"] # os.system("locust -f locustfile_yaosu.py -T {}".format("all")) os.system("locust -f locustfile_yaosu_wending.py -T all") # no_web模式 # host = "http://10.100.121.10:8506" # u = 1 # 设置虚拟用户数。 # r = 1000 # 设置每秒启动虚拟用户数 # t = "5s" # 设置运行时间 # csv_base = "ocr_test" # 设置csv文件前缀 # os.system(f"locust -f locustfile.py -T {scene_type} --headless -u {u} -r {r} -t {t} --csv={csv_base}")
部署脚本:
#!/usr/bin/env bash run_container() { echo "run docker image" DOCKER_IMAGE="locustio/locust:latest" DEV_MOUNT="-v $PWD:/mnt/locust " container_name="locustfile_yaosu_wending" # 删除旧容器 docker rm -f ${container_name} > /dev/null 2>&1 # locust启动参数配置 # tag = ["all", "file", "scene", "batch"] 指定压测某个场景 locust_tag="batch" locustfile="locustfile_yaosu_wending.py" docker run --name ${container_name} -u root -p 8087:8089 -d -it ${DEV_MOUNT} $DOCKER_IMAGE -f /mnt/locust/${locustfile} if [ $? -eq 0 ]; then echo "run docker image success" else echo "run docker image failed!" exit 1 fi } run_container