基于阿里云RocketMQ的分片顺序消费+监听器自动启动的Springboot实验

发送消息RocketMqProducerService

package com.jane.rocketmq.service;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.order.OrderProducer;
import org.springframework.stereotype.Service;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;

@Service
public class RocketMqProducerService {

    private static OrderProducer producer;

    /**
     * 启动消息发送者,仅启动一次
     */
    public static void startProducer(){
        producer = RocketMqProducerSingleton.getInstance();
        // 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可。
        producer.start();
    }

    /**
     * 发送消息
     * @param topic
     * @param tag
     * @param message
     * @param sharding
     */
    public static void sendMqMessage( String topic, String tag, String message, String sharding ) {

        String key = UUID.randomUUID().toString();
        Message msg = new Message(
                // Message 所属的 Topic
                topic,
                // Message Tag, 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在消息队列 RocketMQ 的服务器过滤
                tag,
                // Message Body 可以是任何二进制形式的数据, 消息队列 RocketMQ 不做任何干预,需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
                message.getBytes()
        );
        // 设置代表消息的业务关键属性,请尽可能全局唯一。
        // 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。
        // 注意:不设置也不会影响消息正常收发
        msg.setKey(key);
        // 分区顺序消息中区分不同分区的关键字段,sharding key 于普通消息的 key 是完全不同的概念。
        // 全局顺序消息,该字段可以设置为任意非空字符串。
        String shardingKey = sharding;
        try {
            SendResult sendResult = producer.send(msg, shardingKey);
            // 发送消息,只要不抛异常就是成功
            if (sendResult != null) {
                SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                System.out.println(dateFormat.format(new Date()) + "-发送消息成功-sharding:" + shardingKey + ",tag:" + tag + ",key:"+ key + ",msgId:" + sendResult.getMessageId());
            }
        }
        catch (Exception e) {
            // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
            System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
            throw e;
        }
    }
}

消费者 RocketMqConsumerService

package com.jane.rocketmq.service;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
import com.aliyun.openservices.ons.api.order.MessageOrderListener;
import com.aliyun.openservices.ons.api.order.OrderAction;
import com.aliyun.openservices.ons.api.order.OrderConsumer;
import org.springframework.stereotype.Service;

import java.text.SimpleDateFormat;
import java.util.Date;

@Service
public class RocketMqConsumerService {

    private static OrderConsumer consumer;

    public void consumerMqMessage() {
        String topic = "topic-test";
        String tags = "*";
        consumer = RocketMqConsumerSingleton.getInstance();
        // 在订阅消息前,必须调用 start 方法来启动 Consumer,只需调用一次即可。
        consumer.subscribe(
            // Message 所属的 Topic
            topic,
            // 订阅指定 Topic 下的 Tags:
            // 1. * 表示订阅所有消息
            // 2. TagA || TagB || TagC 表示订阅 TagA 或 TagB 或 TagC 的消息
            tags,
            new MessageOrderListener() {
                /**
                 * 1. 消息消费处理失败或者处理出现异常,返回 OrderAction.Suspend<br>
                 * 2. 消息处理成功,返回 OrderAction.Success
                 */
                @Override
                public OrderAction consume(Message message, ConsumeOrderContext context) {
                    //System.out.println("消费者:" + consumer);
                    SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    System.out.println(dateFormat.format(new Date()) + "-消费消息---sharding:" + message.getShardingKey() + ",tag:" + message.getTag() + ",key: " + message.getKey() + ",MsgId:" + message.getMsgID());
                    try {
                        Thread.sleep(2000);
                        System.out.println("-------------消费者睡2秒后----------");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return OrderAction.Success;
                }
            });
        consumer.start();
    }
}

RocketMQ配置

这个可以配置在配置中心

package com.jane.rocketmq.config;

import com.aliyun.openservices.ons.api.PropertyKeyConst;

import java.util.Properties;

public class RocketMqConfig {

    public static Properties getProducerProperties() {

        //todo 这里从阿里云的配置里获取

        Properties properties = new Properties();
        // 您在控制台创建的 Group ID
        properties.put(PropertyKeyConst.GROUP_ID, "");
        // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
        properties.put(PropertyKeyConst.AccessKey, "");
        // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
        properties.put(PropertyKeyConst.SecretKey, "");
        // 设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看
        // 推荐接入点设置方式
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "");

        return properties;
    }

    public static Properties getConsumerProperties() {

        //todo 这里从阿里云的配置里获取

        Properties properties = new Properties();
        // 您在控制台创建的 Group ID
        properties.put(PropertyKeyConst.GROUP_ID, "");
        // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
        properties.put(PropertyKeyConst.AccessKey, "");
        // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
        properties.put(PropertyKeyConst.SecretKey, "");
        // 设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看
        // 推荐接入点设置方式
        properties.put(PropertyKeyConst.NAMESRV_ADDR, "");
        // 顺序消息消费失败进行重试前的等待时间,单位(毫秒),取值范围: 10 毫秒 ~ 1800 毫秒
        properties.put(PropertyKeyConst.SuspendTimeMillis, "100");
        // 消息消费失败时的最大重试次数
        properties.put(PropertyKeyConst.MaxReconsumeTimes, "20");

        return properties;
    }
}

启动监听器RocketMqStartListener

package com.jane.rocketmq.listener;

import com.jane.rocketmq.service.RocketMqConsumerService;
import com.jane.rocketmq.service.RocketMqProducerService;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;

/**
 * 增加消费者启动监听器,springBoot启动之后自动启动消费者,同时Start Producer
 */
public class RocketMqStartListener implements ApplicationListener<ApplicationReadyEvent> {

    private RocketMqConsumerService rocketMqConsumerService;
    private RocketMqProducerService rocketMqProducerService;

    @Override
    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
        System.out.println("执行监听器开始-RocketMqStartListener");
        ConfigurableApplicationContext applicationContext = applicationReadyEvent.getApplicationContext();
        rocketMqConsumerService = applicationContext.getBean(RocketMqConsumerService.class);
        System.out.println("开始启动消费者");
        rocketMqConsumerService.consumerMqMessage();
        System.out.println("开始启动生产者的start方法");
        rocketMqProducerService = applicationContext.getBean(RocketMqProducerService.class);
        rocketMqProducerService.startProducer();

    }
}

启动文件

package com.jane.rocketmq;

import com.jane.rocketmq.listener.RocketMqCloseListener;
import com.jane.rocketmq.listener.RocketMqStartListener;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@ComponentScan(basePackages = "com.jane")
public class RocketMqApplication {

    public static void main(String[] args) {
        SpringApplication sa = new SpringApplication(RocketMqApplication.class);
        sa.addListeners(new RocketMqStartListener());
        sa.addListeners(new RocketMqCloseListener());
        sa.run(args);
    }

}

发送消息Controller

package com.jane.rocketmq.contraller;

import com.jane.rocketmq.service.RocketMqConsumerService;
import com.jane.rocketmq.service.RocketMqProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class AppController {
    @Autowired
    RocketMqProducerService rocketMqProducerService;

    @Autowired
    RocketMqConsumerService rocketMqConsumerService;

    @GetMapping("/sendMessage/{storeNo}")
    public void sendMessage(@PathVariable("storeNo") String storeNo){

        rocketMqProducerService.sendMqMessage("topic-test",storeNo,"tessssssssssssssst", storeNo);
    }

    @GetMapping("/getMessage1")
    public void getMessage1(){
        rocketMqConsumerService.consumerMqMessage();
    }

    @GetMapping("/getMessage2")
    public void getMessage2(){
        rocketMqConsumerService.consumerMqMessage();
    }
}

启动&执行结果

可以启动多个,这样就实现了多生产-多消费的关系,实现顺序消费

原文地址:https://blog.51cto.com/janephp/2402922

时间: 2024-10-10 12:55:21

基于阿里云RocketMQ的分片顺序消费+监听器自动启动的Springboot实验的相关文章

阿里云RocketMQ的性能测试(一、本地测试)

因业务需要,需要进行阿里云RocketMQ的性能测试. 环境,一台windows系统CPU:I7,内存:8G,64位操作系统. 测试两种场景,为了保证订阅关系一致性(可以去阿里云官网了解订阅关系一致性),消费分为两种模式. 1.按Tag订阅,订阅所有Tag,测试使用3个消费者,3个生产者,每个生产者发送一万条数据,放到同一个Tag里,对应同一个ShardingKey,保证顺序消费. 测试结果: 2.按Tag订阅,每个消费者订阅一个Tag,每个Tag在一个分组里面. 测试结果: 其中要特别注意第二

云上拍客梨视频 基于阿里云的技术实践分享

摘要: 梨视频大部分的业务都选择了阿里云,其中一个主要原因是阿里云提供基于钉钉群构建的24贴身技术支持,刘隽表示,这种服务模式可以更充分.高效的对接需求,快速得到反馈,这也让梨视频的同学有信心去尝试一些新的方案. 在上海云栖大会视频专场中,梨视频CTO刘隽先生分享了梨视频拍客生产全流程及其背后的技术,同时作为业务使用方,向现场嘉宾阿里云产品的使用实践. 云上拍客梨视频 梨视频是全球第一资讯短视频内容生产和消费平台,拥有5万名全球核心拍客,遍布全球七大洲,覆盖525个国际主要城市和2000多个国内

基于阿里云Ubuntu14.04 64bit部署WordPress博客系统

环境:基于阿里云Ubuntu14.04  64bit服务器系统 1, 安装apache2+mysql5+php5+php5-mysql sudo apt-get install apache2 sudo apt-get install php5 sudo apt-get install mysql-server sudo apt-get install php5-mysql sudo /etc/init.d/apache2 restart 至此重启了apache后应该就已经配置好服务器了,对此先

基于阿里云ECS的phpwind网站备案前如何远程访问调试?

基于阿里云ECS的phpwind网站部署非常方便,但云主机的外网IP绑定域名却比较复杂.先要申请域名,成功后还需要备案.尤其是企业网站备案,需要提交的资料较多,准备资料以及审批的时间较长.这段时间在外网采用IP访问是不行的,显示的是主机宝服务器默认页. 即使域名已经绑定成功,但若未备案的话,采用域名访问也会出现以下“温馨提示”: 那么有没有办法在网站备案批准之前就能远程访问,以便尽早调试?经过摸索,基于阿里云ECS的phpwind网站可以采用以下步骤,在网站域名备案前修改默认的80端口以便使用I

在基于阿里云服务器CentOS6.5下安装Subversion 1.6.5服务

最近阿里云搞了个1元免费提供云服务器的活动,偶心痒痒就申请了一个. 正好可以作为团队的SVN服务器了,下面就来部署SVN服务吧. 一.安装基础环境 apr-1.5.0.tar.gz apr-util-1.5.3.tar.gz pcre-8.35.tar.gz zlib-1.2.8.tar.gz subversion-1.5.6.tar.gz tar -xzvf apr-1.5.0.tar.gz cd apr-1.5.0 ./configure --prefix=/usr/local/apr ma

基于阿里云数加MaxCompute的企业大数据仓库架构建设思路

摘要: 数加大数据直播系列课程主要以基于阿里云数加MaxCompute的企业大数据仓库架构建设思路为主题分享阿里巴巴的大数据是怎么演变以及怎样利用大数据技术构建企业级大数据平台. 本次分享嘉宾是来自阿里云大数据的技术专家祎休 背景与总体思路 数据仓库是一个面向主题的.集成的.非易失的.反映历史变化的数据集合用于支持管理决策. 原文链接:http://click.aliyun.com/m/43803/ 数加大数据直播系列课程,主要以基于阿里云数加MaxCompute的企业大数据仓库架构建设思路为主

(二)基于阿里云的MQTT远程控制(购买阿里云,在云端安装MQTT,测试MQTT远程通信)

QQ名称为Friday~的网友把他自己买MQTT的过程截图发给了我,今天就说一下如何购买阿里云,安装MQTT可以参考 http://www.cnblogs.com/yangfengwu/p/7764667.html https://blog.csdn.net/frankcheng5143/article/details/52045501 如果有什么问题可以在上面的群里面向他请教 基于阿里云的远程控制,其实就是在云端安装服务器,然后我们的WiFi设备和手机都去连接云端服务器(云端服务器就相当于云端

如何基于阿里云搭建适合初创企业的轻量级架构?

摘要: 在项目的初期往往存在很多变数,业务逻辑时刻在变,而且还要保证快速及时,所以,一个灵活多变.快速部署.持续集成并可以适应多种情况的架构便显得尤为重要.本文主要介绍基于阿里云搭建适合项目初期的后端架构 ----基于阿里云搭建的适合初创企业的轻量级架构 前言在项目的初期往往存在很多变数,业务逻辑时刻在变,而且还要保证快速及时,所以,一个灵活多变.快速部署.持续集成并可以适应多种情况的架构便显得尤为重要.本文主要介绍基于阿里云搭建适合项目初期的后端架构,至于细节操作不作描述,比如nginx配置优

阿里云短信验证_基于阿里云OpenAPI实现

阿里云短信服务 背景简介: 短信验证以及短信通知,目前已经应用的非常广泛,最近因项目需要,需要将原来的短信接口换成阿里云的的短信服务,原项目集成的短信服务能够实现短信的发送以及短信的验证整个过程,简单的来说,原来的短息服务,只需应用申请获取短信,短息服务器会发送短信到指定的手机,用户将验证码发送到短信服务商的服务器,服务器做出验证返回是否通过,而阿里云仅提供短信发送服务,需要自己开发短信的验证.下面简单的介绍一下: 1.获取阿里云AccessKey 用户->Accesskeys:需要自己创建一个