RocketMQ-广播模式消费

Rocketmq 消费者默认是集群的方式消费的,消费者还可以用广播的模式进行消费。广播模式消费就是所有订阅同一个主题的消费者都会收到消息。代码实现上其实很简单,就是在消费端添加

consumer.setMessageModel(MessageModel.BROADCASTING);

就可以了。我们看实验步骤:

一、启动ConsumerBroadCastMember1

二、启动ConsumerBroadCastMember2

三、运行ProducerBraodCast

四、我们可以看到两个Consumer都收到了同样的消息。

Producer端:

package org.hope.lee.producer;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendCallback;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageQueue;
import com.alibaba.rocketmq.remoting.exception.RemotingException;

public class ProducerBroadCast {
    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer("push_consumer");
        producer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
        try {
            // 设置实例名称
            producer.setInstanceName("producer_broadcast");
            // 设置重试次数
            producer.setRetryTimesWhenSendFailed(3);
            // 开启生产者
            producer.start();
            // 创建一条消息
            Message msg = new Message("topic_broadcast", "TagA", "OrderID0034", "message_broadcast_test".getBytes());
            SendResult send = producer.send(msg);
            System.out.println("id:--->" + send.getMsgId() + ",result:--->" + send.getSendStatus());

        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        producer.shutdown();
    }
}

Consumer端:

package org.hope.lee.consumer;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;

public class ConsumerBroadCastMember1 {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_broadcast");
        consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
        // 批量消费,每次拉取10条
        consumer.setConsumeMessageBatchMaxSize(10);
        //设置广播消费
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //设置集群消费
//        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 如果非第一次启动,那么按照上次消费的位置继续消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 订阅PushTopic下Tag为push的消息
        consumer.subscribe("topic_broadcast", "TagA || Tag B || Tage C");
        consumer.registerMessageListener(new MqBroadCastListener());
        consumer.start();
        System.out.println("Consumer1 Started.");

    }
}
class MqBroadCastListener implements MessageListenerConcurrently{
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        try {
            MessageExt msg = msgs.get(0);
            String msgBody = new String(msg.getBody(), "utf-8");
            System.out.println("msgBody:" + msgBody);
        } catch(Exception e) {
            e.printStackTrace();
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

}
package org.hope.lee.consumer;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;

public class ConsumerBroadCastMember2 {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_broadcast");
        consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
        // 批量消费,每次拉取10条
        consumer.setConsumeMessageBatchMaxSize(10);
        //设置广播消费
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //设置集群消费
//        consumer.setMessageModel(MessageModel.CLUSTERING);
        // 如果非第一次启动,那么按照上次消费的位置继续消费
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        // 订阅PushTopic下Tag为push的消息
        consumer.subscribe("topic_broadcast", "TagA || Tag B || Tage C");
        consumer.registerMessageListener(new MqBroadCastListener());
        consumer.start();
        System.out.println("Consumer2 Started.");

    }
}

结果:

https://gitee.com/huayicompany/RocketMQ-learn/tree/master/rocketmq-api

原文地址:https://www.cnblogs.com/happyflyingpig/p/8215552.html

时间: 2024-08-30 07:41:34

RocketMQ-广播模式消费的相关文章

rocketmq广播消息

发布与模式实现.广播就是向一个主题的所有订阅者发送同一条消息. 在发送消息的时候和普通的消息并与不同之处,只是在消费端做一些配置即可. Consumer消息消费 public class BroadcastConsumer { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_ren

uip UDP 服务器广播模式(客户端可以任意端口,并且主动向客户端发送数据)

目前移植uip,发现UDP 服务器模式下,必须指定本地端口以及客户端端口,否则只能讲客户端端口设置为0,才能接收任意端口的数据,但是无法发送数据,因为此时客户端端口设置为0了,我通过将原始数据包中的客户端端口保存下来,并且在发送的时候将客户端端口替换为指定的端口,发送完成之后又设置为0,这样就实现了向任意客户端端口发送数据. uip.c if(uip_udp_conn->lport != 0 && UDPBUF->destport == uip_udp_conn->lpo

Python-RabbitMQ direct广播模式

fanout广播模式是全部都能收到信息,那我要是想要有条件选择的接收呢,需要用到direct模式 这张图的大概意思是Exchange的类型为direct,发的error级别的消息投递到第一个队列,消息级别为info.error.warning级别的消息投递到第二个队列.先定义一个生产者 再定义消费者 进行测试打开级别级别为info.warning.error三个级别的消费者 在生产者端发送一个级别为error的消息 观察三个级别的消费者,最终只能级别为error的消费者能收到下消息 原文地址:h

RabbitMQ广播模式

广播模式:1对多,produce发送一则消息多个consumer同时收到.注意:广播是实时的,produce只负责发出去,不会管对端是否收到,若发送的时刻没有对端接收,那消息就没了,因此在广播模式下设置消息持久化是无效的. 三种广播模式: fanout: 所有bind到此exchange的queue都可以接收消息(纯广播,绑定到RabbitMQ的接受者都能收到消息):direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息:topic:所有符合routin

RocketMQ的顺序消费和事务消费

一.三种消费 :1.普通消费 2. 顺序消费 3.事务消费 1.1  顺序消费:在网购的时候,我们需要下单,那么下单需要假如有三个顺序,第一.创建订单 ,第二:订单付款,第三:订单完成.也就是这个三个环节要有顺序,这个订单才有意义.RocketMQ可以保证顺序消费,他的实现是生产者(一个生产者可以对多个主题去发送消息)将这个三个消息放在topic(一个topic默认有4个队列)的一个队列里面,单机支持上万个持久化队列,消费端去消费的时候也是只能有一个Consumer去取得这个队列里面的数据,然后

JAVA代码之RocketMQ生产和消费数据

一.启动RocketMQ [[email protected] ~]# cat /etc/hosts # Do not remove the following line, or various programs # that require network functionality will fail. 127.0.0.1               localhost.localdomain localhost ::1             localhost6.localdomain6

uip UDP server广播模式(client能够随意port,而且主动向client发送数据)

眼下移植uip,发现UDP server模式下,必须指定本地port以及clientport,否则仅仅能讲clientport设置为0,才干接收随意port的数据,可是无法发送数据,由于此时clientport设置为0了,我通过将原始数据包中的clientport保存下来,而且在发送的时候将clientport替换为指定的port,发送完毕之后又设置为0,这样就实现了向随意clientport发送数据. uip.c if(uip_udp_conn->lport != 0 && UDP

RocketMQ部分数据消费不了问题排查

问题现象 今天忽然收到RocketMQ预警信息如下: 提醒有部分数据没有消费,产生堆积情况. 打开RocketMq-Console-Ng查看如下图形式: 备注:第一反应是Consumer Group内订阅了多个topic?(为什么这么怀疑,下次分析). 通过命令statsAll 作用是查询Topic and Consumer tps stats: sh mqadmin statsAll -n namesrv 发现没有问题,很奇怪?还好之前源码看过,只能调试源码了. 源码调试 本篇不重点讲解源码过

程序重启RocketMQ消息重复消费

最近在调试RocketMQ消息发送与消费的Demo时,发现一个问题:只要重启程序,RocketMQ消息就会重复消费. 那么这是什么原因导致的,又该如何解决呢? 经过一番排查,发现程序使用的RocketMQ客户端版本是3.6.2,而测试环境安装的RocketMQ环境的版本是4.1.0.原来是客户端和服务器端版本不一样导致的,消息并没有最终被消费,即没有ACK消息确认,只要程序重启就会重复消费. 解决方案:RocketMQ客户端版本使用与服务器端的同一版本,即4.1.0版本. 划重点:使用Rocke