Rocket MQ 问题排查命令

修改rocketmq官方代码测试:

package com.alibaba.middleware.race.rocketmq;
import java.util.Scanner;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendCallback;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
/**
 * Producer,模拟rocket mq使用中可能出现的问题,学习如何排查q问题
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException, RemotingException {
        DefaultMQProducer producer = new DefaultMQProducer("procedure_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        final String topics = "TOPIC-IT-WORKER-TEST";
        for (int i = 0; i < 1000; i++) {
            @SuppressWarnings("resource")
            Scanner reader=new Scanner(System.in);
            int key = reader.nextInt();
            final String message = " order-message - " + i + " key: " + key;
            byte[] body = message.getBytes();
            Message msgToBroker = new Message(topics, "tag-push", String.valueOf(key), body);
            producer.send(msgToBroker, new SendCallback() {
                public void onSuccess(SendResult sendResult) {
                    System.out.println(message);
                }
                public void onException(Throwable throwable) {
                    throwable.printStackTrace();
                }
            });
        }
    }
}
package com.alibaba.middleware.race.rocketmq;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.Scanner;

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group_name");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.subscribe("TOPIC-IT-WORKER-TEST", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    Scanner reader=new Scanner(System.in);
                    reader.hasNext();
                    byte[] body = msg.getBody();
                    if (body.length == 2 && body[0] == 0 && body[1] == 0) {
                        System.out.println("Got the end signal");
                        continue;
                    }
                    String paymentMessage = new String(body);
                    System.out.println(paymentMessage + " key: " + msg.getKeys() + " tag: " + msg.getTags());
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Started.");
    }
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.alibaba.race</groupId>
    <artifactId>preliminary.demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.5.3</version>
                <configuration>
                    <appendAssemblyId>false</appendAssemblyId>
                    <descriptors>
                        <descriptor>src/main/resources/assembly.xml</descriptor>
                    </descriptors>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>install</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

<!--    com.alibaba.middleware.race.jstorm-2.1.1版本默认的日志框架是logback,为了避免日志冲突,排除掉log4j-->
    <dependencies>
        <dependency>
            <groupId>com.alibaba.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>3.2.6</version>
        </dependency>
    </dependencies>
</project>
编译后启动服务端和客户端
进入target目录
启动生产者生产数据:java -Drocketmq.namesrv.addr=127.0.0.1:9876 -cp preliminary.demo-1.0-SNAPSHOT.jar com.alibaba.middleware.race.rocketmq.Producer
启动消费者消费数据:java -Drocketmq.namesrv.addr=127.0.0.1:9876 -cp preliminary.demo-1.0-SNAPSHOT.jar com.alibaba.middleware.race.rocketmq.Consumer

启动&参数修改

mqnamesrv   启动NameServer  jps - NamesrvStartup
mqbroker -n localhost:9876 启动broker jps - BrokerStartup 默认端口10911
mqadmin updateBrokerConfig -c DefaultCluster -n 127.0.0.1:9876 -k listenPort -v 10911 更新broker参数配置

查看当前系统状态

mqadmin clusterList -n 127.0.0.1:9876

查看当前所有topicList

mqadmin topicList -n 127.0.0.1:9876

查看broker状态

mqadmin brokerStatus -n 127.0.0.1:9876 -b 127.0.0.1:10911

查看某个topic的状态

mqadmin topicStatus -n 127.0.0.1:9876 -t TOPIC-IT-WORKER-TEST当前可见,producer只发送了一条消息,Max offset为1,最后收到消息的时间是last updated,由于配置四个broker都是本机,只有第一个收到了当前第一条消息第二张图为发了四条消息之后的状态,看起来可能就是轮询的,因为当我增加4条key为1的msg之后,仍然是四个节点每个两条

查看连接的procedure/consumer

mqadmin producerConnection -n 127.0.0.1:9876 -g procedure_group_name -t TOPIC-IT-WORKER-TEST
mqadmin consumerConnection -n 127.0.0.1:9876 -g consumer_group_name

查看某个key对应的msg

mqadmin queryMsgByKey -n 127.0.0.1:9876 -t TOPIC-IT-WORKER-TEST-1 -k 1因为之前发送了5条key为1的数据,所以这里可以看到是5条,每条都有一个MESSAGE ID

根据ID查看对应的MSG

mqadmin queryMsgById  -g consumer_group_name -i AC1F78B700002A9F00000000000A3208  -n 127.0.0.1:9876

根据位置偏移查询上面的那条数据

 mqadmin queryMsgByOffset -n  127.0.0.1:9876 -o 1 -t TOPIC-IT-WORKER-TEST-1 -i 1 -b izm5e210z0uiwyavdbmpxaz

查看消费详情

mqadmin consumerProgress -n 127.0.0.1:9876 -g consumer_group_name这里消费了一条,一共八条,差7条没有消费

重置消费端offset

mqadmin resetOffsetByTime  -n 127.0.0.1:9876 -g consumer_group_name -t TOPIC-IT-WORKER-TEST-1 -f true -s 1536820000

直接打印消息

mqadmin printMsg -t TOPIC-IT-WORKER-TEST-1 -n 127.0.0.1:9876

The most commonly used mqadmin commands are:
   updateTopic          Update or create topic
   deleteTopic          Delete topic from broker and NameServer.
   updateSubGroup       Update or create subscription group
   deleteSubGroup       Delete subscription group from broker.
   updateBrokerConfig   Update broker‘s config
   updateTopicPerm      Update topic perm
   topicRoute           Examine topic route info
   topicStatus          Examine topic Status info
   topicClusterList     get cluster info for topic
   brokerStatus         Fetch broker runtime status data
   queryMsgById         Query Message by Id
   queryMsgByKey        Query Message by Key
   queryMsgByUniqueKey  Query Message by Unique key
   queryMsgByOffset     Query Message by offset
   queryMsgByUniqueKey  Query Message by Unique key
   printMsg             Print Message Detail
   printMsgByQueue      Print Message Detail
   sendMsgStatus        send msg to broker.
   brokerConsumeStats   Fetch broker consume stats data
   producerConnection   Query producer‘s socket connection and client version
   consumerConnection   Query consumer‘s socket connection, client version and subscription
   consumerProgress     Query consumers‘s progress, speed
   consumerStatus       Query consumer‘s internal data structure
   cloneGroupOffset     clone offset from other group.
   clusterList          List all of clusters
   topicList            Fetch all topic list from name server
   updateKvConfig       Create or update KV config.
   deleteKvConfig       Delete KV config.
   wipeWritePerm        Wipe write perm of broker in all name server
   resetOffsetByTime    Reset consumer offset by timestamp(without client restart).
   updateOrderConf      Create or update or delete order conf
   cleanExpiredCQ       Clean expired ConsumeQueue on broker.
   cleanUnusedTopic     Clean unused topic on broker.
   startMonitoring      Start Monitoring
   statsAll             Topic and Consumer tps stats
   allocateMQ           Allocate MQ
   checkMsgSendRT       check message send response time
   clusterRT            List All clusters Message Send RT
   getNamesrvConfig     Get configs of name server.
   updateNamesrvConfig  Update configs of name server.
   getBrokerConfig      Get broker config by cluster or special broker!
   queryCq              Query cq command.

原文地址:https://www.cnblogs.com/it-worker365/p/9641791.html

时间: 2024-08-29 21:04:14

Rocket MQ 问题排查命令的相关文章

系统性能排查命令及优化思路

最近笔者经常处理了一些线上的问题机器.特抽空写一篇文章将处理系统性能问题和优化思路进行总结,方便后续工作中系统故障的排查.作为运维,收到网管系统性能报警应该是常有的事情.而快速进行问题定位并解决则是工作的关键.我们在排查或者优化一个系统的时候无外乎从以下几个方面考虑: 1.CPU的使用率异常,如某个核心的使用率过高,而其它核心则处于空闲的状况. 2.内存的使用状况:通常需要注意是否程序内存泄漏导致的. 3.磁盘IO性能:通常磁盘IO不正常时我们需要判断程序是否处在顺序读写的状态. 4.网络IO负

Java 开发必须掌握的线上问题排查命令

作为一个合格的开发人员,不仅要能写得一手还代码,还有一项很重要的技能就是排查问题.这里提到的排查问题不仅仅是在coding的过程中debug等,还包括的就是线上问题的排查.由于在生产环境中,一般没办法debug(其实有些问题,debug也白扯...),所以我们需要借助一些常用命令来查看运行时的具体情况,这些运行时信息包括但不限于运行日志.异常堆栈.堆使用情况.GC情况.JVM参数情况.线程情况等. 给一个系统定位问题的时候,知识.经验是关键,数据是依据,工具是运用知识处理数据的手段.为了便于我们

Java开发必须掌握的线上问题排查命令

作为一个合格的开发人员,不仅要能写得一手还代码,还有一项很重要的技能就是排查问题.这里提到的排查问题不仅仅是在coding的过程中debug等,还包括的就是线上问题的排查.由于在生产环境中,一般没办法debug(其实有些问题,debug也白扯...),所以我们需要借助一些常用命令来查看运行时的具体情况,这些运行时信息包括但不限于运行日志.异常堆栈.堆使用情况.GC情况.JVM参数情况.线程情况等. 给一个系统定位问题的时候,知识.经验是关键,数据是依据,工具是运用知识处理数据的手段.为了便于我们

线上问题排查命令----JVM篇

创建: 刘新宇,最新修改: 大约3小时以前 转至元数据起始 0.分类 功能 命令 线程 jstack 内存 jmap 性能 jstat 1.查找到Jvm的进程id,以后所有命令基本都需要 jps # 附带jvm参数信息 jps -v # 只显示id jps -q #输出main method的参数 jps -m #输出完全的包名,应用主类名,jar的完全路径 jps -l # 向下个命令输出pid jps | grep 'Bootstrap' | awk '{print $1}' | 2.jst

线上问题排查命令----Shell篇

创建: 刘新宇,最新修改: 大约4小时以前 转至元数据起始 1.跟踪线上日志定时刷新最新内容 tail -fn 200 $log 2.查找指定字符串 #只显示匹配行 grep $String $file #匹配字符串所在行的上下n行 grep -C n $String $file #匹配字符串的行数有多少  grep $String $file | wc -l #匹配字符串高亮显示 grep --color $String $file #使用正则 grep -E $String $file 3.

性能排查命令总结

找到最耗CPU的java线程 ps命令 命令: ps -mp pid -o THREAD,tid,time 或者 ps -Lfp pid 结果展示: 这个命令的作用,主要是可以获取到对应一个进程下的线程的一些信息. 比如你想分析一下一个java进程的一些运行瓶颈点,可以通过该命令找到所有当前Thread的占用CPU的时间,也就是这里的最后一列. 比如这里找到了一个TID : 30834 ,所占用的TIME时间最高. 通过 printf "%x\n" 30834 首先转化成16进制, 继

linux环境网络问题排查命令

1,ping ping 对方域名 ping 对方ip. ping 对方网关. ping 自己网关. ping www.sina.com 2,telnet ip port 3, dig . dig @dns 域名 dig +trace 域名

Linux系统下信息或者和故障排查命令小结

1.查看用户在线和登录情况 w - 查看在线用户 last - 查看最近用户登录情况 2.查看系统命令执行历史 history 3.查看系统现在运行的进程信息 pstree -a (简单结果) ps aux (详细结果) 4.查看网络连接信息 netstat -ntlp netstat -nulp netstat -nxlp 5.查看CPU和内存运行信息 free -m # 查看当前内存使用情况 uptime # 查看系统运行时长 top # 查看系统进程信息(动态) 6.查看硬件信息 lspc

Rocket MQ 1 - 用

参考 http://www.iocoder.cn/categories/RocketMQ/ ; https://www.jianshu.com/nb/16219849 首先上启动方法,分别启动namesrv/broker/procedure/consumer public static void main(String[] args) throws Exception { NettyServerConfig nettyServerConfig = new NettyServerConfig();