JAVA代码之RocketMQ生产和消费数据

一、启动RocketMQ

[[email protected] ~]# cat /etc/hosts

# Do not remove the following line, or various programs

# that require network functionality will fail.

127.0.0.1               localhost.localdomain localhost

::1             localhost6.localdomain6 localhost6

192.168.1.106  node1

192.168.1.103  master

192.168.1.110  node2

[[email protected] ~]# cd /opt/alibaba-rocketmq/bin/

[[email protected] bin]# cat play.sh

#!/bin/sh

#

# Name Server

#

nohup sh mqnamesrv > ns.log 2>&1 &

#

# Service Addr

#

ADDR=`hostname -i`:9876

#

# Broker

#

nohup sh mqbroker -n ${ADDR} > bk.log 2>&1 &

echo "Start Name Server and Broker Successfully, ${ADDR}"

[[email protected] bin]# sh play.sh

Start Name Server and Broker Successfully, 192.168.1.103:9876

[[email protected] bin]# sh mqadmin topicList -n 192.168.1.103:9876

BenchmarkTest

DefaultCluster

SELF_TEST_TOPIC

%RETRY%please_rename_unique_group_name_4

TBW102

gaojingsong

master

OFFSET_MOVED_EVENT

[[email protected] bin]# cd ../

备注:此时topic不存在,但是生产数据的时候会自动创建

二、生产和消费数据

生产:下载

package cn.cn.mq.demo;

import java.util.concurrent.TimeUnit;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

import com.alibaba.rocketmq.client.producer.SendResult;

import com.alibaba.rocketmq.common.message.Message;

public class Producer {

public static void main(String[] args) throws MQClientException,

InterruptedException{

/**

* 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br>

* 注意:ProducerGroupName需要由应用来保证唯一<br>

* ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,

* 因为服务器会回查这个Group下的任意一个Producer

*/

final DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

producer.setNamesrvAddr("192.168.1.103:9876");

producer.setInstanceName("Producer");

/**

* Producer对象在使用之前必须要调用start初始化,初始化一次即可<br>

* 注意:切记不可以在每次发送消息时,都调用start方法

*/

producer.start();

/**

* 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。

* 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,<br>

* 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,<br>

* 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。

*/

for (int i = 0; i < 3; i++){

try {

{

Message msg = new Message("TopicTest1",// topic

"TagA",// tag

"OrderID001",// key

("我的名字是程序员:"+i).getBytes());// body

SendResult sendResult = producer.send(msg);

System.out.println(sendResult);

}

{

Message msg = new Message("TopicTest1",// topic

"TagC",// tag

"OrderID001",// key

("我来测试RocketMQ:"+i).getBytes());// body

SendResult sendResult = producer.send(msg);

System.out.println(sendResult);

}

}catch(Exception e) {

e.printStackTrace();

}

TimeUnit.MILLISECONDS.sleep(4000);

}

/**

* 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己

* 注意:我们建议应用在JBOSS、Tomcat等容器的退出钩子里调用shutdown方法

*/

//    producer.shutdown();

Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {

public void run() {

producer.shutdown();

}

}));

System.exit(0);

}

}

消费:下载

package cn.cn.mq.demo;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.common.message.MessageExt;

public class PushConsumer {

/**

* 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。<br>

* 但是实际PushConsumer内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法<br>

*/

public static void main(String[] args) throws InterruptedException,

MQClientException {

/**

* 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br>

* 注意:ConsumerGroupName需要由应用来保证唯一

*/

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(

"ConsumerGroupName");

consumer.setNamesrvAddr("192.168.1.103:9876");

consumer.setInstanceName("Consumber");

/**

* 订阅指定topic下tags分别等于TagA或TagC或TagD

*/

consumer.subscribe("TopicTest1", "TagA || TagC || TagD");

/**

* 订阅指定topic下所有消息<br>

* 注意:一个consumer对象可以订阅多个topic

*/

//consumer.subscribe("TopicTest2", "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {

public ConsumeConcurrentlyStatus consumeMessage(

List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

System.out.println(Thread.currentThread().getName()

+ " Receive New Messages: " + msgs.size());

MessageExt msg = msgs.get(0);

if (msg.getTopic().equals("TopicTest1")) {

// 执行TopicTest1的消费逻辑

if (msg.getTags() != null // 执行TagA的消费

&& msg.getTags().equals("TagA")) {

System.out.println("TagA:"+new String(msg.getBody()));

} else if (msg.getTags() != null// 执行TagC的消费

&& msg.getTags().equals("TagC")) {

System.out.println("TagC:"+new String(msg.getBody()));

} else if (msg.getTags() != null// 执行TagD的消费

&& msg.getTags().equals("TagD")) {

System.out.println("TagD:"+new String(msg.getBody()));

}

}

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

/**

* Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>

*/

consumer.start();

System.out.println("ConsumerStarted.");

}

}

三、验证消费结果  下载

[[email protected] bin]# sh mqadmin topicList -n 192.168.1.103:9876

BenchmarkTest

TopicTest1

DefaultCluster

SELF_TEST_TOPIC

%RETRY%please_rename_unique_group_name_4

%RETRY%ConsumerGroupName

TBW102

gaojingsong

master

OFFSET_MOVED_EVENT

[[email protected] bin]# sh mqadmin  topicStatus -n  192.168.1.103:9876 -t TopicTest1

#Broker Name                      #QID  #Min Offset           #Max Offset             #Last Updated

master                            0     0                     4                       2016-10-20 14:38:19,236

master                            1     0                     4                       2016-10-20 14:38:19,243

master                            2     0                     2                       2016-10-20 14:38:15,171

master                            3     0                     2                       2016-10-20 14:38:15,180

[[email protected] bin]# shutdown -h now

消费数据

四、错误解决方案 下载

时间: 2024-08-03 20:36:03

JAVA代码之RocketMQ生产和消费数据的相关文章

Kafka 使用Java实现数据的生产和消费demo

前言 在上一篇中讲述如何搭建kafka集群,本篇则讲述如何简单的使用 kafka .不过在使用kafka的时候,还是应该简单的了解下kafka. Kafka的介绍 Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据. Kafka 有如下特性: 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间复杂度的访问性能. 高吞吐率.即使在非常廉价的商用机器上也能做到单机支持每秒100K条以上消息的传输. 支持Kafka Serv

spark streaming 接收 kafka 数据java代码WordCount示例

1. 首先启动zookeeper 2. 启动kafka 3. 核心代码 生产者生产消息的java代码,生成要统计的单词 package streaming; import java.util.Properties; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; public class MyProducer { pu

Java代码实现excel数据导入到Oracle

1.首先需要两个jar包jxl.jar,ojdbc.jar(注意版本,版本不合适会报版本错误)2.代码: Java代码   import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import jxl.Cell; import jxl.Sheet; import jxl.Workbook; import jxl.read.biff.Bi

java代码实现highchart与数据库数据结合完整案例分析(一)---饼状图

作者原创:转载请注明出处 在做项目的过程中,经常会用到统计数据,同时会用到highchart或echart进行数据展示,highchart是外国开发的数据统计图插件, echart是我们国家开发的数据统计插件,我比较喜欢highchart的统计插件,在这里展示的也是highchart插件的应用. 应用highchart插件并不难,找到官方文档,copy代码,就能把图标呈现出来,难的是如何将本地数据库中的数据与其结合.因此, 在这里主要分析将数据库数据和插件结合的过程,我用的是java代码实现的.

使用solrj查询数据(java代码)

实体类Student,添加Field注解 package com.cs.solr.entity; import org.apache.solr.client.solrj.beans.Field; public class Student { @Field("id") private Integer id; @Field("name") private String name; @Field("gender") private String gen

Java多线程学习笔记--生产消费者模式

实际开发中,我们经常会接触到生产消费者模型,如:Android的Looper相应handler处理UI操作,Socket通信的响应过程.数据缓冲区在文件读写应用等.强大的模型框架,鉴于本人水平有限目前水平只能膜拜,本次只能算学习笔记,为了巩固自己对Java多线程常规知识点的理解,路过大神还望能指导指导.下面一段代码是最常规的生产者消费者的例子: package com.zhanglei.demo; import java.util.ArrayList; import java.util.List

Java的多线程实现生产/消费模式

Java的多线程实现生产/消费模式 在Java的多线程中,我们经常使用某个Java对象的wait(),notify()以及notifyAll() 方法实现多线程的通讯,今天就使用Java的多线程实现生产/消费模式,需求如下: 线程A ProductThread 继承Thread 实现生产数据 若线程共享的数据不为NULL,则生产线程进入等待状态 线程B CustomThread 继承Thread 实现消费数据(输出到控制台) 当线程共享数据为NULL的时候,进入等待状态 线程B 消费完数据之后,

RocketMQ的顺序消费和事务消费

一.三种消费 :1.普通消费 2. 顺序消费 3.事务消费 1.1  顺序消费:在网购的时候,我们需要下单,那么下单需要假如有三个顺序,第一.创建订单 ,第二:订单付款,第三:订单完成.也就是这个三个环节要有顺序,这个订单才有意义.RocketMQ可以保证顺序消费,他的实现是生产者(一个生产者可以对多个主题去发送消息)将这个三个消息放在topic(一个topic默认有4个队列)的一个队列里面,单机支持上万个持久化队列,消费端去消费的时候也是只能有一个Consumer去取得这个队列里面的数据,然后

Kettle变量和自定义java代码的实例应用

1  kettle.properties参数配置数据源连接和FTP连接 由于测试环境和生产环境中数据库连接FTP等配置会在部署过程中变更,所以预先定义成配置项,在配置文件中修改,这样测试和发布将会变得简单,下面以数据库为例说明这类配置的使用. (1)      首先要找到配置文件,不同的操作系统路径也不一样,本人用win7进行开发,配置文件的路径为"C:\Users\chenpeng\.kettle\kettle.properties",如下: (2)      配置文件中的具体配置如