locust 参数化实现


  一、locust 参数化很简单,引入队列的概念 queue ,实现方式是将参数推入队列,测试时依次取出,全部取完后 locust 会自动停止。若是使用参数循环压测,需要将取出的参数再推入队尾。

       二、断言用 assert 直接判断。(可能有些同学觉得 locust 脚本不好调试,并不能像普通 python 代码一样调试,这里提供个方法:调试脚本的时候可以在代码中加入 exit(0) 来结束进程)

 下面是一个简单的例子

from locust import HttpLocust, TaskSet, task, between, events
import time,os,queue
from locust.exception import LocustError
from geventhttpclient import HTTPClient
from geventhttpclient.url import URL

# 定义用户行为
class UserTsak(TaskSet):
    def on_start(self):
        '''初始化数据'''
        url = URL('https://10.1.62.133')
        self.http = HTTPClient(url.host)

    @task
    def getHomeInfo(self):
        try:
            number = self.locust.q.get()
            # 推入队尾
            self.locust.q.put(number)  
            start_time = time.time()
            res = self.http.get("/sz/api2/test/{}".format(number))
            data = res.read()
            end_time = time.time()
            response_time =int((end_time - start_time)*1000)
            response_length = len(data)
            # 设置断言
         assert response_length == 30 

            if res.status_code == 200:
                events.request_success.fire(request_type="GET", name="test_success", response_time = response_time, response_length=response_length)

        except AssertionError:
            events.request_failure.fire(request_type="GET", name="test_failure", response_time=response_time,response_length=0,
                                        exception="断言错误。status_code:{}。接口返回:{}。".format(res.status_code,json.loads(data)))
        except Exception as e:
            events.locust_error.fire(locust_instance=UserTsak, exception=e, tb =sys.exc_info()[2])

    def on_stop(self):
        '''销毁数据'''
        self.conn.close()

class WebsiteUser(HttpLocust):
    host = 'https://10.1.62.133'
    task_set = UserTsak
    wait_time = between(0, 0)
    # 创建队列
    self.q = queue.Queue()
    # 将参数推入队列
    for i in range(10000):
        self.q.put(i)