1.介绍
RocketMq是一个纯java、分布式、队列模型的的开源的消息中间件,具有以下特点
1.能够保证严格的消息顺序
2.提供丰富的消息拉取模式
3.高效的消息订阅机制
4.实时的消息订阅机制
5.亿级消息的堆积能力
2.安装(以虚拟机参考)
RocketMq是java实现的,因此安装的前提必须有java环境,配置好jdk环境,在此就不多说了
把下载好的alibaba-rocketmq-3.1.1.tar.gz上传到linux服务器,解压:
tar -zxvf alibaba-rocketmq-3.1.1.tar.gz
提升操作的权限 chmod +x ./alibaba-rocketmq/bin/*
由于我虚拟机的内存比较小,因此在运行过程中会报内存的异常信息,因此需要修改RocketMq启动时的虚拟机参数配置
vi ./alibaba-rocketmq/bin/runserver.sh #nameserver 内存
vi ./alibaba-rocketmq/bin/runbroker.sh #broke内存
JAVA_OPT_1="-server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=64m -XX:MaxPermSize=128m"(参考你自己的机器内存)
以上就是配置好的RocketMq的相关信息,下面开始启动RocketMq
启动nameserver:nohup ./bin/mqnamesrv >/dev/nameserver.log 2>&1 & #默认端口9876
关闭nameserver:./bin/mqshutdown namesrv
启动mqbroker :nohup ./bin/mqbroker -n 100.66.51.152:9876 >/dev/broker.log 2>&1 & #默认端口10911(100.66.51.152:9876为nameserver,链接进行注册)
关闭mqbroker :./bin/mqshutdown broker
3.概念介绍
下面来通过看上图来了解一下RocketMq中的基本组件
上图中的Consumer和Producer是属于client组件模块中的,主要面对的是开发的模块。主要提供了consumer订阅消息和producer发布消息。
broker:是每个RocketMQ中最核心的部分,该组件提供消息的存储和分发。用于producer存储消息和consumer中订阅该存储中的消息。
namesrv:是一个注册中心,每个broker启动则将会将字节的信息发布到namesrv,发布到namesrv的信息包括broker提供的信息。那么client启动的时候,就可以将自己所需要的topic信息向namesrv订阅,然后namesrv通过存储的broker获得信息,直接返回给client端。
实例讲解
下面通过一个官方的实例开运行一下,上面搭建好的RocketMq
生产者
/** * @FileName: Producer.java * @Package:com.test * @Description: TODO * @author: LUCKY * @date:2015年12月28日 下午2:32:22 * @version V1.0 */ package com.test; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; /** * @ClassName: Producer * @Description: 模拟生产者 * @author: LUCKY * @date:2015年12月28日 下午2:32:22 */ public class Producer1 { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("Producer"); // 必须要设置nameserver地址 producer.setNamesrvAddr("100.66.154.81:9876"); try { producer.start(); for(long i=0l;i<3;i++){ Message msg = new Message("topic"+i, "push"+i, "1", ("第"+i+"内容").getBytes()); SendResult result = producer.send(msg); System.out.println(result); } } catch (Exception e) { e.printStackTrace(); } finally { } } }
消费者
/** * @FileName: Consumer.java * @Package:com.test * @Description: TODO * @author: LUCKY * @date:2015年12月28日 下午2:43:23 * @version V1.0 */ package com.test; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageExt; /** * @ClassName: Consumer * @Description: 模拟消费者 * @author: LUCKY * @date:2015年12月28日 下午2:43:23 */ public class Consumer4 { public static void main(String[] args) { DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("broker-a"); consumer.setNamesrvAddr("100.66.154.81:9876"); try { // 订阅PushTopic下Tag为push的消息,都订阅消息 consumer.subscribe("TopicTest", "TagA"); // 程序第一次启动从消息队列头获取数据 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { // msgs中只收集同一个topic,同一个tag,并且key相同的message // 会把不同的消息分别放置到不同的队列中 for(Message msg:msgs){ System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.resume(); consumer.start(); } catch (Exception e) { e.printStackTrace(); } } }