有以下3种方式发送RocketMQ消息
- 可靠同步发送 reliable synchronous
- 可靠异步发送 reliable asynchronous
- 单向发送 one-way transmission
可靠同步发送
主要运用在比较重要一点消息传递/通知等业务
public class SyncProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("test"); producer.start(); for (int i = 0; i < 100; i++) { Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } //Shut down once the producer instance is not longer in use. producer.shutdown(); } }
可靠异步发送
通常用于对发送消息响应时间要求更高/更快的场景
public class AsyncProducer { public static void main( String[] args) throws MQClientException, InterruptedException, UnsupportedEncodingException { DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test"); producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); for (int i = 0; i < 10000000; i++) { try { final int index = i; Message msg = new Message("Jodie_topic_1023", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); //重点在这里 异步发送回调 producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } @Override public void onException(Throwable e) { System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); } catch (Exception e) { e.printStackTrace(); } } producer.shutdown(); } }
单向发送
适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
只发送消息,不等待服务器响应,只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。
public class OnewayProducer { public static void main(String[] args) throws Exception{ DefaultMQProducer producer = new DefaultMQProducer("Test"); producer.start(); for (int i = 0; i < 100; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. producer.sendOneway(msg); } //Shut down once the producer instance is not longer in use. producer.shutdown(); } }
原文地址:https://www.cnblogs.com/peachyy/p/9406536.html
时间: 2024-11-05 17:29:36