SpringBoot2.X 整合 RocketMQ4.X

开发生产者代码

第一步:创建很普通的 SpringBoot 项目

第二步:加入相关依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.3.0</version>
</dependency>

第三步:写代码

PayProducer 类如下所示:

package net.xdclass.xdclassmq.jms;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;

@Component
public class PayProducer {

    private String producerGroup = "pay_group";

    private String nameServerAddr = "192.168.0.104:9876";

    private DefaultMQProducer producer;

    public PayProducer() {
        producer = new DefaultMQProducer(producerGroup);

        //指定NameServer地址,多个地址以 ; 隔开
        //如 producer.setNamesrvAddr("192.168.100.141:9876;192.168.100.142:9876;192.168.100.149:9876");

        producer.setNamesrvAddr(nameServerAddr);
        start();
    }

    public DefaultMQProducer getProducer() {
        return this.producer;
    }

    /**
     * 对象在使用之前必须要调用一次,只能初始化一次
     */
    public void start() {
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    /**
     * 一般在应用上下文,使用上下文监听器,进行关闭
     */
    public void shutdown() {
        this.producer.shutdown();
    }
}

PayController 类如下所示:

package net.xdclass.xdclassmq.controller;

import net.xdclass.xdclassmq.jms.PayProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;

@RestController
public class PayController {

    @Autowired
    private PayProducer payProducer;

    private static  final String topic = "pay_test_topic";

    @RequestMapping("/api/v1/pay_cb")
    public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        Message message = new Message(topic,"taga", ("hello rocketmq = "+text).getBytes() );
        SendResult sendResult = payProducer.getProducer().send(message);
        System.out.println(sendResult);
        return new HashMap<>();
    }
}

第四步:测试

通过可视化管理后台查看消息

Message对象

  • topic:主题名称
  • tag:标签,用于过滤
  • key:消息唯一标示,可以是业务字段组合
  • body:消息体,字节数组

注意:发送消息到 Broker 前,需要判断是否有此 Topic。启动 Broker 的时候,本地环境建议开启自动创建 Topic,生产环境建议关闭自动化创建 Topic。建议先手工创建 Topic,如果靠程序自动创建,然后再投递消息,会出现延迟情况。自动创建topic: autoCreateTopicEnable=true 无效原因:客户端版本要和服务端版本保持一致。

概念模型: 一个 Topic 下面对应多个 Queue,可以在创建 Topic 时指定,如订单类 Topic。

常见错误一

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException:
sendDefaultImpl call timeout
原因:阿里云存在多网卡,rocketmq都会根据当前网卡选择一个IP使用,当你的机器有多块网卡时,很有可能会有问题。比如,我遇到的问题是我机器上有两个IP,一个公网IP,一个私网IP, 因此需要配置broker.conf 指定当前的公网ip, 然后重新启动broker
新增配置:conf/broker.conf  (属性名称brokerIP1=broker所在的公网ip地址 )
新增这个配置:brokerIP1=120.76.62.13  

启动命令:nohup sh bin/mqbroker -n localhost:9876  -c ./conf/broker.conf &

常见错误二

MQClientException: No route info of this topic, TopicTest1
原因:Broker 禁止自动创建 Topic,且用户没有通过手工方式创建 此Topic, 或者broker和Nameserver网络不通
解决:
通过 sh bin/mqbroker -m  查看配置
autoCreateTopicEnable=true 则自动创建topic

Centos7关闭防火墙  systemctl stop firewalld

常见错误三

控制台查看不了数据,提示连接 10909错误

原因:Rocket默认开启了VIP通道,VIP通道端口为10911-2=10909

解决:阿里云安全组需要增加一个端口 10909

其他错误:

https://blog.csdn.net/qq_14853889/article/details/81053145
https://blog.csdn.net/wangmx1993328/article/details/81588217#%E5%BC%82%E5%B8%B8%E8%AF%B4%E6%98%8E
https://www.jianshu.com/p/bfd6d849f156
https://blog.csdn.net/wangmx1993328/article/details/81588217

开发消费者代码

接着上面的工程,直接上代码,PayConsumer 类如下所示:

package net.xdclass.xdclassmq.jms;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.util.List;

@Component
public class PayConsumer {
    private DefaultMQPushConsumer consumer;

    private String CONSUMER_GROUP = "pay_consumer_group";
    private String NAME_SERVER = "192.168.0.104:9876";
    private String TOPIC = "pay_test_topic";

    public PayConsumer() throws MQClientException {

        consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
        consumer.setNamesrvAddr(this.NAME_SERVER);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        consumer.subscribe(this.TOPIC, "*");

//        consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
//            try {
//                Message msg = msgs.get(0);
//                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
//                String topic = msg.getTopic();
//                String body = new String(msg.getBody(), "utf-8");
//                String tags = msg.getTags();
//                String keys = msg.getKeys();
//                System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
//                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
//            } catch (UnsupportedEncodingException e) {
//                e.printStackTrace();
//                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
//            }
//        });

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    Message msg = msgs.get(0);
                    System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));

                    String topic = msg.getTopic();
                    String body = new String(msg.getBody(), "utf-8");
                    String tags = msg.getTags();
                    String keys = msg.getKeys();
                    System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);

                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (UnsupportedEncodingException e) {

                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });

        consumer.start();
        System.out.println("consumer start ...");
    }
}

注释掉的部分采用 Lambda 表达式写法,效果是一样的。

常见问题

1、Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.42.1:10911> failed 

2、com.alibaba.rocketmq.client.exception.MQClientException: Send [1] times, still failed, cost [1647]ms, Topic: TopicTest1, BrokersSent: [broker-a, null, null]

3、org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [497]ms, Topic: TopicTest, BrokersSent: [Book-Air.local, 	MacBook-Air.local, MacBook-Air.local]
解决:多网卡问题处理
	1、设置producer:  producer.setVipChannelEnabled(false);
	2、编辑ROCKETMQ 配置文件:broker.conf(下列ip为自己的ip)
		namesrvAddr = 192.168.0.101:9876
		brokerIP1 = 192.168.0.101

4、DESC: service not available now, maybe disk full, CL:
	解决:修改启动脚本runbroker.sh,在里面增加一句话即可:
	JAVA_OPT="${JAVA_OPT} -Drocketmq.broker.diskSpaceWarningLevelRatio=0.98"
	(磁盘保护的百分比设置成98%,只有磁盘空间使用率达到98%时才拒绝接收producer消息)

常见问题处理
	https://blog.csdn.net/sqzhao/article/details/54834761
	https://blog.csdn.net/mayifan0/article/details/67633729
	https://blog.csdn.net/a906423355/article/details/78192828

原文地址:https://www.cnblogs.com/jwen1994/p/12319048.html

时间: 2024-08-30 13:59:51

SpringBoot2.X 整合 RocketMQ4.X的相关文章

SpringBoot2.X整合elasticsearch&#39;

SpringBoot默认支持两种技术来和ES交互: 创建项目需要引入ES的启动器 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-elasticsearch</artifactId></dependency> <!--集合工具包--><dependency> <gro

SpringBoot2.0应用(三):SpringBoot2.0整合RabbitMQ

如何整合RabbitMQ 1.添加spring-boot-starter-amqp <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> 2.添加配置 spring.rabbitmq.host=localhost spring.rabbitmq.po

springboot学习入门简易版五---springboot2.0整合jsp(11)

springboot对jsp支持不友好,内部tomcat对jsp不支持,需要使用外部tomcat,且必须打包为war包. 1 创建maven项目 注意:必须为war类型,否则找不到页面. 且不要把jsp页面存放在resources(原因:可能被别人访问,其次不在classes类路径中),因此,一般自行创建目录存放(一般/WEB-INF/下.  2 pom文件 <packaging>war</packaging> <!-- 注意为war包!!! --> <!-- s

SpringBoot2.0整合fastjson的正确姿势

SpringBoot2.0如何集成fastjson?在网上查了一堆资料,但是各文章的说法不一,有些还是错的,可能只是简单测试一下就认为ok了,最后有没生效都不知道.恰逢公司项目需要将JackSon换成fastjson,因此自己来实践一下SpringBoot2.0和fastjson的整合,同时记录下来方便自己后续查阅. 一.Maven依赖说明 SpringBoot的版本为: <version>2.1.4.RELEASE</version> 在pom文件中添加fastjson的依赖:

SpringBoot2.0 整合 SpringSecurity 框架,实现用户权限安全管理

本文源码:GitHub·点这里 || GitEE·点这里 一.Security简介 1.基础概念 Spring Security是一个能够为基于Spring的企业应用系统提供声明式的安全访问控制解决方案的安全框架.它提供了一组可以在Spring应用上下文中配置的Bean,充分利用了Spring的IOC,DI,AOP(面向切面编程)功能,为应用系统提供声明式的安全访问控制功能,减少了为安全控制编写大量重复代码的工作. 2.核心API解读 1).SecurityContextHolder 最基本的对

springboot2.0整合springsecurity前后端分离进行自定义权限控制

在阅读本文之前可以先看看springsecurity的基本执行流程,下面我展示一些核心配置文件,后面给出完整的整合代码到git上面,有兴趣的小伙伴可以下载进行研究 使用maven工程构建项目,首先需要引入最核心的依赖, <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-security</artifactId> <

Springboot2.x整合Redis以及连接哨兵模式/集群模式

依赖: <!--spirngboot版本为2.x--><!-- 加载spring boot redis包,springboot2.0中直接使用jedis或者lettuce配置连接池,默认为lettuce连接池,这里使用jedis连接池 --><!-- 加载spring boot redis包 --><dependency> <groupId>org.springframework.boot</groupId> <artifact

springboot2.0整合logback日志(详细)

一. 近期自己的项目想要一个记录日志的功能,而springboot本身就内置了日志功能,然而想要输入想要的日志,并且输出到磁盘,然后按天归档,或者日志的切分什么的,自带的日志仅仅具有简单的功能,百度了一番,总结如下,适合大多数的应用场景 二. springboot的pom文件都会引一个parent <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-star

springboot2.0整合es的异常总结

异常: availableProcessors is already set to [4], rejecting [4] 在启动类中加入 System.setProperty("es.set.netty.runtime.available.processors", "false"); 异常:org.elasticsearch.client.transport.NoNodeAvailableException: None of the configured nodes