import queue

from locust import HttpLocust, TaskSet, task


class UserBehavior(TaskSet):
    """Task set that signs in with accounts drawn from a shared queue.

    The queue lives on the owning locust class (``self.locust.user_data_queue``)
    and each account is put back after use, so the accounts are cycled.
    """

    @task
    def test_register(self):
        """Take one account from the queue, POST it to the sign-in URL, recycle it.

        Raises:
            SystemExit: if the shared queue is empty.
            AssertionError: if the response status code is not 200.
        """
        try:
            # get_nowait() raises queue.Empty immediately when nothing is
            # available, whereas get() would block until an item arrives.
            # Accounts come out in FIFO order: test0000, test0001, test0002, ...
            data = self.locust.user_data_queue.get_nowait()
        except queue.Empty:
            # No account data left: report it and stop.
            print('account data run out, test ended.')
            raise SystemExit(0)

        print('register with user: {}, pwd: {}'.format(data['username'], data['password']))
        payload = {
            'username': data['username'],
            'password': data['password'],
        }
        try:
            r = self.client.post(
                '/user/signin?ReturnUrl=https%3A%2F%2Fwww.cnblogs.com%2F',
                data=payload,
            )
        finally:
            # Put the account back so it can be reused; doing this in
            # ``finally`` keeps the account in the pool even if the request
            # itself raises.
            self.locust.user_data_queue.put_nowait(data)
        assert r.status_code == 200


class WebsiteUser(HttpLocust):
    """Locust user class that owns the shared queue of 100 generated accounts."""

    host = 'https://passport.cnblogs.com'
    task_set = UserBehavior

    # FIFO queue of account dicts, filled once when the class body executes.
    user_data_queue = queue.Queue()
    for index in range(100):
        data = {
            "username": "test%04d" % index,
            "password": "pwd%04d" % index,
            # NOTE(review): the original address was mangled by e-mail
            # obfuscation into an invalid format string ("%[" raises
            # ValueError). The domain below is a placeholder - confirm the
            # intended one.
            "email": "test%04d@example.com" % index,
            "phone": "186%08d" % index,
        }
        # Enqueue every generated account; execution continues after the loop.
        user_data_queue.put_nowait(data)

    # Wait-time bounds between tasks (milliseconds in the legacy Locust API).
    min_wait = 1000
    max_wait = 3000
参考:http://debugtalk.com/post/head-first-locust-advanced-script/
原文地址:https://www.cnblogs.com/changqing8023/p/9563364.html
时间: 2024-10-07 13:39:28