测开之路七十九:性能测试蓝图之执行逻辑

import osimport csvimport json

from kafka import KafkaProducerfrom kafka import KafkaConsumer

from common.mongo import Mongofrom common import get_case_id

class Logic(object):

    def __init__(self): pass

    def start(self, data):        """ 这是投递消息函数,只负责投递消息。"""        case_id = get_case_id()        data.setdefault(‘_id‘, case_id)        producer = KafkaProducer(bootstrap_servers=[‘localhost:9092‘],                                 value_serializer=lambda v: json.dumps(v).encode(‘utf-8‘))        producer.send(‘test‘, json.dumps(data), partition=0)        producer.close()        return case_id

    def service(self):        """ 消费者取消息,执行,service应该单独部署到其他服务器,专门用于执行接口、UI、性能测试 """        print("进入取数据模式")        consumer = KafkaConsumer(‘test‘, bootstrap_servers=[‘localhost:9092‘])        print("创建卡夫卡消费者成功")        for msg in consumer:            print("迭代取数据开始")            recv = "%s:%d:%d: key=%s value=%s" % \                   (msg.topic, msg.partition, msg.offset, msg.key, msg.value)            print(recv)

            # 解析成python.json对象            data = json.loads(msg.value)            data = json.loads(data)

            # 拿前端传过来的数据            code = data.get(‘code‘)  # 拿代码            host = data.get(‘host‘)  # 拿host            user = data.get(‘user‘, 10)  # 拿用户数            rate = data.get(‘rate‘, 10)  # 拿QPS(每秒请求数)            time = data.get(‘time‘, 60)  # 拿请求时间

            # 把代码写到locustfile.py文件里面            file = open(‘locustfile.py‘, "w", encoding="utf-8")            file.write(code)            file.close()

            # 拼凑并运行locust指令,noweb模式,并生成csv报告            cmd = "locust -f locustfile.py --host={0} --no-web -c {1} -r {2} -t {3} --csv=report".format(host, user, rate, time)            os.system(cmd)

            # 解析report_requests.csv文件里的测试的数据写            report_id = get_case_id()            file = open(‘report_requests.csv‘)            lines = csv.reader(file, delimiter=‘ ‘, quotechar=‘|‘)            result = {                ‘_id‘: report_id,                ‘requests‘: []            }            for line in lines:                result[‘requests‘].append(line)            file.close()

            # 把数据写入到MongoDB            Mongo().insert("2019", "performance", result)

if __name__ == ‘__main__‘:    logic = Logic()    logic.service()

原文地址:https://www.cnblogs.com/zhongyehai/p/11286074.html

时间: 2024-10-25 14:46:49

测开之路七十九:性能测试蓝图之执行逻辑的相关文章

测开之路七十七:性能测试蓝图之js

//定义全局的editor = nullvar editor = null; //ace_editor的初始化函数function ace_editor() { var editor = ace.edit("editor"); //初始化对象,"editor"为前端页面的id //设置风格和语言(更多风格和语言,请到github上相应目录查看) //editor.setTheme("ace/theme/clouds"); //编辑界面的主题--云

测开之路七十六:性能测试蓝图之html

<!-- 继承base模板 -->{% extends 'base.html' %} {% block script %} <!-- 从cdn引入ace edter的js --> <script src="https://cdn.bootcss.com/ace/1.4.5/ace.js"></script> <script src="https://cdn.bootcss.com/ace/1.4.5/mode-python

测开之路七十八:性能测试蓝图之视图层

from flask import requestfrom flask import jsonifyfrom flask import Blueprintfrom flask import render_templatefrom performance.logic import Logic performance = Blueprint('performance', __name__, static_folder='static', template_folder='templates', ur

测开之路四十九:用Django实现扑克牌游戏

用Django实现和之前flask一样的扑克牌游戏 项目结构 html <!DOCTYPE html><html lang="en"><head> <meta charset="UTF-8"> <title>扎金花</title></head><body> {% for poker in player1 %} <img src="/static/poker

测开之路七十四:python处理kafka

kafka-python地址:https://github.com/dpkp/kafka-python 安装kafka-python:pip install kafka-python 接收消息 from kafka import KafkaConsumerconsumer = KafkaConsumer('test', bootstrap_servers=['localhost:9092']) # 定义消费者for msg in consumer: print(msg) 发条消息尝试接收,结果是

测开之路七十六:linux变量和环境变量

变量 赋值 variable=0,访问 $var或${var} 参数 $n 用``引住的会先执行(~键) 位置参数 环境变量/etc/profile:全局的环境变量 . bash_profile:用户的环境变量,在哪个用户的home下,就对哪个用户生效~/ .bashrc:用户的环境变量,在哪个用户的home下,就对哪个用户生效export:导出变量,即用户在外面使用source:修改环境变量后让变量文件生效 添加a到环境变量 也可以export其他的变量 在脚本中使用环境变量 管道 | 管道操

测开之路五十二:蓝图的用法

目录结构 html <!DOCTYPE html><html lang="en"><head> <meta charset="UTF-8"> <title>蓝图渲染</title></head><body><h1>这里是蓝图渲染</h1></body></html> 子app(创建不同的蓝图,如接口测试.ui测试.性能测试)

测开之路二十五:彩票游戏

玩法 import random money = random.randint(0, 999) def generate_code(money): return list(set([ money[0] + money[1] + money[2], money[0]+money[2]+money[1], money[1] + money[0] + money[2], money[1] + money[2] + money[0], money[2] + money[0] + money[1], mo

测开之路四十八:Django之重定向与cookie

基础配置与上一篇一致 404错误 定义一个error页面 <!DOCTYPE html><html lang="en"><head> <meta charset="UTF-8"> <title>错误页</title></head><body> <h1>哎呀,出错啦!</h1></body></html> 追加一个404的视图