RocketMQ初入门踩坑记

本文主要是讲在Centos中安装RocketMQ并做简单的示例。如果你按照本文安装100%是可以成功的,如果按照阿里官方的说明,那只能呵呵了~

安装

官方地址为:https://rocketmq.apache.org/docs/quick-start/
本人安装如下:

//下载最新的rocketmq
wget http://apache-mirror.8birdsvideo.com/rocketmq/4.4.0/rocketmq-all-4.4.0-bin-release.zip

//解压
unzip rocketmq-all-4.4.0-bin-release.zip

//切换到mq目录
cd rocketmq-all-4.4.0-bin-release

//name server 启动
nohup ./bin/mqnamesrv -n 111.231.XX.XX:9876 &

//-c conf/broker.conf autoCreateTopicEnable=true 参数需要带上,不然topic需要手动创建
nohup sh bin/mqbroker -n 111.231.XX.XX:9876 -c conf/broker.conf autoCreateTopicEnable=true &

配置,切换到mq的bin目录下

cd rocketmq-all-4.4.0-bin-release/bin

rocketmq默认最低内存为4g,机器内存不够用的话,找到runserver.sh和runbroker.sh编辑如下:

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

运行

运行官方demo,发现如下错误:

21:20:22.249 [NettyClientSelector_1] INFO  RocketmqRemoting - closeChannel: close the connection to remote address[] result: true
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:640)
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1310)
    at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1256)
    at org.apache.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:339)
    at org.apache.rocketmq.example.simple.Producer.main(Producer.java:40)

运行以下命令查看broker配置并写入远程ip地址:

//查看broker配置
sh ./bin/mqbroker -m

//关闭broker
sh bin/mqshutdown broker

//将本机远程ip写入配置文件中
echo 'brokerIP1=111.231.XX.XX' > conf/broker.properties 

//重新启动broker
nohup sh bin/mqbroker -n 111.231.XX.XX:9876 -c conf/broker.conf autoCreateTopicEnable=true &

管理控制台安装

Git地址:https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console

git clone [email protected]:apache/rocketmq-externals.git
cd  rocketmq-external/rocketmq-console/
mvn clean package -Dmaven.test.skip=true

打完包后,运行以下命令

java -jar rocketmq-console-ng-1.0.1.jar --server.port=12181 --rocketmq.config.namesrvAddr=111.231.XX.XX:9876

打开 http://localhost:12181访问控制台,像如下

在Procuder这个页面查询时会出现如下异常:

java.lang.RuntimeException: org.apache.rocketmq.client.exception.MQBrokerException: CODE: 1  DESC: the producer group[] not exist
For more information, please visit the url, http://rocketmq.apache.org/docs/faq/
        at com.google.common.base.Throwables.propagate(Throwables.java:160)
        at org.apache.rocketmq.console.service.impl.ProducerServiceImpl.getProducerConnection(ProducerServiceImpl.java:38)
        at org.apache.rocketmq.console.controller.ProducerController.producerConnection(ProducerController.java:39)

请把代码中producer.shutdown()这句注掉,生产环境中请加上。

 //producer.shutdown();

代码示例(官方)

生产者

package org.apache.rocketmq.example.simple;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

        producer.setNamesrvAddr("111.231.XX.XX:9876");
        producer.start();

        for (int i = 0; i < 10; i++)
            try {
                {
                    Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }

        //producer.shutdown();
    }
}

消费者

package org.apache.rocketmq.example.simple;

import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class PushConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        consumer.subscribe("TopicTest", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //wrong time format 2017_0422_221800
        //consumer.setConsumeTimestamp("20181109221800");
        consumer.setNamesrvAddr("111.231.XX.XX:9876");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

有更多的文章,请关注查看,更有面试宝典相送

原文地址:https://www.cnblogs.com/pingyun/p/11629616.html

时间: 2024-08-06 01:44:47

RocketMQ初入门踩坑记的相关文章

&lt;&lt;Python编程:从入门到实践&gt;&gt;踩坑记 Django

<<Python编程:从入门到实践>>踩坑记 Django Django Python 19.1.1.5 模板new_topic 做完书上的步骤后,对主题添加页面经行测试,但是浏览器显示 服务器异常. 个人采用的开发环境是virtual studio code , 测试起来很是难受,因为我配置的debug环境,断点操作没有作用. 经过我不断的测试,才发现我失败的原因是由于之前的误操作,先建立new_pizzas.py后改为new_pizzas.html的,错误就在这里.在我之后新建

Spark踩坑记——数据库(Hbase+Mysql)转

转自:http://www.cnblogs.com/xlturing/p/spark.html 前言 在使用Spark Streaming的过程中对于计算产生结果的进行持久化时,我们往往需要操作数据库,去统计或者改变一些值.最近一个实时消费者处理任务,在使用spark streaming进行实时的数据流处理时,我需要将计算好的数据更新到hbase和mysql中,所以本文对spark操作hbase和mysql的内容进行总结,并且对自己踩到的一些坑进行记录. Spark Streaming持久化设计

踩坑记:httpComponents 的 EntityUtils

今天写的一个服务程序,有人报告获得的数据中文乱码,而我是用 apache 通过 httpComponents 去取得数据的,于是开启日志的 debug 级别. 在日志里果然发现中文不见了,有乱码出现: 2014-07-02 16:35:01.348 DEBUG [Wire.java:86] http-outgoing-8 << "<?xml version="1.0" encoding="UTF-8"?>... subject=&q

iOS开发-OpenGLES 入门踩坑

Flat coloring(单色) 是通知OpenGL使用单一的颜色来渲染,OpenGL将一直使用指定的颜色来渲染直到你指定其它的颜色. 指定颜色的方法为 public abstract void glColor4f(float red, float green, float blue, float alpha). 缺省的red,green,blue为1,代表白色. Smooth coloring (平滑颜色过渡) 当给每个顶点定义一个颜色时,OpenGL自动为不同顶点颜色之间生成中间过渡颜色(

Spring @Transactional踩坑记

@Transactional踩坑记 总述 ? Spring在1.2引入@Transactional注解, 该注解的引入使得我们可以简单地通过在方法或者类上添加@Transactional注解,实现事务控制. 然而看起来越是简单的东西,背后的实现可能存在很多默认规则和限制.而对于使用者如果只知道使用该注解,而不去考虑背后的限制,就可能事与愿违,到时候线上出了问题可能根本都找不出啥原因. 踩坑记 1. 多数据源 事务不生效 背景介绍 ? 由于数据量比较大,项目的初始设计是分库分表的.于是在配置文件中

[转]Spark 踩坑记:数据库(Hbase+Mysql)

https://cloud.tencent.com/developer/article/1004820 Spark 踩坑记:数据库(Hbase+Mysql) 前言 在使用Spark Streaming的过程中对于计算产生结果的进行持久化时,我们往往需要操作数据库,去统计或者改变一些值. 最近一个实时消费者处理任务,在使用spark streaming进行实时的数据流处理时,我需要将计算好的数据更新到hbase和mysql中,所以本文对spark操作hbase和mysql的内容进行总结,并且对自己

记一次 Spring 事务配置踩坑记

记一次 Spring 事务配置踩坑记 问题描述:(SpringBoot + MyBatisPlus) 业务逻辑伪代码如下.理论上,插入数据 t1 后,xxService.getXxx() 方法的查询条件会不满足,会查询不到数据.结果事与愿违,后一次的查询,居然查到了数据. void saveXxx(){  xxService.getXxx(); // 查到一条数据 data1  xxService.insert(); // 插入一条数据 t1  xxService.getXxx(); // 查到

阿里云ECS搭建Kubernetes集群踩坑记

阿里云ECS搭建Kubernetes集群踩坑记 [TOC] 1. 现有环境.资源 资源 数量 规格 EIP 1 5M带宽 ECS 3 2 vCPU 16 GB内存 100G硬盘 ECS 3 2 vCPU 16 GB内存 150G硬盘 SLB 2 私网slb.s1.small 2. 规划 坑: 上网问题,因为只有一个EIP,所有其它节点只能通过代理上网; 负载均衡问题,因为阿里不支持LVS,负载均衡TCP方式后端又不支持访问负载均衡,HTTP和HTTPS方式,只支持后端协议为HTTP; 为了避免上

HttpWebRequest 改为 HttpClient 踩坑记-请求头设置

HttpWebRequest 改为 HttpClient 踩坑记-请求头设置 Intro 这两天改了一个项目,原来的项目是.net framework 项目,里面处理 HTTP 请求使用的是 WebReauest,但是 WebRequest 已经不再推荐使用了,你如果在项目中使用的话,编译器会警告, WebRequest已过时,新项目要 .Net standard 重写就直接 HttpClient 来处理 HTTP 请求了,在改的过程中踩了几个坑,记录一下 请求头处理 HttpClient 通常