发送消息RocketMqProducerService
package com.jane.rocketmq.service;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.order.OrderProducer;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;
@Service
public class RocketMqProducerService {
private static OrderProducer producer;
/**
* 启动消息发送者,仅启动一次
*/
public static void startProducer(){
producer = RocketMqProducerSingleton.getInstance();
// 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可。
producer.start();
}
/**
* 发送消息
* @param topic
* @param tag
* @param message
* @param sharding
*/
public static void sendMqMessage( String topic, String tag, String message, String sharding ) {
String key = UUID.randomUUID().toString();
Message msg = new Message(
// Message 所属的 Topic
topic,
// Message Tag, 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在消息队列 RocketMQ 的服务器过滤
tag,
// Message Body 可以是任何二进制形式的数据, 消息队列 RocketMQ 不做任何干预,需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
message.getBytes()
);
// 设置代表消息的业务关键属性,请尽可能全局唯一。
// 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。
// 注意:不设置也不会影响消息正常收发
msg.setKey(key);
// 分区顺序消息中区分不同分区的关键字段,sharding key 于普通消息的 key 是完全不同的概念。
// 全局顺序消息,该字段可以设置为任意非空字符串。
String shardingKey = sharding;
try {
SendResult sendResult = producer.send(msg, shardingKey);
// 发送消息,只要不抛异常就是成功
if (sendResult != null) {
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(dateFormat.format(new Date()) + "-发送消息成功-sharding:" + shardingKey + ",tag:" + tag + ",key:"+ key + ",msgId:" + sendResult.getMessageId());
}
}
catch (Exception e) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
System.out.println(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
throw e;
}
}
}
消费者 RocketMqConsumerService
package com.jane.rocketmq.service;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
import com.aliyun.openservices.ons.api.order.MessageOrderListener;
import com.aliyun.openservices.ons.api.order.OrderAction;
import com.aliyun.openservices.ons.api.order.OrderConsumer;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.Date;
@Service
public class RocketMqConsumerService {
private static OrderConsumer consumer;
public void consumerMqMessage() {
String topic = "topic-test";
String tags = "*";
consumer = RocketMqConsumerSingleton.getInstance();
// 在订阅消息前,必须调用 start 方法来启动 Consumer,只需调用一次即可。
consumer.subscribe(
// Message 所属的 Topic
topic,
// 订阅指定 Topic 下的 Tags:
// 1. * 表示订阅所有消息
// 2. TagA || TagB || TagC 表示订阅 TagA 或 TagB 或 TagC 的消息
tags,
new MessageOrderListener() {
/**
* 1. 消息消费处理失败或者处理出现异常,返回 OrderAction.Suspend<br>
* 2. 消息处理成功,返回 OrderAction.Success
*/
@Override
public OrderAction consume(Message message, ConsumeOrderContext context) {
//System.out.println("消费者:" + consumer);
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(dateFormat.format(new Date()) + "-消费消息---sharding:" + message.getShardingKey() + ",tag:" + message.getTag() + ",key: " + message.getKey() + ",MsgId:" + message.getMsgID());
try {
Thread.sleep(2000);
System.out.println("-------------消费者睡2秒后----------");
} catch (InterruptedException e) {
e.printStackTrace();
}
return OrderAction.Success;
}
});
consumer.start();
}
}
RocketMQ配置
这个可以配置在配置中心
package com.jane.rocketmq.config;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import java.util.Properties;
public class RocketMqConfig {
public static Properties getProducerProperties() {
//todo 这里从阿里云的配置里获取
Properties properties = new Properties();
// 您在控制台创建的 Group ID
properties.put(PropertyKeyConst.GROUP_ID, "");
// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.AccessKey, "");
// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.SecretKey, "");
// 设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看
// 推荐接入点设置方式
properties.put(PropertyKeyConst.NAMESRV_ADDR, "");
return properties;
}
public static Properties getConsumerProperties() {
//todo 这里从阿里云的配置里获取
Properties properties = new Properties();
// 您在控制台创建的 Group ID
properties.put(PropertyKeyConst.GROUP_ID, "");
// AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.AccessKey, "");
// SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
properties.put(PropertyKeyConst.SecretKey, "");
// 设置 TCP 接入域名,进入控制台的实例管理页面的“获取接入点信息”区域查看
// 推荐接入点设置方式
properties.put(PropertyKeyConst.NAMESRV_ADDR, "");
// 顺序消息消费失败进行重试前的等待时间,单位(毫秒),取值范围: 10 毫秒 ~ 1800 毫秒
properties.put(PropertyKeyConst.SuspendTimeMillis, "100");
// 消息消费失败时的最大重试次数
properties.put(PropertyKeyConst.MaxReconsumeTimes, "20");
return properties;
}
}
启动监听器RocketMqStartListener
package com.jane.rocketmq.listener;
import com.jane.rocketmq.service.RocketMqConsumerService;
import com.jane.rocketmq.service.RocketMqProducerService;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.ConfigurableApplicationContext;
/**
* 增加消费者启动监听器,springBoot启动之后自动启动消费者,同时Start Producer
*/
public class RocketMqStartListener implements ApplicationListener<ApplicationReadyEvent> {
private RocketMqConsumerService rocketMqConsumerService;
private RocketMqProducerService rocketMqProducerService;
@Override
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
System.out.println("执行监听器开始-RocketMqStartListener");
ConfigurableApplicationContext applicationContext = applicationReadyEvent.getApplicationContext();
rocketMqConsumerService = applicationContext.getBean(RocketMqConsumerService.class);
System.out.println("开始启动消费者");
rocketMqConsumerService.consumerMqMessage();
System.out.println("开始启动生产者的start方法");
rocketMqProducerService = applicationContext.getBean(RocketMqProducerService.class);
rocketMqProducerService.startProducer();
}
}
启动文件
package com.jane.rocketmq;
import com.jane.rocketmq.listener.RocketMqCloseListener;
import com.jane.rocketmq.listener.RocketMqStartListener;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;
@SpringBootApplication
@ComponentScan(basePackages = "com.jane")
public class RocketMqApplication {
public static void main(String[] args) {
SpringApplication sa = new SpringApplication(RocketMqApplication.class);
sa.addListeners(new RocketMqStartListener());
sa.addListeners(new RocketMqCloseListener());
sa.run(args);
}
}
发送消息Controller
package com.jane.rocketmq.contraller;
import com.jane.rocketmq.service.RocketMqConsumerService;
import com.jane.rocketmq.service.RocketMqProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class AppController {
@Autowired
RocketMqProducerService rocketMqProducerService;
@Autowired
RocketMqConsumerService rocketMqConsumerService;
@GetMapping("/sendMessage/{storeNo}")
public void sendMessage(@PathVariable("storeNo") String storeNo){
rocketMqProducerService.sendMqMessage("topic-test",storeNo,"tessssssssssssssst", storeNo);
}
@GetMapping("/getMessage1")
public void getMessage1(){
rocketMqConsumerService.consumerMqMessage();
}
@GetMapping("/getMessage2")
public void getMessage2(){
rocketMqConsumerService.consumerMqMessage();
}
}
启动&执行结果
可以启动多个,这样就实现了多生产-多消费的关系,实现顺序消费
原文地址:https://blog.51cto.com/janephp/2402922
时间: 2024-10-10 12:55:21