记录一次在阿里云ECS服务器部署验证RocketMQ的经历

==背景==

购买了3台阿里云ECS服务器,上面部署了RocketMQ,用来作为业务后台与平台之间的数据通讯中间件。

部署倒是异常顺利,不过在本地写程序,测试生产和消费数据的时候,出现了一些问题。

耗费了将近1天的时间,终于解决了,记录一下本次排查的经历。

==环境==

Linux:CentOS8(阿里云ECS服务器)

RocketMQ:4.6.1

==集群==

节点数:3个

节点1:broker-a(master)

节点2:broker-a(slave),broker-b(master)

节点3:broker-b(slave)

配置文件如下(IP地址省略了):

broker-a.properties

brokerClusterName=rexel
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10921
brokerIP1=xx.xx.xx.01
namesrvAddr=xx.xx.xx.01:9876;xx.xx.xx.02:9876
autoCreateTopicEnable=false
autoCreateSubscriptionGroup=true
storePathRootDir=/home/radmin/data/rocketmq/rootdir-a-m
storePathCommitLog=/home/radmin/data/rocketmq/commitlog-a-m
storePathConsumerQueue=/home/radmin/data/rocketmq/consumequeue-a-m
storePathIndex=/home/radmin/data/rocketmq/index-a-m
storeCheckpoint=/home/radmin/data/rocketmq/checkpoint-a-m

broker-a-s.properties

brokerClusterName=rexel
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10931
brokerIP1=xx.xx.xx.02
namesrvAddr=xx.xx.xx.01:9876;xx.xx.xx.02:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=false
autoCreateSubscriptionGroup=true
storePathRootDir=/home/radmin/data/rocketmq/rootdir-a-s
storePathCommitLog=/home/radmin/data/rocketmq/commitlog-a-s
storePathConsumerQueue=/home/radmin/data/rocketmq/consumequeue-a-s
storePathIndex=/home/radmin/data/rocketmq/index-a-s
storeCheckpoint=/home/radmin/data/rocketmq/checkpoint-a-s

broker-b.properties

brokerClusterName=rexel
brokerName=broker-b
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=SYNC_MASTER
flushDiskType=ASYNC_FLUSH
listenPort=10921
brokerIP1=xx.xx.xx.02
namesrvAddr=xx.xx.xx.01:9876;xx.xx.xx.02:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=false
autoCreateSubscriptionGroup=true
storePathRootDir=/home/radmin/data/rocketmq/rootdir-b-m
storePathCommitLog=/home/radmin/data/rocketmq/commitlog-b-m
storePathConsumerQueue=/home/radmin/data/rocketmq/consumequeue-b-m
storePathIndex=/home/radmin/data/rocketmq/index-b-m
storeCheckpoint=/home/radmin/data/rocketmq/checkpoint-b-m

broker-b-s.properties

brokerClusterName=rexel
brokerName=broker-b
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH
listenPort=10931
brokerIP1=xx.xx.xx.03
namesrvAddr=xx.xx.xx.01:9876;xx.xx.xx.02:9876
defaultTopicQueueNums=4
autoCreateTopicEnable=false
autoCreateSubscriptionGroup=true
storePathRootDir=/home/radmin/data/rocketmq/rootdir-b-s
storePathCommitLog=/home/radmin/data/rocketmq/commitlog-b-s
storePathConsumerQueue=/home/radmin/data/rocketmq/consumequeue-b-s
storePathIndex=/home/radmin/data/rocketmq/index-b-s
storeCheckpoint=/home/radmin/data/rocketmq/checkpoint-b-s

==最终代码==

RocketUtils.java

package com.rexel.stream.common.utils;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class RocketUtils implements Serializable{
    private static RocketUtils rocketUtils = null;
    private static Map<String, DefaultMQProducer> nameSrvMap = null;

    private RocketUtils() {

    }

    public synchronized static RocketUtils getInstance() {
        if (rocketUtils == null) {
            synchronized (RocketUtils.class) {
                rocketUtils = new RocketUtils();
            }
        }
        nameSrvMap = new HashMap<>();
        return rocketUtils;
    }

    public DefaultMQPushConsumer createConsumer(String namesrvAddr, String topic, String group) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setInstanceName(UUID.randomUUID().toString());
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.setVipChannelEnabled(false);
        consumer.setConsumeThreadMax(1);
        consumer.setConsumeThreadMin(1);
        consumer.setConsumeMessageBatchMaxSize(1);
        try {
            consumer.subscribe(topic, "*");
        } catch (MQClientException e) {
            e.printStackTrace();
            return null;
        }
        return consumer;
    }

    public DefaultMQProducer createProducer(String nameSrvAddr, String group) {
        if (nameSrvMap == null) {
            return null;
        }

        if (nameSrvMap.containsKey(nameSrvAddr)) {
            return nameSrvMap.get(nameSrvAddr);
        }

        DefaultMQProducer producer = new DefaultMQProducer(group);
        producer.setNamesrvAddr(nameSrvAddr);
        producer.setSendMessageWithVIPChannel(false);
        producer.setSendMsgTimeout(5000);
        producer.setInstanceName(UUID.randomUUID().toString());
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
            return null;
        }

        nameSrvMap.put(nameSrvAddr, producer);
        return producer;
    }

    public boolean sendOr(DefaultMQProducer producer, Message msg, boolean async) {
        if (async) {
            return sendAsync(producer, msg);
        } else {
            return send(producer, msg);
        }
    }

    public boolean sendAsync(DefaultMQProducer producer, Message msg) {
        try {
            producer.send(msg, new CallBack());
            return true;
        } catch (MQClientException | RemotingException | InterruptedException e) {
            e.printStackTrace();
            return false;
        }
    }

    public boolean send(DefaultMQProducer producer, Message msg) {
        try {
            producer.send(msg);
            return true;
        } catch (MQClientException | RemotingException | InterruptedException | MQBrokerException e) {
            e.printStackTrace();
            return false;
        }
    }

    private class CallBack implements SendCallback,Serializable{
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println("[------]onSuccess");
        }

        @Override
        public void onException(Throwable throwable) {
            System.out.println("[------]onException. " + throwable.getMessage());
        }
    }
}

RmqProducer.java

package com.rexel.stream.tools;

import com.alibaba.fastjson.JSONObject;
import com.rexel.stream.common.utils.RocketUtils;
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class RmqProducer {
    public static void main(String[] args) {
        System.out.println("[------]start.");
        RocketUtils rocketUtils = RocketUtils.getInstance();
        DefaultMQProducer producer =
            rocketUtils.createProducer("xx.xx.xx.01:9876;xx.xx.xx.02:9876", "pro_test3");

        for (int i = 0; i < 10; i++) {
            JSONObject jsonObject = new JSONObject();
            jsonObject.put("name", "VA_2YC_VAL");
            jsonObject.put("judge", "≥");
            jsonObject.put("value", "100");
            rocketUtils.sendAsync(producer, new Message(
                "app_notice",
                jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8)));
        }

        //如果使用异步发送,这里不要shutdown
//        producer.shutdown();
        System.out.println("[------]end.");
    }
}

RmqConsumer.java

package com.rexel.stream.tools;

import com.rexel.stream.common.utils.RocketUtils;
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.Message;

public class RmqConsumer {
    public static void main(String[] args) throws MQClientException {
        System.out.println("[------]start.");
        RocketUtils rocketUtils = RocketUtils.getInstance();
        DefaultMQPushConsumer consumer = rocketUtils.createConsumer(
            "xx.xx.xx.01:9876;xx.xx.xx.02:9876",
            "app_notice",
            "rexel_stream3");
        consumer.registerMessageListener(
            (MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
                for (Message msg : list) {
                    try {
                        byte[] body = msg.getBody();
                        String message = new String(body, StandardCharsets.UTF_8);
                        System.out.println("[------]rmq message= " + message);
                    } catch (Exception e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
        consumer.start();

        System.out.println("[------]end.");
    }
}

==问题1==

配置完成之后,尝试在客户端编写生产者代码,结果生产数据的时候报错。

org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [xxxx] failed

尝试1:

把生产者和消费者的代码中增加setSendMessageWithVIPChannel(false)。

结果:依然报错,错误没有改变

尝试2:

在配置文件中增加brokerIP1=xx.xx.xx.xx的配置。

结果:依然报错,错误没有改变

尝试3:

网上说是防火墙的问题,服务器本身的防火墙很早就已经被关闭了。尝试去设置阿里云ECS服务器产品的端口。

一次性的把一个10900/10999的端口全部开放

结果:测试同步生产数据正常。

尝试4:

测试异步生产数据。调用RocketUtils中的sendAsync方法。结果报错:

[------]onException. org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [101.132.242.90:9876, 47.116.50.192:9876] failed
[------]onException. The producer service state not OK, SHUTDOWN_ALREADY
See http://rocketmq.apache.org/docs/faq/ for further details.
[------]onException. The producer service state not OK, SHUTDOWN_ALREADY
See http://rocketmq.apache.org/docs/faq/ for further details.
[------]onException. The producer service state not OK, SHUTDOWN_ALREADY
See http://rocketmq.apache.org/docs/faq/ for further details.
[------]onException. The producer service state not OK, SHUTDOWN_ALREADY
See http://rocketmq.apache.org/docs/faq/ for further details.
[------]onException. The producer service state not OK, SHUTDOWN_ALREADY
See http://rocketmq.apache.org/docs/faq/ for further details.
[------]onException. The producer service state not OK, SHUTDOWN_ALREADY
See http://rocketmq.apache.org/docs/faq/ for further details.
[------]onException. org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [101.132.242.90:9876, 47.116.50.192:9876] failed
[------]onException. org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [101.132.242.90:9876, 47.116.50.192:9876] failed
[------]onException. org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to [101.132.242.90:9876, 47.116.50.192:9876] failed

原因是使用异步生产数据的时候,我的程序里调用了shutdown方法,

导致后续的异步线程无法正常执行。注释掉shutdown处理之后,异步生产正常。

结论:

如果出现connect to [xxxx] failed的问题,不外乎尝试以下几种办法:

1、程序中:生产者或者消费者:setSendMessageWithVIPChannel(false)

2、配置文件:如果是阿里云ECS服务器,以下两个配置使用外网地址:

brokerIP1=xx.xx.xx.01
namesrvAddr=xx.xx.xx.01:9876;xx.xx.xx.02:9876

3、防火墙:关闭服务器本身的防火墙。

4、安全组:阿里云服务器本身的网络安全组中需要开通端口。

==问题2==

生产者已经没有问题了,但是消费者一直消费不到数据。程序不报任何错误,就是消费不到数据。

在网上找了一些到有的博客,其中这篇给了我一些方向,

http://www.jiangxinlingdu.com/rocketmq/2019/08/06/noconsumer.html

初步怀疑是消费者的偏移量有问题。

解决办法:

我这个环境由于是新搭环境,目前还不是生产环境,所以我直接采用的方式是:

1、停止rocketmq集群

2、删除所有rocketmq的文件

3、重启集群

4、重新创建topic

一套暴利连招之后,消费者果然可以消费到数据了。

索然没有真正的找到问题的原因,不过基本上可以确定是rocketmq的元数据出现了问题,

这个问题的产生可能是我最近不断的调试配置文件,修改内外网地址,重启引起的。

--END--

原文地址:https://www.cnblogs.com/quchunhui/p/12516583.html

时间: 2024-10-09 20:00:18

记录一次在阿里云ECS服务器部署验证RocketMQ的经历的相关文章

阿里云ECS服务器部署django

参考 服务器安装的是Centos 系统. uwsgi是使用pip安装的. nginx是使用yum install nginx安装. python 2.7, mysql 5.5使用 yum安装. 它们之间的逻辑关系如下: the web client <-> the web server <-> the socket <-> uwsgi <-> Django uswgi负责从Django拿内容,通过socket传给 web server如nginx, 最后显示

阿里云ECS服务器部署HADOOP集群(五):Pig 安装

本篇将在阿里云ECS服务器部署HADOOP集群(一):Hadoop完全分布式集群环境搭建的基础上搭建. 1 环境介绍 一台阿里云ECS服务器:master 操作系统:CentOS 7.3 Hadoop:hadoop-2.7.3.tar.gz Java: jdk-8u77-linux-x64.tar.gz Pig: pig-0.17.0.tar.gz 2 Pig 下载 下载 pig-0.17.0.tar.gz 并在合适的位置解压缩,笔者这里解压缩的路径为: /usr/local 将解压得到的目录改

阿里云ECS服务器部署HADOOP集群(四):Hive本地模式的安装

本篇将在阿里云ECS服务器部署HADOOP集群(一):Hadoop完全分布式集群环境搭建的基础上搭建. 本地模式需要采用MySQL数据库存储数据. 1 环境介绍 一台阿里云ECS服务器:master 操作系统:CentOS 7.3 Hadoop:hadoop-2.7.3.tar.gz Java: jdk-8u77-linux-x64.tar.gz Hive:apache-hive-2.3.6-bin.tar.gz Mysql: Mysql 5.7 MySQL Connector-J:mysql-

阿里云ECS服务器部署HADOOP集群(七):Sqoop 安装

本篇将在 阿里云ECS服务器部署HADOOP集群(一):Hadoop完全分布式集群环境搭建 阿里云ECS服务器部署HADOOP集群(二):HBase完全分布式集群搭建(使用外置ZooKeeper) 阿里云ECS服务器部署HADOOP集群(三):ZooKeeper 完全分布式集群搭建 阿里云ECS服务器部署HADOOP集群(四):Hive本地模式的安装 的基础上搭建. 1 环境介绍 一台阿里云ECS服务器:master 操作系统:CentOS 7.3 Hadoop:hadoop-2.7.3.tar

阿里云ECS服务器部署HADOOP集群(六):Flume 安装

本篇将在阿里云ECS服务器部署HADOOP集群(一):Hadoop完全分布式集群环境搭建的基础上搭建. 1 环境介绍 一台阿里云ECS服务器:master 操作系统:CentOS 7.3 Hadoop:hadoop-2.7.3.tar.gz Java: jdk-8u77-linux-x64.tar.gz Flume:apache-flume-1.8.0-bin.tar.gz 2 Flume 下载 下载 apache-flume-1.8.0-bin.tar.gz 并在合适的位置解压缩,笔者这里解压

阿里云ECS服务器部署HADOOP集群(三):ZooKeeper 完全分布式集群搭建

本篇将在阿里云ECS服务器部署HADOOP集群(一):Hadoop完全分布式集群环境搭建的基础上搭建,多添加了一个 datanode 节点 . 1 节点环境介绍: 1.1 环境介绍: 服务器:三台阿里云ECS服务器:master, slave1, slave2 操作系统:CentOS 7.3 Hadoop:hadoop-2.7.3.tar.gz Java: jdk-8u77-linux-x64.tar.gz ZooKeeper: zookeeper-3.4.14.tar.gz 1.2 各节点角色

阿里云ECS服务器部署HADOOP集群(一):Hadoop完全分布式集群环境搭建

准备: 两台配置CentOS 7.3的阿里云ECS服务器: hadoop-2.7.3.tar.gz安装包: jdk-8u77-linux-x64.tar.gz安装包: hostname及IP的配置: 更改主机名: 由于系统为CentOS 7,可以直接使用‘hostnamectl set-hostname 主机名’来修改,修改完毕后重新shell登录或者重启服务器即可. 1 hostnamectl set-hostname master 2 exit 3 ssh [email protected]

阿里云ECS服务器的搭建

之前写了一个Android小项目,然后里面各种与后台数据库的交互,然后差不多完成了吧!感觉应该买一个服务器,而不是每次都是需要启动MyEclipse,启动Tomcat服务器才能够启动服务,获取到数据.那么这次就讲一下阿里云ECS服务器搭建的流程吧! 1. 去阿里云的官网购买服务器,菜单栏的"产品"-->"弹性计算"-->"云服务器ECS" 2. 进去里面购买,分为包年包月 与 按量付费 两种模式,其实价格对于学生党来说还是不算便宜的.

阿里云ECS服务器Linux环境下配置php服务器(二)--phpMyAdmin篇

首先说明,以下文本内容用vim编辑麻烦 可参考阿里云ECS服务器Linux环境下配置php服务器(一)--基础配置篇 这一次我们来继续说说phpMyAdmin的安装. 什么是phpMyAdmin?phpMyAdmin是一种mysql的管理工具,它可以直接通过网页来管理你的MySQL,当然,phpMyAdmin不是必要的,如果你不安装phpMyAdmin,一样可以通过mysql的命令行来管理你的mysql. 开始安装. 首先找到phpMyAdmin的下载地址(推荐官网地址https://www.p