SpringBoot如何优雅的使用RocketMQ

MQ,是一种跨进程的通信机制,用于上下游传递消息。在传统的互联网架构中通常使用MQ来对上下游来做解耦合。

举例:当A系统对B系统进行消息通讯,如A系统发布一条系统公告,B系统可以订阅该频道进行系统公告同步,整个过程中A系统并不关系B系统会不会同步,由订阅该频道的系统自行处理。

什么是RocketMQ?

官方说明:

随着使用越来越多的队列和虚拟主题,ActiveMQ IO模块遇到了瓶颈。我们尽力通过节流,断路器或降级来解决此问题,但效果不佳。因此,我们那时开始关注流行的消息传递解决方案Kafka。不幸的是,Kafka不能满足我们的要求,特别是在低延迟和高可靠性方面。

看到这里可以很清楚的知道RcoketMQ 是一款低延迟、高可靠、可伸缩、易于使用的消息中间件。

具有以下特性:

  • 支持发布/订阅(Pub/Sub)和点对点(P2P)消息模型
  • 能够保证严格的消息顺序,在一个队列中可靠的先进先出(FIFO)和严格的顺序传递
  • 提供丰富的消息拉取模式,支持拉(pull)和推(push)两种消息模式
  • 单一队列百万消息的堆积能力,亿级消息堆积能力
  • 支持多种消息协议,如 JMS、MQTT 等
  • 分布式高可用的部署架构,满足至少一次消息传递语义

RocketMQ环境安装

下载地址:https://rocketmq.apache.org/dowloading/releases/

从官方下载二进制或者源码来进行使用。源码编译需要Maven3.2x,JDK8

在根目录进行打包:

mvn -Prelease-all -DskipTests clean packager -U

distribution/target/apache-rocketmq文件夹中会存在一个文件夹版,zip,tar三个可运行的完整程序。

使用rocketmq-4.6.0.zip:

  1. 启动名称服务 mqnamesrv.cmd
  2. 启动数据中心 mqbroker.cmd -n localhost:9876

SpringBoot环境中使用RocketMQ

SpringBoot 入门:https://www.cnblogs.com/SimpleWu/p/10027237.html
SpringBoot 常用start:https://www.cnblogs.com/SimpleWu/p/9798146.html
当前环境版本为:

  • SpringBoot 2.0.6.RELEASE
  • SpringCloud Finchley.RELEASE
  • SpringCldod Alibaba 0.2.1.RELEASE
  • RocketMQ 4.3.0
    在项目工程中导入:
    <!-- MQ Begin -->
    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>${rocketmq.version}</version>
    </dependency>
    <!-- MQ End -->

    由于我们这边已经有工程了所以就不在进行创建这种过程了。主要是看看如何使用RocketMQ。
    创建RocketMQProperties配置属性类,类中内容如下:

    @ConfigurationProperties(prefix = "rocketmq")
    public class RocketMQProperties {
    private boolean isEnable = false;
    private String namesrvAddr = "localhost:9876";
    private String groupName = "default";
    private int producerMaxMessageSize = 1024;
    private int producerSendMsgTimeout = 2000;
    private int producerRetryTimesWhenSendFailed = 2;
    private int consumerConsumeThreadMin = 5;
    private int consumerConsumeThreadMax = 30;
    private int consumerConsumeMessageBatchMaxSize = 1;
    //省略get set
    }

    现在我们所有子系统中的生产者,消费者对应:
    isEnable 是否开启mq
    namesrvAddr 集群地址
    groupName 分组名称
    设置为统一已方便系统对接,如有其它需求在进行扩展,类中我们已经给了默认值也可以在配置文件或配置中心中获取配置,配置如下:

    #发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用[email protected](pid代表jvm名字)作为唯一标示
    rocketmq.groupName=please_rename_unique_group_name
    #是否开启自动配置
    rocketmq.isEnable=true
    #mq的nameserver地址
    rocketmq.namesrvAddr=127.0.0.1:9876
    #消息最大长度 默认1024*4(4M)
    rocketmq.producer.maxMessageSize=4096
    #发送消息超时时间,默认3000
    rocketmq.producer.sendMsgTimeout=3000
    #发送消息失败重试次数,默认2
    rocketmq.producer.retryTimesWhenSendFailed=2
    #消费者线程数量
    rocketmq.consumer.consumeThreadMin=5
    rocketmq.consumer.consumeThreadMax=32
    #设置一次消费消息的条数,默认为1条
    rocketmq.consumer.consumeMessageBatchMaxSize=1

创建消费者接口 RocketConsumer.java 该接口用户约束消费者需要的核心步骤:

/**
 * 消费者接口
 *
 * @author SimpleWu
 *
 */
public interface RocketConsumer {

/**
     * 初始化消费者
     */
    public abstract void init();

    /**
     * 注册监听
     *
     * @param messageListener
     */
    public void registerMessageListener(MessageListener messageListener);

}

创建抽象消费者 AbstractRocketConsumer.java:

/**
 * 消费者基本信息
 *
 * @author SimpelWu
 */
public abstract class AbstractRocketConsumer implements RocketConsumer {

    protected String topics;
    protected String tags;
    protected MessageListener messageListener;
    protected String consumerTitel;
    protected MQPushConsumer mqPushConsumer;

    /**
     * 必要的信息
     *
     * @param topics
     * @param tags
     * @param consumerTitel
     */
    public void necessary(String topics, String tags, String consumerTitel) {
        this.topics = topics;
        this.tags = tags;
        this.consumerTitel = consumerTitel;
    }

    public abstract void init();

    @Override
    public void registerMessageListener(MessageListener messageListener) {
        this.messageListener = messageListener;
    }

}

在类中我们必须指定这个topics,tags与消息监听逻辑
public abstract void init();该方法是用于初始化消费者,由子类实现。
接下来我们编写自动配置类RocketMQConfiguation.java,该类用户初始化一个默认的生产者连接,以及加载所有的消费者。
@EnableConfigurationProperties({ RocketMQProperties.class }) 使用该配置文件
@Configuration 标注为配置类
@ConditionalOnProperty(prefix = "rocketmq", value = "isEnable", havingValue = "true") 只有当配置中指定rocketmq.isEnable = true的时候才会生效
核心内容如下:

/**
 * mq配置
 *
 * @author SimpleWu
 */
@Configuration
@EnableConfigurationProperties({ RocketMQProperties.class })
@ConditionalOnProperty(prefix = "rocketmq", value = "isEnable", havingValue = "true")
public class RocketMQConfiguation {

    private RocketMQProperties properties;

    private ApplicationContext applicationContext;

    private Logger log = LoggerFactory.getLogger(RocketMQConfiguation.class);

    public RocketMQConfiguation(RocketMQProperties properties, ApplicationContext applicationContext) {
        this.properties = properties;
        this.applicationContext = applicationContext;
    }

    /**
     * 注入一个默认的消费者
     * @return
     * @throws MQClientException
     */
    @Bean
    public DefaultMQProducer getRocketMQProducer() throws MQClientException {
        if (StringUtils.isEmpty(properties.getGroupName())) {
            throw new MQClientException(-1, "groupName is blank");
        }

        if (StringUtils.isEmpty(properties.getNamesrvAddr())) {
            throw new MQClientException(-1, "nameServerAddr is blank");
        }
        DefaultMQProducer producer;
        producer = new DefaultMQProducer(properties.getGroupName());

        producer.setNamesrvAddr(properties.getNamesrvAddr());
        // producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");

        // 如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName
        // producer.setInstanceName(instanceName);
        producer.setMaxMessageSize(properties.getProducerMaxMessageSize());
        producer.setSendMsgTimeout(properties.getProducerSendMsgTimeout());
        // 如果发送消息失败,设置重试次数,默认为2次
        producer.setRetryTimesWhenSendFailed(properties.getProducerRetryTimesWhenSendFailed());

        try {
            producer.start();
            log.info("producer is start ! groupName:{},namesrvAddr:{}", properties.getGroupName(),
                    properties.getNamesrvAddr());
        } catch (MQClientException e) {
            log.error(String.format("producer is error {}", e.getMessage(), e));
            throw e;
        }
        return producer;

    }

    /**
     * SpringBoot启动时加载所有消费者
     */
    @PostConstruct
    public void initConsumer() {
        Map<String, AbstractRocketConsumer> consumers = applicationContext.getBeansOfType(AbstractRocketConsumer.class);
        if (consumers == null || consumers.size() == 0) {
            log.info("init rocket consumer 0");
        }
        Iterator<String> beans = consumers.keySet().iterator();
        while (beans.hasNext()) {
            String beanName = (String) beans.next();
            AbstractRocketConsumer consumer = consumers.get(beanName);
            consumer.init();
            createConsumer(consumer);
            log.info("init success consumer title {} , toips {} , tags {}", consumer.consumerTitel, consumer.tags,
                    consumer.topics);
        }
    }

    /**
     * 通过消费者信心创建消费者
     *
     * @param consumerPojo
     */
    public void createConsumer(AbstractRocketConsumer arc) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(this.properties.getGroupName());
        consumer.setNamesrvAddr(this.properties.getNamesrvAddr());
        consumer.setConsumeThreadMin(this.properties.getConsumerConsumeThreadMin());
        consumer.setConsumeThreadMax(this.properties.getConsumerConsumeThreadMax());
        consumer.registerMessageListener(arc.messageListenerConcurrently);
        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        // consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        /**
         * 设置消费模型,集群还是广播,默认为集群
         */
        // consumer.setMessageModel(MessageModel.CLUSTERING);

        /**
         * 设置一次消费消息的条数,默认为1条
         */
        consumer.setConsumeMessageBatchMaxSize(this.properties.getConsumerConsumeMessageBatchMaxSize());
        try {
            consumer.subscribe(arc.topics, arc.tags);
            consumer.start();
            arc.mqPushConsumer=consumer;
        } catch (MQClientException e) {
            log.error("info consumer title {}", arc.consumerTitel, e);
        }

    }

}

然后在src/main/resources文件夹中创建目录与文件META-INF/spring.factories里面添加自动配置类即可开启启动配置,我们只需要导入依赖即可:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.xcloud.config.rocketmq.RocketMQConfiguation

接下来在服务中导入依赖,然后通过我们的抽象类获取所有必要信息对消费者进行创建,该步骤会在所有消费者初始化完成后进行,且只会管理是Spring Bean的消费者。br/>下面我们看看如何创建一个消费者,创建消费者的步骤非常简单,只需要继承AbstractRocketConsumer然后再加上Spring的@Component就能够完成消费者的创建,我们可以在类中自定义消费的主题与标签。
在项目可以根据需求当消费者创建失败的时候是否继续启动工程。
创建一个默认的消费者 DefaultConsumerMQ.java

@Component
public class DefaultConsumerMQ extends AbstractRocketConsumer {
    /**
     * 初始化消费者
     */
    @Override
    public void init() {
        // 设置主题,标签与消费者标题
        super.necessary("TopicTest", "*", "这是标题");
        //消费者具体执行逻辑
        registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                msgs.forEach(msg -> {
                    System.out.printf("consumer message boyd %s %n", new String(msg.getBody()));
                });
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
    }
}

super.necessary("TopicTest", "*", "这是标题"); 是必须要设置的,代表该消费者监听TopicTest主题下所有tags,标题那个字段是我自己定义的,所以对于该配置来说没什么意义。
我们可以在这里注入Spring的Bean来进行任意逻辑处理。
创建一个消息发送类进行测试

@Override
public String qmtest(@PathVariable("name")String name) throws MQClientException, RemotingException, MQBrokerException, InterruptedException, UnsupportedEncodingException {
    Message msg = new Message("TopicTest", "tags1", name.getBytes(RemotingHelper.DEFAULT_CHARSET));
    // 发送消息到一个Broker
    SendResult sendResult = defaultMQProducer.send(msg);
    // 通过sendResult返回消息是否成功送达
    System.out.printf("%s%n", sendResult);
    return null;
}

我们来通过Http请求测试:

http://localhost:10001/demo/base/mq/hello  consumer message boyd hello
http://localhost:10001/demo/base/mq/嘿嘿嘿嘿嘿  consumer message boyd 嘿嘿嘿嘿嘿 

好了到这里简单的start算是设计完成了,后面还有一些:顺序消息生产,顺序消费消息,异步消息生产等一系列功能,官人可参照官方去自行处理。

  • ActiveMQ 没经过大规模吞吐量场景的验证,社区不高不活跃。
  • RabbitMQ 集群动态扩展麻烦,且与当前程序语言不至于难以定制化。
  • kafka 支持主要的MQ功能,功能无法达到程序需求的要求,所以不使用,且与当前程序语言不至于难以定制化。
  • rocketMQ 经过全世界的女人的洗礼,已经很强大;MQ功能较为完善,还是分布式的,扩展性好;支持复杂MQ业务场景。(业务复杂可做首选)

SpringBoot如何优雅的使用RocketMQ

原文地址:https://blog.51cto.com/14230003/2463213

时间: 2024-08-01 14:45:34

SpringBoot如何优雅的使用RocketMQ的相关文章

SpringBoot:如何优雅地处理全局异常?

SpringBoot:如何优雅地处理全局异常? 之前用springboot的时候,只知道捕获异常使用try{}catch,一个接口一个try{}catch,这也是大多数开发人员异常处理的常用方式,虽然屡试不爽,但会造成一个问题,就是一个Controller下面,满屏幕的try{}catch,看着一点都不优雅,一点都不符合×××的气质,憋了这么久,×××今天终于决定对所有异常实施统一处理的方案. 开发准备 JDK8.正常的springboot项目 代码编写 通用异常处理 其实Spring系列的项目

springboot mybatis 优雅的添加多数据源

springboot的原则是简化配置,本文试图不通过xml配置,使用configuration配置数据源,并进行简单的数据访问. 并且配置了多数据源,在开发过程中这种场景很容易遇到. 1.依赖 springboot的starter mybatis的springboot集成包 jdbc <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId&g

Springboot如何优雅的解决ajax+自定义headers的跨域请求[转]

1.什么是跨域 由于浏览器同源策略(同源策略,它是由Netscape提出的一个著名的安全策略.现在所有支持JavaScript 的浏览器都会使用这个策略.所谓同源是指,域名,协议,端口相同.),凡是发送请求url的协议.域名.端口三者之间任意一与当前页面地址不同即为跨域. 具体可以查看下表: 2.springboot如何解决跨域问题 1.普通跨域请求解决方案: ①请求接口添加注解@CrossOrigin(origins = "http://127.0.0.1:8020", maxAge

六、springboot 简单优雅是实现短信服务

前言 上一篇讲了 springboot 集成邮件服务,接下来让我们一起学习下springboot项目中怎么使用短信服务吧. 项目中的短信服务基本上上都会用到,简单的注册验证码,消息通知等等都会用到.所以我这个脚手架也打算将短息服务继承进来. 短息服务我使用的平台是阿里云的.网上有很多的短信服务提供商.大家可以根据自己的需求进行选择. 准备工作 在阿里云上开通服务,以及进行配置.这些阿里云官方文档都写的很清楚,怎么做就不细说的,大家可以参考一下这篇文章: https://blog.csdn.net

Springboot 优雅停止服务的几种方法

在使用Springboot的时候,都要涉及到服务的停止和启动,当我们停止服务的时候,很多时候大家都是kill -9 直接把程序进程杀掉,这样程序不会执行优雅的关闭.而且一些没有执行完的程序就会直接退出. 我们很多时候都需要安全的将服务停止,也就是把没有处理完的工作继续处理完成.比如停止一些依赖的服务,输出一些日志,发一些信号给其他的应用系统,这个在保证系统的高可用是非常有必要的.那么咱么就来看一下几种停止springboot的方法. 第一种就是Springboot提供的actuator的功能,它

一文带你深入了解 Redis 内存模型

作者:编程迷思 链接:https://www.cnblogs.com/kismetv/p/8654978.html 前言 Redis是目前最火爆的内存数据库之一,通过在内存中读写数据,大大提高了读写速度,可以说Redis是实现网站高并发不可或缺的一部分. 我们使用Redis时,会接触Redis的5种对象类型(字符串.哈希.列表.集合.有序集合),丰富的类型是Redis相对于Memcached等的一大优势.在了解Redis的5种对象类型的用法和特点的基础上,进一步了解Redis的内存模型,对Red

Java从0到全栈-Java语言概述与开发环境搭建

Java从0到全栈-Java语言概述与开发环境搭建 Java从0到全栈 Java语言概述 Java发展历史 Java之父-James Golsing 起源 1991年,SUN(Standford University Network)公司的James Golsing领导的工程师小组想要开发一种用于像电视机.微波炉.电话这样的消费类电子产品的小型计算机语言,该产品的特点是由于不同的厂商选择不同的CPU和操作系统,因此要求该语言不能和特定的体系结构绑定在一起,也就是跨平台的.最初将这个语言命名为Oa

(转) SpringBoot非官方教程 | 第十一篇:springboot集成swagger2,构建优雅的Restful API

swagger,中文"拽"的意思.它是一个功能强大的api框架,它的集成非常简单,不仅提供了在线文档的查阅,而且还提供了在线文档的测试.另外swagger很容易构建restful风格的api,简单优雅帅气,正如它的名字. 一.引入依赖 <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <vers

14套java精品高级架构课,Dubbo分布式Restful 服务,并发原理编程,SpringBoot,SpringCloud,RocketMQ中间件视频教程

14套java精品高级架构课,缓存架构,深入Jvm虚拟机,全文检索Elasticsearch,Dubbo分布式Restful 服务,并发原理编程,SpringBoot,SpringCloud,RocketMQ中间件,Mysql分布式集群,服务架构,运 维架构视频教程 14套精品课程介绍: 1.14套精 品是最新整理的课程,都是当下最火的技术,最火的课程,也是全网课程的精品: 2.14套资 源包含:全套完整高清视频.完整源码.配套文档: 3.知识也 是需要投资的,有投入才会有产出(保证投入产出比是