阿里云RocketMQ的性能测试(一、本地测试)

因业务需要,需要进行阿里云RocketMQ的性能测试。

环境,一台windows系统CPU:I7,内存:8G,64位操作系统。

测试两种场景,为了保证订阅关系一致性(可以去阿里云官网了解订阅关系一致性),消费分为两种模式。

1、按Tag订阅,订阅所有Tag,测试使用3个消费者,3个生产者,每个生产者发送一万条数据,放到同一个Tag里,对应同一个ShardingKey,保证顺序消费。

测试结果:

2、按Tag订阅,每个消费者订阅一个Tag,每个Tag在一个分组里面。

测试结果:

其中要特别注意第二条,订阅关系一致性问题,如果3个消费者在一个组内,订阅的tag不一致,消费是有问题的,也可能就不消费。

结论:

TPS不到100,这个基于本地到阿里云,走的公网,效果一般,后面会有基于阿里云内网的测试,敬请期待。

发送消息代码:

public static void sendMqMessage( String topic, String tag, String message, String sharding ) {

        String key = UUID.randomUUID().toString();
        Message msg = new Message(
                // Message 所属的 Topic
                topic,
                // Message Tag, 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在消息队列 RocketMQ 的服务器过滤
                tag,
                // Message Body 可以是任何二进制形式的数据, 消息队列 RocketMQ 不做任何干预,需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
                message.getBytes()
        );
        // 设置代表消息的业务关键属性,请尽可能全局唯一。
        // 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。
        // 注意:不设置也不会影响消息正常收发
        msg.setKey(key);
        // 分区顺序消息中区分不同分区的关键字段,sharding key 于普通消息的 key 是完全不同的概念。
        // 全局顺序消息,该字段可以设置为任意非空字符串。
        String shardingKey = sharding;
        try {
            SendResult sendResult = producer.send(msg, shardingKey);
            // 发送消息,只要不抛异常就是成功
            if (sendResult != null) {
                //System.out.println(message + tag + sharding);
                if(message.equals("10000")){
                    System.out.println(tag + ":发送完毕10000!");
                }
                //SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                //System.out.println(dateFormat.format(new Date()) + "-发送消息成功-sharding:" + shardingKey + ",tag:" + tag + ",key:"+ key + ",msgId:" + sendResult.getMessageId());
            }
        }
        catch (Exception e) {
            // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
            System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
            throw e;
        }
    }

消费代码:

public void consumerMqMessage() {
        String topic = "topic-test";
        String tags = "W3";//第一种模式使用*
        consumer = RocketMqConsumerSingleton.getInstance();
        // 在订阅消息前,必须调用 start 方法来启动 Consumer,只需调用一次即可。
        consumer.subscribe(
            // Message 所属的 Topic
            topic,
            // 订阅指定 Topic 下的 Tags:
            // 1. * 表示订阅所有消息
            // 2. TagA || TagB || TagC 表示订阅 TagA 或 TagB 或 TagC 的消息
            tags,
            new MessageOrderListener() {
                /**
                 * 1. 消息消费处理失败或者处理出现异常,返回 OrderAction.Suspend<br>
                 * 2. 消息处理成功,返回 OrderAction.Success
                 */
                @Override
                public OrderAction consume(Message message, ConsumeOrderContext context) {
                    String msg = new String(message.getBody());
                    //System.out.println(msg);

                    if(msg.equals("1")) {
                        System.out.println(message.getTag() + "开始:" + System.currentTimeMillis());
                    }
                    if(msg.equals("10000")) {
                        System.out.println(message.getTag() + "结束:" + System.currentTimeMillis());
                    }
                    //SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    //System.out.println(dateFormat.format(new Date()) + "-消费消息---sharding:" + message.getShardingKey() + ",tag:" + message.getTag() + ",key: " + message.getKey() + ",MsgId:" + message.getMsgID());
                    try {
                        //Thread.sleep(2000);
                        //System.out.println("-------------消费者睡2秒后----------");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return OrderAction.Success;
                }
            });
        consumer.start();
    }

原文地址:https://blog.51cto.com/janephp/2408070

时间: 2024-10-10 12:55:24

阿里云RocketMQ的性能测试(一、本地测试)的相关文章

阿里云ECS安装sqlserver,本地无法连接问题排查思路

1. 阿里云控制台-对应的ECS实例的安全组是否添加了响应的端口(1433)可以访问: 2. 服务器-sqlserver服务是否开启: 3. 服务器-sqlserver配置器,对应的端口是否启用,已经是否处于侦听状态: (netstat -a -n 或者netstat -an查看本地端口侦听情况) 登录远程SQL服务器一 看ping 服务器IP能否ping通. 这个实际上是看和远程sql server 2000服务器的物理连接是否存在.如果不行,请检查网络,查看配置,当然得确保远程sql ser

使用mysqldump命令从阿里云备份数据库数据至本地

注:因为需要从阿里云云服务器备份数据库数据至本地(个人认为如非迁移服务器,此项操作显得有些多余) 又注:谢谢阿里云的帮助文档 1, 安装mysql命令行客户端 2,打开命令行提示符窗口,输入并执行如下命令: mysqldump -hhmd-021.my3w.com -uhdm0215246 -phdmxx00101 --default-character-set=utf8 bdm0254685_db > e:\backup\bdm0388542_db_utf8.sql

【阿里云产品公测】rds测试感受

阿里云用户:cncbase 公司于10.1决定改变原来的服务器自建数据库,使用rds.于近日开通rds,进行了一些测试. 信息量:500字节左右每条信息,约200万条信息/小时的吞吐量.     信息实时性要求较高,实时写入信息约每秒峰值150条左右,信息写入以后,由另一台服务器上运行的服务端检索各种需要的信息以后下发到app.     公司原来的构架是自建sqlserver ,经常出现各种问题.维护人员一般不是专业的开发人员,所以非常头疼.     rds已经比较成熟,在1个月的测试结束以后,

阿里云OSS上传文件本地调试跨域问题解决

问题描述: 最近后台说为了提高上传效率,要前端直接上传文件到阿里云,而不经过后台.因为在阿里云服务器设置的允许源(region)为某个固定的域名下的源(例如*.cheche.com),直接在本地访问会有跨域问题. 解决方案: 在本机C:\Windows\System32\drivers\etc的hosts文件中(使用管理员身份打开并编辑)添加一行地址映射:127.0.0.1 test.cheche.com 然后把前端运行项目的端口改为80,以vue项目为例(config/index.js) 到这

基于阿里云RocketMQ的分片顺序消费+监听器自动启动的Springboot实验

发送消息RocketMqProducerService package com.jane.rocketmq.service; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.SendResult; import com.aliyun.openservices.ons.api.order.OrderProducer; import org.springframework.s

阿里云安装zk并连接javaAPI测试

1.安装 可参照Ubuntu 搭建Zookeeper服务进行安装并启动. 2.注意 阿里云环境开放2181端口 2.1 查看已开放端口: firewall-cmd --permanent --zone=public --list-ports 2.2 永久的添加该端口.去掉--permanent则表示临时. firewall-cmd --permanent --zone=public --add-port=2181/tcp 2.3 加载配置,使得修改有效 firewall-cmd --reload

阿里云RocketMQ的消费者简单实现

业务场景之类的请看另一篇生产者的实现: package com.ttt.eee; import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.Consumer; import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.PropertyKeyConst; impor

【阿里云产品公测】PTS测试 SLB+ECS+RDS组合的DZ论坛负载极限压力,100并发2000页

环境介绍: \)6?u_(u   1.ECS:1核 1G 5M 杭州 1>O0Iu   2.RDS:240M  5G  杭州内网 >5z`SZf   3.SLB:私网实例 %/,Uk+3p   V'|g 配置测试环境: V1+o3g{}   *IfIRR>3l(   w]}cB+C+l# 测试脚本: (^OC%pc   1.生成参数文件,我的方法是利用工具生成的sitemap.txt  2000条网址. @5nkI$>3z   去掉http://xxx.xx.xxx/得到参数文件.

阿里云ECS VSFTP上传本地文件

开始终端 购买云服务,获得公网IP,内网IP 控制台首页获得 获得终端连接密码 连接终端,输入连接密码 获得终端界面,进入终端 上传文件 ## 1.安装VSFTP服务器程序 yum install vsftpd ## 2.vi /etc/vsftpd/vsftpd.conf listen=YES write_enable=YES // 允许系统用户上传数据 connect_timeout=60 // 超时断开连接 anonymous_enable=NO // 禁用匿名登录 local_enabl