RocketMq牛刀小试

1.介绍

RocketMq是一个纯java、分布式、队列模型的的开源的消息中间件,具有以下特点

1.能够保证严格的消息顺序

2.提供丰富的消息拉取模式

3.高效的消息订阅机制

4.实时的消息订阅机制

5.亿级消息的堆积能力

2.安装(以虚拟机参考)

RocketMq是java实现的,因此安装的前提必须有java环境,配置好jdk环境,在此就不多说了

把下载好的alibaba-rocketmq-3.1.1.tar.gz上传到linux服务器,解压:

tar -zxvf alibaba-rocketmq-3.1.1.tar.gz

提升操作的权限 chmod +x ./alibaba-rocketmq/bin/*

由于我虚拟机的内存比较小,因此在运行过程中会报内存的异常信息,因此需要修改RocketMq启动时的虚拟机参数配置

vi  ./alibaba-rocketmq/bin/runserver.sh   #nameserver 内存

vi  ./alibaba-rocketmq/bin/runbroker.sh  #broke内存

JAVA_OPT_1="-server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=64m -XX:MaxPermSize=128m"(参考你自己的机器内存)

以上就是配置好的RocketMq的相关信息,下面开始启动RocketMq

启动nameserver:nohup ./bin/mqnamesrv >/dev/nameserver.log 2>&1 &         #默认端口9876

关闭nameserver:./bin/mqshutdown namesrv

启动mqbroker :nohup ./bin/mqbroker -n 100.66.51.152:9876 >/dev/broker.log 2>&1 &     #默认端口10911(100.66.51.152:9876为nameserver,链接进行注册)

关闭mqbroker :./bin/mqshutdown broker

3.概念介绍

下面来通过看上图来了解一下RocketMq中的基本组件

上图中的Consumer和Producer是属于client组件模块中的,主要面对的是开发的模块。主要提供了consumer订阅消息和producer发布消息。

broker:是每个RocketMQ中最核心的部分,该组件提供消息的存储和分发。用于producer存储消息和consumer中订阅该存储中的消息。

namesrv:是一个注册中心,每个broker启动则将会将字节的信息发布到namesrv,发布到namesrv的信息包括broker提供的信息。那么client启动的时候,就可以将自己所需要的topic信息向namesrv订阅,然后namesrv通过存储的broker获得信息,直接返回给client端。

实例讲解

下面通过一个官方的实例开运行一下,上面搭建好的RocketMq

生产者

/**
 * @FileName: Producer.java
 * @Package:com.test
 * @Description: TODO
 * @author: LUCKY
 * @date:2015年12月28日 下午2:32:22
 * @version V1.0
 */
package com.test;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

/**
 * @ClassName: Producer
 * @Description: 模拟生产者
 * @author: LUCKY
 * @date:2015年12月28日 下午2:32:22
 */
public class Producer1 {
	public static void main(String[] args) throws Exception {

		DefaultMQProducer producer = new DefaultMQProducer("Producer");
		// 必须要设置nameserver地址
		producer.setNamesrvAddr("100.66.154.81:9876");
		try {
			producer.start();
			for(long i=0l;i<3;i++){
				Message msg = new Message("topic"+i, "push"+i, "1",
						("第"+i+"内容").getBytes());
				SendResult result = producer.send(msg);
				System.out.println(result);
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {

		}
	}

}

消费者

/**
 * @FileName: Consumer.java
 * @Package:com.test
 * @Description: TODO
 * @author: LUCKY
 * @date:2015年12月28日 下午2:43:23
 * @version V1.0
 */
package com.test;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;

/**
 * @ClassName: Consumer
 * @Description: 模拟消费者
 * @author: LUCKY
 * @date:2015年12月28日 下午2:43:23
 */
public class Consumer4 {

	public static void main(String[] args) {
		DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("broker-a");
		consumer.setNamesrvAddr("100.66.154.81:9876");
		try {
			// 订阅PushTopic下Tag为push的消息,都订阅消息
			consumer.subscribe("TopicTest", "TagA");

			// 程序第一次启动从消息队列头获取数据
			consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
			consumer.registerMessageListener(new MessageListenerConcurrently() {

				public ConsumeConcurrentlyStatus consumeMessage(
						List<MessageExt> msgs,
						ConsumeConcurrentlyContext context) {
					// msgs中只收集同一个topic,同一个tag,并且key相同的message
					// 会把不同的消息分别放置到不同的队列中
					for(Message msg:msgs){

						System.out.println(new String(msg.getBody()));
					}

					return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
				}
			});

			consumer.resume();
			consumer.start();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

时间: 2024-12-21 04:29:20

RocketMq牛刀小试的相关文章

分布式开放消息系统(RocketMQ)的原理与实践

分布式消息系统作为实现分布式系统可扩展.可伸缩性的关键组件,需要具有高吞吐量.高可用等特点.而谈到消息系统的设计,就回避不了两个问题: 消息的顺序问题 消息的重复问题 RocketMQ作为阿里开源的一款高性能.高吞吐量的消息中间件,它是怎样来解决这两个问题的?RocketMQ 有哪些关键特性?其实现原理是怎样的? 关键特性以及其实现原理 一.顺序消息 消息有序指的是可以按照消息的发送顺序来消费.例如:一笔订单产生了 3 条消息,分别是订单创建.订单付款.订单完成.消费时,要按照顺序依次消费才有意

rocketmq 命令示例

http://www.360doc.com/content/16/0111/17/1073512_527143896.shtml http://www.cnblogs.com/marcotan/p/4256857.html RocketMQ常用命令 二.根据msgId查询消息 1.文档: 指令 queryMsgById 类路径 com.alibaba.rocketmq.tools.command.message.QueryMsgByIdSubCommand 参数 是否必填 说明 -i 是 msg

rocketmq安装与基本操作

如果不是因为政治原因,就rocketmq的社区活跃度.版本.特性和文档完善度,我是无论如何也不会使用rocketmq的. rocketmq严格意义上并不支持高可靠性,因为其持久化只支持异步,有另外一个线程flush,不支持配置同步刷新到磁盘.只能说多个节点宕机的概率很低很低,外加现在的服务器一般都是UPS. rocketmq官方提供了一份与activemq,kafka的特性对比(但没有包括与rabbitmq的比较).引用如下: Messaging Product Client SDK Proto

50.RocketMQ (quickstart)

1.订阅消息 /** * Copyright (C) 2010-2013 Alibaba Group Holding Limited * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at *

rocketmq源码分析3-consumer消息获取

使用rocketmq的大体消息发送过程如下: 在前面已经分析过MQ的broker接收生产者客户端发过来的消息的过程,此文主要讲述订阅者获取消息的过程,或者说broker是怎样将消息传递给消费者客户端的,即上面时序图中拉取消息(pull message)动作.. 1. 如何找到入口(MQ-broker端) 分析一个机制或者功能时,我们首先希望的是找到入口,前一篇我们是通过端口号方式顺藤摸瓜的方式找到了入口.但是此篇略微不同,涉及到consumer客户端与broker的两边分析,最终发现逻辑还是比较

RocketMQ源码学习--消息存储篇

1.序言 今天来和大家探讨一下RocketMQ在消息存储方面所作出的努力,在介绍RocketMQ的存储模型之前,可以先探讨一下MQ的存储模型选择. 2.MQ的存储模型选择 个人看来,从MQ的类型来看,存储模型分两种: 需要持久化(ActiveMQ,RabbitMQ,Kafka,RocketMQ) 不需要持久化(ZeroMQ) 本篇文章主要讨论持久化MQ的存储模型,因为现在大多数的MQ都是支持持久化存储,而且业务上也大多需要MQ有持久存储的能力,能大大增加系统的高可用性,下面几种存储方式: 分布式

rocketMq

1. 下载安装包,解压 https://github.com/alibaba/RocketMQ 2. cmd 3. 启动 mqnameserv C:\Users\Administrator>cd D:\download\alibaba-rocketmq\bin C:\Users\Administrator>mqnamesrv.exe -n 127.0.0.1:9876;192.168.0.1:9876 'mqnamesrv.exe' 不是内部或外部命令,也不是可运行的程序 或批处理文件. C:

转:Kafka、RabbitMQ、RocketMQ消息中间件的对比 —— 消息发送性能 (阿里中间件团队博客)

from: http://jm.taobao.org/2016/04/01/kafka-vs-rabbitmq-vs-rocketmq-message-send-performance/ 引言 分布式系统中,我们广泛运用消息中间件进行系统间的数据交换,便于异步解耦.现在开源的消息中间件有很多,前段时间我们自家的产品 RocketMQ (MetaQ的内核) 也顺利开源,得到大家的关注. 那么,消息中间件性能究竟哪家强? 带着这个疑问,我们中间件测试组对常见的三类消息产品(Kafka.RabbitM

RocketMQ与Kafka对比(18项差异)

转自:https://github.com/alibaba/RocketMQ/wiki/rmq_vs_kafka 淘宝内部的交易系统使用了淘宝自主研发的Notify消息中间件,使用Mysql作为消息存储媒介,可完全水平扩容,为了进一步降低成本,我们认为存储部分可以进一步优化,2011年初,Linkin开源了Kafka这个优秀的消息中间件,淘宝中间件团队在对Kafka做过充分Review之后,Kafka无限消息堆积,高效的持久化速度吸引了我们,但是同时发现这个消息系统主要定位于日志传输,对于使用在