RocketMQ 4.5.1 单机环境搭建以及生产消费测试

为了学习和方便测试,总是要启动一个单机版的。下载 http://rocketmq.apache.org/dowloading/releases/

1. 要先配置环境变量

ROCKETMQ_HOME

E:\rocketmq-all-4.5.1-bin-release

2. 然后进入bin目录启动NameServer

3. 启动Broker

启动

E:\rocketmq-all-4.5.1-bin-release\bin>mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true

可能会出现一个错误: 找不到或无法加载主类 Files\Java\jdk1.8.0_161\lib;C:\Program

解决方法:(打开bin目录的runserver.cmd

修改成

重新启动,成功

4. 弄个管控台方便查看

https://github.com/apache/rocketmq-externals

下载好进入 rocketmq-console 目录打包

mvn clean package -Dmaven.test.skip=true

进入target目录,启动 (最后的参数的nameserver的地址,也就是我本机地址)

E:\rocketmq-externals-master\rocketmq-console\target>java -jar rocketmq-console-ng-1.0.1.jar --rocketmq.config.namesrvAddr=localhost:9876

最后访问 http://localhost:8080 即可

5. 简单测试

引入依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.1</version>
</dependency>

一个简单的生产者

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;

public class Test {

    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        // 设置生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("my_producer_group");
        // 设置NameServer地址
        producer.setNamesrvAddr("10.204.241.15:9876");
        // 启动
        producer.start();
        for (int i = 0; i < 10; i++) {
            // 创建一条消息,包含topic、tag以及消息内容
            Message msg = new Message("MyTopic", "MyTag", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            // 发送结果
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        // 不用的时候关闭
        producer.shutdown();
    }

}

查看管控台

查看详细

下面一个简单的消费者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;
import java.util.List;

public class Test2 {

    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        // 设置生产者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_producer_group");
        // 设置NameServer地址
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅的主题
        consumer.subscribe("MyTopic", "*");
        // 注册消息监听
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }

}

控制台输出

不要关闭消费者,查看管控台

原文地址:https://www.cnblogs.com/LUA123/p/11023753.html

时间: 2024-10-27 17:20:22

RocketMQ 4.5.1 单机环境搭建以及生产消费测试的相关文章

kafka单机环境搭建

1,准备工作:  windows下虚拟机上安装centos7,下载putty工具. 2,windows下下载相关的安装包 jdk-8u71-linux-x64.rpm kafka_2.11-0.9.0.0.tgz zookeeper-3.4.6.tar.gz 3,通过putty的pscp工具将上述3个文件上传到centos的/home/xf/backup目录(自己指定目录) 4,启动一个putty窗口连接centos,rpm方式安装jdk-8u71-linux-x64.rpm 5,解压kafka

[转载] Hadoop和Hive单机环境搭建

转载自http://blog.csdn.net/yfkiss/article/details/7715476和http://blog.csdn.net/yfkiss/article/details/7721329 下载hadoophadoop下载地址:http://www.apache.org/dyn/closer.cgi/hadoop/core/这里下载的版本是1.0.3$ mkdir hadoop$ wget http://www.fayea.com/apache-mirror/hadoop

kafka单机环境搭建及其基本使用

最近在搞kettle整合kafka producer插件,于是自己搭建了一套单机的kafka环境,以便用于测试.现整理如下的笔记,发上来和大家分享.后续还会有kafka的研究笔记,依然会与大家分享! 1 kafka环境搭建 1.1 kafka单机环境搭建 (1).解压kafka_2.11-1.1.0.tgz,得到"kafka_2.11-1.1.0"文件夹. (2).kafka需要安装zookee使用,但kafka集成zookeeper,在单机搭建时可直接使用.使用需配置kafka_2.

spark单机环境搭建以及快速入门

1 单机环境搭建 系统环境 cat /etc/centos-release CentOS Linux release 7.3.1611 (Core) 配置jdk8 wget --no-cookies --no-check-certificate --header "Cookie: gpw_e24=http%3A%2F%2Fwww.oracle.com%2F; oraclelicense=accept-securebackup-cookie" "http://download.

App自动化测试探索(二)MAC环境搭建iOS+Python+Appium测试环境

环境搭建要求,MAC 机器一台,要求 Xcode 8.0以上 1. 安装 Homebrew /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)" 2. 安装 libimobiledevice brew install libimobiledevice 3. 安装 ideviceinstaller brew install ideviceinst

Hadoop-2.6.0分布式单机环境搭建HDFS讲解Mapreduce示例

Hadoop安装使用 1.1 Hadoop简介 1.2 HDFS分布式存储系统 1.3 单机安装 1.4 Mapreduce 案例 1.5 伪分布式安装 1.6 课后作业 1.1 Hadoop简介 在文章的时候已经讲解了Hadoop的简介以及生态圈,有什么不懂的可以"出门右转" http://dwz.cn/4rdSdU 1.2 HDFS分布式存储系统(Hadoop Distributed File System) HDFS优点 高容错性 数据自动保存多个副本 副本都时候会自动恢复 适合

hadoop2.6.4【ubuntu】单机环境搭建 系列1

jdk安装 tar zxvf jdk mv jdk /usr/lib/jvm/java jdk环境变量配置 vim /etc/profile ``` export JAVA_HOME=/usr/lib/java export JRE_HOME=${JAVA_HOME}/jre export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib export PATH=${PATH}:${JAVA_HOME}/bin:${JRE_HOME}/bin export

大数据测试之hadoop单机环境搭建(超级详细版)

友情提示:本文超级长,请备好瓜子 Hadoop的运行模式 单机模式是Hadoop的默认模式,在该模式下无需任何守护进程,所有程序都在单个JVM上运行,该模式主要用于开发和调试mapreduce的应用逻辑: 伪分布式模式下,Hadoop守护进程运行在一台机器上,模拟一个小规模的集群.该模式在单机模式的基础上增加了代码调试的功能,允许你检查NameNode,DataNode,Jobtracker,Tasktracker等模拟节点的运行情况: 单机模式和伪分布式模式均用于开发和调试的目的,真实Hado

HBase单机环境搭建

1.安装环境 ubuntu 14.04 server hbase-1.0.1 JDK1.7 2.安装JDK并配置环境变量 参考   JAVA笔记整理(二),下载安装JDK 3.安装HBase 3.1.解压缩HBase sudo tar -zxvf hbase-1.0.1-bin.tar.gz 3.2.重命名hbase-1.0.1为hbase1.0 sudo mv hbase-1.0.1/ hbase1.0 3.3.修改HBase配置文件,位于hbase目录下conf文件夹 hbase-env.s