rocketmq消息重复推送的问题

最近,在公司的测试环境,遇到个问题,每次重启应用重启后,原来消费过的消息又被重复推送了一遍,消费者和生产者代码如下:

package com.tf56.queue.client;

import java.util.concurrent.TimeUnit;

import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
/**
 * RocketMQ 生产者工具类
 * @author zjhua
 *
 */
public class RocketMQProducer {

    private DefaultMQProducer producer;
    private String namesrvAddr;
    private String groupName;
    private String instanceName;

    public String getNamesrvAddr() {
        return namesrvAddr;
    }

    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }

    public String getGroupName() {
        return groupName;
    }

    public void setGroupName(String groupName) {
        this.groupName = groupName;
    }

    public String getInstanceName() {
        return instanceName;
    }

    public void setInstanceName(String instanceName) {
        this.instanceName = instanceName;
    }

    public RocketMQProducer(String namesrvAddr, String groupName,
            String instanceName) {
        super();
        this.namesrvAddr = namesrvAddr;
        this.groupName = groupName;
        this.instanceName = instanceName;
        this.producer = new DefaultMQProducer(groupName);

        producer.setNamesrvAddr(namesrvAddr);
        producer.setInstanceName(instanceName);
        producer.setVipChannelEnabled(false);

        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    /**
     * 发送消息工具类
     * @param topic
     * @param tags
     * @param keys
     * @param body
     * @return
     * @throws MQClientException
     * @throws RemotingException
     * @throws MQBrokerException
     * @throws InterruptedException
     */
    public SendResult send(String topic, String tags, String keys, byte[] body)
            throws MQClientException, RemotingException, MQBrokerException,
            InterruptedException {
        Message msg = new Message(topic, tags, keys, body);
        try {
            SendResult sendResult = this.producer.send(msg);
            return sendResult;
        } catch (MQClientException | RemotingException | MQBrokerException
                | InterruptedException e) {
            e.printStackTrace();
            throw e;
        }
    }

    /**
     * 发送工具类
     * @param topic
     * @param tags
     * @param keys
     * @param body
     * @param retryTimes
     * @param elapseMS
     * @return
     */
    public SendResult send(String topic, String tags, String keys, byte[] body,int retryTimes,int elapseMS) {
        Message msg = new Message(topic, tags, keys, body);
        boolean success = false;
        int i = 0;
        SendResult sendResult = null;
        while (!success && i++ < retryTimes) {
            try {
                sendResult = this.producer.send(msg);
                return sendResult;
            } catch (MQClientException | RemotingException | MQBrokerException
                    | InterruptedException e) {
                e.printStackTrace();
            }
            try {
                Thread.sleep(elapseMS);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        return sendResult;
    }

    public static void main(String[] args) throws MQClientException,
            InterruptedException {
        RocketMQProducer producer = new RocketMQProducer("10.7.29.121:9876",
                "ProducerGroupName", "Producer");

        for (int i = 0; i < 10; i++) {
            try {
                {
                    SendResult sendResult = producer.send("TopicTestZJH",// topic
                            "TagA",// tag
                            "OrderID001",// key
                            ("Hello MetaQ" + i).getBytes());
                    System.out.println(sendResult);
                }

                {
                    SendResult sendResult = producer.send("TopicTestYIDU",// topic
                            "TagB",// tag
                            "OrderID0034",// key
                            ("Hello MetaQ" + i).getBytes());
                    System.out.println(sendResult);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            TimeUnit.MILLISECONDS.sleep(1000);
        }
        /**
         * spring bean配置
         */
//        <bean id="rocketMQProducer" class="com.tf56.queue.client.RocketMQProducer">
//            <constructor-arg name="namesvrAddr" value="10.7.29.121:9876"/>
//            <constructor-arg name="groupName" value="ProducerGroupName"/>
//            <constructor-arg name="instanceName" value="Producer"/>
//        </bean>
    }
}

消费端代码:

package tf56.sofa.util;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;

public class RocketMQPushConsumer {

    private DefaultMQPushConsumer consumer;
    private String namesrvAddr;
    private String groupName;
    private String instanceName;
    private String topics;

    public RocketMQPushConsumer(String namesrvAddr, String groupName, String instanceName,String topics,MessageListenerConcurrently messageListener) {
        super();
        this.namesrvAddr = namesrvAddr;
        this.groupName = groupName;
        this.instanceName = instanceName;
        this.topics = topics;

        /**
         * 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br>
         * 注意:ConsumerGroupName需要由应用来保证唯一
         */
        consumer = new DefaultMQPushConsumer(groupName);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setInstanceName(RocketMQPushConsumer.getInstanceName(namesrvAddr));
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.setVipChannelEnabled(false);
        try {
            /**
             * 订阅指定topic下所有消息<br>
             * 注意:一个consumer对象可以订阅多个topic
             */

            String[] topicsArr = topics.split(";");
            for(int i=0;i<topicsArr.length;i++) {
                consumer.subscribe(topicsArr[i], "*");
            }

            consumer.registerMessageListener(messageListener);
        } catch (Exception e) {
            e.printStackTrace();
            return;
        }
        System.out.println("Consumer Started.");
    }

    /**
     * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
     */
    public void init() {
        try {
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    private static String getInstanceName(String namesrvAddr) {
        return getHostAddress() + namesrvAddr;
    }

    private static String getHostAddress(){
        try {
            return InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        return "";
    }

    /**
     * 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。<br>
     * 但是实际PushConsumer内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法<br>
     */
    public static void main(String[] args) throws InterruptedException,
            MQClientException {
        MessageListenerConcurrently messageListener = new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                MessageExt msg = msgs.get(0);
                if (msg.getTopic().equals("TopicTestZJH")) {
                    System.out.println("TopicTestZJH->" + new String(msg.getBody()));
                } else if (msg.getTopic().equals("TopicTestYIDU")) {
                    System.out.println("TopicTestYIDU->" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        };
        RocketMQPushConsumer consumer = new RocketMQPushConsumer("10.7.29.121:9876",
                "ConsumerGroupName", "Consumer","TopicTestZJH;TopicTestYIDU",messageListener);
        consumer.init();
        /**
         * spring构造器注入
         */
//        <bean id="rocketMQPushConsumer" class="com.tf56.queue.client.RocketMQPushConsumer">
//            <constructor-arg name="namesvrAddr" value="10.7.29.121:9876"/>
//            <constructor-arg name="groupName" value="ConsumerGroupName"/>
//            <constructor-arg name="instanceName" value="Consumer"/>
//            <constructor-arg name="topics" value="TopicTestZJH;TopicTestYIDU"/>
//            <constructor-arg name="messageListener" ref="messageListener"/>
//        </bean>
    }
}
时间: 2024-10-25 07:51:23

rocketmq消息重复推送的问题的相关文章

nodejs+socketio+redis实现前端消息实时推送

nodejs+socketio+redis实现前端消息实时推送 1. 后端部分 发送redis消息 可以参考此篇实现(直接使用Jedis即可) http://www.cnblogs.com/binyue/p/4763352.html 2.后端部分: 接收redis消息 var redis; if(process.argv.length <= 2){ redis = require('redis').createClient(); }else{ redis = require('redis').c

基于HTTP协议之WEB消息实时推送技术原理及实现

很早就想写一些关于网页消息实时推送技术方面的文章,但是由于最近实在忙,没有时间去写文章.本文主要讲解基于 HTTP1.1 协议的 WEB 推送的技术原理及实现.本人曾经在工作的时候也有做过一些用到网页消息实时推送的项目,但是当时实现的都不是很完美,甚至有时候是通过 Ajax 轮训的方式实现.在网上也找过不少的资料,真正说到点子上的几乎没有,很多文章大都是长篇大论,说了一些新有名字,什么“HTTP 长连接”,“实时推送”,“Comet 长连接推送技术”等.但真正提到如何实现实时推送的文章倒是没有看

调用APP标准消息接口推送信息http协议

调用协议:Http协议 调用方式:CRM中新分派线索(实时)或者线索未及时更新(定时,每天执行一次)时,调用APP标准消息接口推送信息,成功后返回标记已通知过. 接口调用方法如下: { "apikey":"xxxsadfsd", "identifier":"com.xx.xx",//向移动应用平台申请 "receiverType":"NAME", "receiverValue&

浏览器消息自动推送研究

首先说明,这篇博文不是科普讲解的,而是立flag研究的,是关于浏览器消息自动推送,就是下面这个玩意: 最近常常在浏览器看到这样的消息推送,还有QQ.com的推送,现在我对这个不了解,不知道叫消息自动推送对不对,这个时chrome浏览器的截图,出现在右下角,其他浏览器的样式可能有些微差别. websocket通信?浏览器广告推送?html5自动更新?灵异事件? ----------------------------我是研究的结果华丽的分割线----------------------------

dwr3实现消息精确推送详细步骤

最近项目中需要用到推送消息,找了很久终于找到一篇不错的文章,方便以后查看就转载了,也分享给大家,希望能帮到有需要的人. 第一.在项目中引入dwr.jar,然后在web.xml中进行配置,配置如下: <servlet> <servlet-name>dwr-invoker</servlet-name> <servlet-class> org.directwebremoting.servlet.DwrServlet </servlet-class> &

Spring+Websocket实现消息的推送

Websocet服务端实现 WebSocketConfig.java @Configuration @EnableWebMvc @EnableWebSocket public class WebSocketConfig extends WebMvcConfigurerAdapter implements WebSocketConfigurer { @Override public void registerWebSocketHandlers(WebSocketHandlerRegistry re

快递企业如何完成运单订阅消息的推送

原文:快递企业如何完成运单订阅消息的推送 经常网购的朋友,会实时收到运单状态的提醒信息,这些提醒信息包括微信推送,短信推送,邮件推送,支付宝生活窗推送,QQ推送等,信息内容主要包括快件到哪里,签收等信息的提醒,这些友好的提醒信息会极大的增强购物者的体验. 笔者目前正在一家快递企业做这类消费消息的推送功能开发(大部分快递企业都有实现在客户寄完快件后可以主动接收到快递企业的运单状态推送信息),对这部分有一些体会,现分享给大家(大部分功能可能只能通过代码才方便体现出来). 订阅和推送的流程图 一.订阅

#研发中间件介绍#异步消息可靠推送Notify

郑昀 基于朱传志的设计文档 最后更新于2014/11/11 关键词: 异步消息 .订阅者集群.可伸缩.Push模式.Pull模式 本文档适用人员:研发 电商系统为什么需要 NotifyServer? 如子柳所说,电商系统『 需要两种中间件系统,一种是实时调用的中间件(淘宝的HSF,高性能服务框架).一种是异步消息通知的中间件(淘宝的Notify)』.那么用传统的 ActiveMQ/RabbitMQ 来实现 异步消息发布和订阅 不行吗? 2013年之前我们确实用的是 ActiveMQ,当然主要是订

Android上如何确保消息被推送到

一般情况下,可以侦听一些经常发生的消息,如电量变化.开关屏幕.网络切换等等,注册BroadcastReceiver来接收消息,接收到后启动推送消息的Service. 现在有些厂商的ROM,一旦用户主动杀掉了进程,则不再投送广播消息给应用,导致应用无法启动,在这种情况下,可以用C Fork出来一个进程,检查主进程是否被杀,一旦被杀,立即调用shell命令启动Service,这个做法稍微流氓了一点,但有些应用,如IM,这样是提升用户体验的. 还有些厂商,如华为,在杀进程的时候,Fork出来的进程一起