Locust压测工具


一、实操示例:

1. 对内部产品进行接口压力测试。

通过 @task 装饰器注册压测任务,并用 @tag 装饰器为任务打标签;启动 Locust 时通过 -T(--tags)参数传入标签,即可控制压测哪个接口。

代码:

# coding=utf-8
import os
import base64
import sys
import time
import queue
import random
import uuid
from locust import HttpUser, SequentialTaskSet, task, between, tag


class OcrTestTasks(SequentialTaskSet):
    """Sequential Locust tasks that exercise the file-engine HTTP API.

    ``on_start`` logs in with an account taken from ``self.user.user_q`` and
    caches the ids found on the last page of the file and scene listings.
    Every ``@task`` carries a tag so that a single endpoint can be selected
    with Locust's ``-T`` option.
    """

    # Sample documents uploaded by the tasks (paths inside the Locust container).
    # Local alternative used during development:
    # [r"D:\测试base\要素提取\21.pdf", r"D:\测试base\要素提取\20170106_161827_银华永益分级债券型证券投资基金更新招募说明书(2017年第1号).doc", r"D:\测试base\要素提取\购销演示合同-扫描2.pdf"]
    _SAMPLE_FILES = (
        r"/mnt/locust/test_data/file_data/21.pdf",
        r"/mnt/locust/test_data/file_data/20170106_161827_银华永益分级债券型证券投资基金更新招募说明书(2017年第1号).doc",
        r"/mnt/locust/test_data/file_data/购销演示合同-扫描2.pdf",
    )

    @staticmethod
    def _load_upload(path):
        """Return the ``(filename, content)`` pair for a multipart upload.

        The file is read through a context manager so the handle is closed
        immediately instead of leaking one descriptor per request.
        """
        with open(path, "rb") as fh:
            return os.path.basename(path), fh.read()

    def _fetch_last_page(self, path):
        """Return the ``data.list`` entries of the last page of a listing.

        Page 1 is requested first to learn the page count; a count of 0 is
        treated as 1 so that a valid page number is always requested.
        """
        first_page = self.client.get(
            path, headers=self.header, params={"page_num": 1, "page_size": 100}
        ).json()
        last_page_num = int(first_page["data"]["pages"]) or 1
        last_page = self.client.get(
            path, headers=self.header, params={"page_num": last_page_num, "page_size": 100}
        ).json()
        return last_page["data"]["list"]

    def on_start(self):
        """Log in, build the auth header and cache file ids and scene ids.

        Raises:
            RuntimeError: if the login response carries no token.
        """
        self.start_time = time.time()
        self.login_data = self.user.user_q.get()
        try:
            # catch_response=True only reports the request when it is used as
            # a context manager, so the login is wrapped in ``with``.
            with self.client.post("/api/file-engine/login", json=self.login_data, catch_response=True) as login_resp:
                login_res = login_resp.json()
                token = (login_res.get("data") or {}).get("token")
                if token:
                    login_resp.success()
                else:
                    login_resp.failure("Login failed: no token in response")
            time.sleep(0.1)
        finally:
            # Always hand the account back, otherwise a failed login would
            # drain the shared queue and block every later user.
            self.user.user_q.put(self.login_data)
        # print(login_res)
        if not token:
            raise RuntimeError("login returned no token for %r" % self.login_data.get("username"))
        self.header = {
            "Authorization": "Bearer " + token
        }

        self.user_files = [
            str(item["id"]) for item in self._fetch_last_page("/api/file-engine/showfiles")
        ]

        # scene_id: collect every scene id found on the last listing page
        self.scene_ids = [
            item["id"] for item in self._fetch_last_page("/api/file-engine/showscenes")
        ]
        # print(self.scene_ids)

    @task
    @tag("creat_task")
    def creat_task(self):
        """Create an open task from one random sample file and one random scene."""
        scene_id = random.choice(self.scene_ids)
        print(scene_id)
        files = {"file": self._load_upload(random.choice(self._SAMPLE_FILES))}
        creat_task_data = {
            "scene_id": str(scene_id)
        }
        with self.client.post("/api/file-engine/open/task", headers=self.header, files=files, data=creat_task_data, catch_response=True) as creat_task_res:
            body = creat_task_res.json()  # parse once, reuse below
            if body.get("code") == 200:
                creat_task_res.success()
            else:
                creat_task_res.failure("Failed!")
            print(body)

    @task
    @tag("file")
    def upload_file(self):
        """Upload every sample file, then create file records in one batch call."""
        creat_data = []
        for path in self._SAMPLE_FILES:
            files = {"file": self._load_upload(path)}
            # upload the resource
            with self.client.post("/api/file-engine/uploadfiles", headers=self.header, files=files, catch_response=True) as upload_file_res:
                body = upload_file_res.json()
                if body.get("code") == 200:
                    creat_data.append(body["data"])
                    upload_file_res.success()
                else:
                    upload_file_res.failure('Failed!')
        # create the files in one batch
        with self.client.post("/api/file-engine/createfiles", headers=self.header, json=creat_data, catch_response=True) as response:
            body = response.json()
            if body.get("code") == 200:
                response.success()
            else:
                response.failure('Failed!')
                print(body)

    @task
    @tag("scene")
    def scene_manage(self):
        """Create a scene with a unique name from the cached file ids."""
        data = {
            "name": str(uuid.uuid1()),
            "file_list": self.user_files,
            "mode_type": 1,
        }
        with self.client.post("/api/file-engine/createscene", headers=self.header, json=data, catch_response=True) as res_scene:
            body = res_scene.json()
            if body.get("code") == 200:
                res_scene.success()
            else:
                res_scene.failure("Failed!")
                print("user:", self.login_data)
                print(body)

    @task
    @tag("batch")
    def batch_test(self):
        """Create a batch task for the cached files under a random scene."""
        scene_id = random.choice(self.scene_ids)
        data = {
            "name": str(uuid.uuid1()),
            "file_list": self.user_files,
            "scene_id": scene_id,
        }
        with self.client.post("/api/file-engine/task/create", headers=self.header, json=data, catch_response=True) as res_batch:
            body = res_batch.json()
            if body.get("code") == 200:
                res_batch.success()
            else:
                res_batch.failure("Failed!")
                print(body)
                print("user:", self.login_data)

    def on_stop(self):
        """Print the wall-clock time elapsed since ``on_start``."""
        end_time = time.time()
        print("总时间:》", end_time-self.start_time)


def creat_user_list(
    num=10,
    username_base="test_api",
    # presumably the encrypted form of the test password expected by the
    # login API (the plain value "1234" is kept commented out in the
    # original) -- TODO confirm
    pwd="IpdOSTwrdslLrHHFJMQcseIFwXR8V6Tbgedqp2zWgjPaIiHeOcsAn5HoT9NuvFbmq/aZ7pfByEGc5vdVAn5YxWumcfLHVYTxWq/kM7h/0Tyt80FKLsvIRU9ovuyluObwWgu9ZVchgkCatc5dq98glt2zP1AiKLzO/fDN/AzICkc=",
):
    """Build the login payloads for the numbered test accounts.

    Args:
        num: how many accounts to generate; usernames are numbered 1..num.
        username_base: prefix placed before the account number.
        pwd: password value sent verbatim for every account.

    Returns:
        A list of ``{"password": ..., "username": ...}`` dicts, one per
        account, ordered by account number. Empty when ``num`` is 0.
    """
    return [
        {
            "password": pwd,
            "username": f"{username_base}{index}",
        }
        for index in range(1, num + 1)
    ]


class UserRun(HttpUser):
    """Locust user that runs ``OcrTestTasks`` against the file-engine host.

    ``user_q`` is a class-level queue shared by all simulated users; it is
    pre-filled with the login payloads from ``creat_user_list()`` so each
    user can borrow an account during ``on_start``.
    """

    tasks = [OcrTestTasks]
    # host = "http://120.53.222.77:30081"
    # No trailing slash: every request path already starts with "/", so a
    # trailing slash here would produce "//api/..." URLs.
    host = "http://10.100.121.12:30080"
    wait_time = between(5, 6)
    user_li = creat_user_list()
    user_q = queue.Queue(maxsize=len(user_li))
    for _login in user_li:
        user_q.put(_login)
    # A class-body loop variable would otherwise remain as a class attribute.
    del _login


if __name__ == '__main__':
    import subprocess

    # Web-UI mode: start Locust on this very file, restricted to one tag.
    # The list is named ``available_tags`` so it does not shadow the ``tag``
    # decorator imported from locust.
    available_tags = ["all", "file", "scene", "batch", "creat_task"]
    selected_tag = available_tags[4]
    # An argument list (no shell) avoids quoting problems, and __file__ keeps
    # the command valid whatever this locustfile is named on disk.
    subprocess.run(["locust", "-f", os.path.abspath(__file__), "-T", selected_tag])

    # Headless (no web UI) mode, kept for reference:
    # host = "http://10.100.121.10:8506"
    # u = 1        # number of simulated users
    # r = 1000     # users spawned per second
    # t = "5s"     # run time
    # csv_base = "ocr_test"  # prefix of the CSV result files
    # subprocess.run(["locust", "-f", os.path.abspath(__file__), "-T", selected_tag,
    #                 "--headless", "-u", str(u), "-r", str(r), "-t", t, f"--csv={csv_base}"])

通过容器部署运行Locust服务:

docker pull locustio/locust:1.6.0

自动部署脚本:

#!/usr/bin/env bash


# Start (or replace) the detached Locust container that serves the web UI on
# port 8089 and runs the tag-filtered locustfile from the current directory.
run_container() {
    echo "run docker image"
    # NOTE(review): the guide pulls locustio/locust:1.6.0 but this runs
    # :latest -- confirm which version is intended.
    DOCKER_IMAGE="locustio/locust:latest"
    # Kept as an array so a working directory containing spaces stays one argument.
    DEV_MOUNT=(-v "$PWD:/mnt/locust")
    container_name="yaosu_api_test"
    # Remove any previous container with the same name (errors are ignored).
    docker rm -f "${container_name}" > /dev/null 2>&1
    # Locust start-up parameters.
    # tag = ["all", "file", "scene", "batch", "creat_task"] selects the scenario to load-test
    locust_tag="creat_task"
    locustfile="locustfile_yaosu_api_task.py"

    # Test the command directly instead of inspecting $? afterwards.
    if docker run --name "${container_name}" -u root -p 8089:8089 -d -it "${DEV_MOUNT[@]}" "$DOCKER_IMAGE" -f "/mnt/locust/${locustfile}" -T "${locust_tag}"; then
        echo "run  docker image success"
    else
        echo "run docker image failed!"
        exit 1
    fi
}

run_container

2. 模拟全流程进行稳定性压测

代码:

# coding=utf-8
import os
import base64
import sys
import time
import queue
import random
import uuid
from locust import HttpUser, SequentialTaskSet, task, between, tag
import requests


class OcrTestTasks(SequentialTaskSet):
    """Single-task Locust flow for long-running stability testing.

    One iteration uploads the sample files, creates a scene from finished
    files and then creates a batch task, pausing between the steps. The
    helper methods are plain methods; only ``run_stability_test`` is a task.
    """

    # Sample documents uploaded by the flow (paths inside the Locust container).
    # Local alternative used during development:
    # [r"D:\测试base\文擎\test-data\要素提取\21.pdf", r"D:\测试base\文擎\test-data\要素提取\20170106_161827_银华永益分级债券型证券投资基金更新招募说明书(2017年第1号).doc", r"D:\测试base\文擎\test-data\要素提取\购销演示合同-扫描2.pdf"]
    _SAMPLE_FILES = (
        r"/mnt/locust/test_data/file_data/21.pdf",
        r"/mnt/locust/test_data/file_data/20170106_161827_银华永益分级债券型证券投资基金更新招募说明书(2017年第1号).doc",
        r"/mnt/locust/test_data/file_data/购销演示合同-扫描2.pdf",
    )

    @staticmethod
    def _load_upload(path):
        """Return the ``(filename, content)`` pair for a multipart upload.

        The file is read through a context manager so the handle is closed
        immediately instead of leaking one descriptor per request.
        """
        with open(path, "rb") as fh:
            return os.path.basename(path), fh.read()

    def _fetch_last_page(self, path):
        """Return the ``data.list`` entries of the last page of a listing.

        Page 1 is requested first to learn the page count; a count of 0 is
        treated as 1 so that page 0 is never requested. The requests are
        made without ``catch_response`` so Locust reports them normally.
        """
        first_page = self.client.get(
            path, headers=self.header, params={"page_num": 1, "page_size": 100}
        ).json()
        last_page_num = int(first_page["data"]["pages"]) or 1
        last_page = self.client.get(
            path, headers=self.header, params={"page_num": last_page_num, "page_size": 100}
        ).json()
        return last_page["data"]["list"]

    def on_start(self):
        """Log in with an account from the shared queue and build the auth header.

        Raises:
            RuntimeError: if the login response carries no token.
        """
        self.start_time = time.time()
        self.login_data = self.user.user_q.get()
        try:
            # catch_response=True only reports the request when it is used as
            # a context manager, so the login is wrapped in ``with``.
            with self.client.post("/api/file-engine/login", json=self.login_data, catch_response=True) as login_resp:
                login_res = login_resp.json()
                token = (login_res.get("data") or {}).get("token")
                if token:
                    login_resp.success()
                else:
                    login_resp.failure("Login failed: no token in response")
            time.sleep(0.1)
        finally:
            # Always hand the account back, otherwise a failed login would
            # drain the shared queue and block every later user.
            self.user.user_q.put(self.login_data)
        print(login_res)
        if not token:
            raise RuntimeError("login returned no token for %r" % self.login_data.get("username"))
        self.header = {
            "Authorization": "Bearer " + token
        }

    def show_files(self):
        """Store up to three random finished file ids in ``self.user_files``.

        Only entries on the last listing page whose ``status_str`` is "完成"
        are considered. While none are finished the listing is polled again
        every 30 seconds (a loop, so a long wait cannot exhaust the
        recursion limit).
        """
        while True:
            entries = self._fetch_last_page("/api/file-engine/showfiles")
            finished_ids = [str(item['id']) for item in entries if item["status_str"] == "完成"]
            if finished_ids:
                break
            time.sleep(30)
        # Pick three at random when available, otherwise use what exists.
        if len(finished_ids) > 2:
            self.user_files = random.sample(finished_ids, 3)
        else:
            self.user_files = finished_ids

    def show_scenes(self):
        """Store the ids of scenes with ``status == 2`` in ``self.scene_ids``.

        Only the last listing page is considered. While no such scene exists
        the listing is polled again every 30 seconds.
        """
        while True:
            entries = self._fetch_last_page("/api/file-engine/showscenes")
            self.scene_ids = [item["id"] for item in entries if item["status"] == 2]
            if self.scene_ids:
                return
            time.sleep(30)

    def upload_file(self):
        """Upload every sample file, then create file records in one batch call."""
        creat_data = []
        for path in self._SAMPLE_FILES:
            files = {"file": self._load_upload(path)}
            # upload the resource
            with self.client.post("/api/file-engine/uploadfiles", headers=self.header, files=files, catch_response=True) as upload_file_res:
                body = upload_file_res.json()
                if body.get("code") == 200:
                    creat_data.append(body["data"])
                    upload_file_res.success()
                else:
                    upload_file_res.failure('Failed!')
        # create the files in one batch
        creat_data_dic = {
            "ocr_mode_type": "table_print",
            "resources": creat_data
        }
        with self.client.post("/api/file-engine/createfiles", headers=self.header, json=creat_data_dic, catch_response=True) as response:
            body = response.json()
            if body.get("code") == 200:
                response.success()
            else:
                response.failure('Failed!')
                print(body)

    def scene_manage(self):
        """Refresh ``self.user_files`` and create a uniquely named scene from them."""
        self.show_files()
        data = {
            "name": str(uuid.uuid1()),
            "file_list": self.user_files,
            "mode_type_id": 1,
        }
        with self.client.post("/api/file-engine/createscene", headers=self.header, json=data, catch_response=True) as res_scene:
            body = res_scene.json()
            if body.get("code") == 200:
                res_scene.success()
            else:
                res_scene.failure("Failed!")
                print("user:", self.login_data)
                print(body)

    def batch_test(self):
        """Refresh ``self.scene_ids`` and create a batch task under a random scene.

        Uses ``self.user_files`` as set by an earlier ``scene_manage`` call.
        """
        self.show_scenes()
        scene_id = random.choice(self.scene_ids)
        data = {
            "name": str(uuid.uuid1()),
            "file_list": self.user_files,
            "scene_id": scene_id,
        }
        with self.client.post("/api/file-engine/task/create", headers=self.header, json=data, catch_response=True) as res_batch:
            if res_batch.json().get("code") == 200:
                res_batch.success()
            else:
                res_batch.failure("Failed!")

    @task
    def run_stability_test(self):
        """Run upload -> scene creation -> batch task, pausing 120 s between steps."""
        self.upload_file()
        time.sleep(120)
        self.scene_manage()
        time.sleep(120)
        self.batch_test()

    def on_stop(self):
        """Print the wall-clock time elapsed since ``on_start``."""
        end_time = time.time()
        print("总时间:》", end_time-self.start_time)


def creat_user_list():
    """Return the login payloads for accounts ``test_api1`` .. ``test_api10``.

    Every payload is a ``{"password": ..., "username": ...}`` dict that
    shares the same password value.
    """
    account_count = 10
    prefix = "test_api"
    # Earlier / alternative credentials, kept for reference:
    #pwd = "1234"
    #pwd = "ffYqYU5URcBjV0dV4PcsB+W7WpCZnQQ3kw/sNcbbx4cycNMlGtRDeAMPR/xWh/dVve5Jhp18RUGN7XO/O0rDmjtFGQsleSSBOoLeePC/bKlnTEfiwJgmsWRMimVYEr0/LcxHJcgsWVWxQ7Yw4KNH+/OI0VU4lD8f4kGN38rQKYk="
    # http://172.27.133.37:31180/
    secret = "IpdOSTwrdslLrHHFJMQcseIFwXR8V6Tbgedqp2zWgjPaIiHeOcsAn5HoT9NuvFbmq/aZ7pfByEGc5vdVAn5YxWumcfLHVYTxWq/kM7h/0Tyt80FKLsvIRU9ovuyluObwWgu9ZVchgkCatc5dq98glt2zP1AiKLzO/fDN/AzICkc="
    return [
        dict(password=secret, username=prefix + str(number))
        for number in range(1, account_count + 1)
    ]


class UserRun(HttpUser):
    """Locust user that runs the stability flow in ``OcrTestTasks``.

    ``user_q`` is a class-level queue shared by all simulated users; it is
    pre-filled with the login payloads from ``creat_user_list()`` so each
    user can borrow an account during ``on_start``.
    """

    tasks = [OcrTestTasks]
    # host = "http://120.53.222.77:30081"
    host = "http://10.100.121.160:30080"
    # Roughly ten minutes of idle time between two runs of the flow.
    wait_time = between(599, 600)
    user_li = creat_user_list()
    user_q = queue.Queue(maxsize=len(user_li))
    for _login in user_li:
        user_q.put(_login)
    # A class-body loop variable would otherwise remain as a class attribute.
    del _login


if __name__ == '__main__':
    import subprocess

    # Web-UI mode: start Locust on this very file.
    # No -T/--tags filter is passed: the only task in this file
    # (run_stability_test) carries no tag, so a tag filter such as "-T all"
    # would leave the task set without any task to run.
    # An argument list (no shell) avoids quoting problems, and __file__ keeps
    # the command valid whatever this locustfile is named on disk.
    subprocess.run(["locust", "-f", os.path.abspath(__file__)])

    # Headless (no web UI) mode, kept for reference:
    # host = "http://10.100.121.10:8506"
    # u = 1        # number of simulated users
    # r = 1000     # users spawned per second
    # t = "5s"     # run time
    # csv_base = "ocr_test"  # prefix of the CSV result files
    # subprocess.run(["locust", "-f", os.path.abspath(__file__),
    #                 "--headless", "-u", str(u), "-r", str(r), "-t", t, f"--csv={csv_base}"])

部署脚本:

#!/usr/bin/env bash


# Start (or replace) the detached Locust container for the stability test.
# The web UI (container port 8089) is published on host port 8087.
run_container() {
    echo "run docker image"
    DOCKER_IMAGE="locustio/locust:latest"
    # Kept as an array so a working directory containing spaces stays one argument.
    DEV_MOUNT=(-v "$PWD:/mnt/locust")
    container_name="locustfile_yaosu_wending"
    # Remove any previous container with the same name (errors are ignored).
    docker rm -f "${container_name}" > /dev/null 2>&1
    # Locust start-up parameters.
    # No tag filter is passed: the stability locustfile defines a single
    # untagged task, so the unused locust_tag variable was removed.
    locustfile="locustfile_yaosu_wending.py"

    # Test the command directly instead of inspecting $? afterwards.
    if docker run --name "${container_name}" -u root -p 8087:8089 -d -it "${DEV_MOUNT[@]}" "$DOCKER_IMAGE" -f "/mnt/locust/${locustfile}"; then
        echo "run  docker image success"
    else
        echo "run docker image failed!"
        exit 1
    fi
}

run_container