rocketmq消费队列代码

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(Constant.operationLogGroup);
        try {
            consumer.setNamesrvAddr(Constant.rocketQueneAddr);
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.setMessageModel(MessageModel.BROADCASTING);
            consumer.subscribe(Constant.operationLogTopic, Constant.operationLogTag);
        } catch (MQClientException e) {
            logger.error("consume operation log MQ error", e);
        }

        cometutil = Comet4jUtil.getInstance(CHANNEL);

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

                byte[] bytes = msgs.get(0).getBody();
                try {
                    cometutil.sendMesToAllConnsWithString(CHANNEL, new String(bytes, "UTF-8"));
                } catch (UnsupportedEncodingException e) {
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        try {
            consumer.start();
            logger.info("operationLogController‘s MQ consumer started.");
        } catch (MQClientException e) {
            logger.error("consume operation log MQ start error", e);
        }

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-12-29 09:45:55

rocketmq消费队列代码的相关文章

RocketMQ 消息队列单机部署及使用

转载请注明来源:http://blog.csdn.net/loongshawn/article/details/51086876 相关文章: <RocketMQ 消息队列单机部署及使用> < java编写简单消息队列.实现高德坐标变形服务> 0 RocketMQ简单介绍 0.1 介绍 RocketMQ是一个消息中间件. 消息中间件中有两个角色:消息生产者和消息消费者.RocketMQ里相同有这两个概念.消息生产者负责创建消息并发送到RocketMQ服务器.RocketMQ服务器会将

RocketMq消息队列使用

最近在看消息队列框架 ,alibaba的RocketMQ单机支持1万以上的持久化队列,支持诸多特性, 目前RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景 比kafka还是有过之无不及,其实kafka文档很丰富 但RocketMQ网上的文章太少,找不到相关的操作教程 于是研究了下源码 做个单机操作的教程,如果你也对此有兴趣不妨共同研究 下载源码的地址 https://github.com/alibaba/RocketMQ/relea

RocketMQ消费批拉超过32不生效

由于一些原因,我需要RocketMQ消费的时候,一批拉400条,一批处理400条.设置如下: 为了简单验证是否正确,消费如下: 直接通过打印msgs.size()观察情况即可. 现象 实验的topic里面的消息数量很多很多,但是启动消费端,消费端的日志如下: 奇怪啦,明明已经进行了修改 为什么还是每次只能消费32条呢? 调试RocketMQ源码 通过跟踪consumer代码: 这里的确已经设置为400了,那么我们需要跟踪到broker服务端进行查看了. broker接受到的也是400,我们只有继

链队列代码及应用

链队列代码 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <time.h> #define OK 1 #define ERROR 0 #define OVERFLOW -2 #define TRUE 1 #define FALSE 0 typedef int Status; typedef int ElemType; typedef struct Qnode{ int

消息消费队列和索引文件的更新

ConsumeQueue,IndexFile需要及时更新,否则无法及时被消费,根据消息属性查找消息也会出现较大延迟. mq通过开启一个线程ReputMessageService来准时转发commitLog文件更新事件,相应的任务处理器根据转发的消息及时更新ConsumeQueue,IndexFile文件 DefaultMessageStore#start ReputMessageService线程每执行一次任务推送休息1毫秒旧继续尝试推送消息到消息消费队列和索引文件. 返回reputFromOf

rocketmq 消息队列 (一) 初步了解

消息队列概述 队列的本质 一次RPC变成两次 RPC 内容转储 选择合适的时机投递 队列设计重点 RPC 通信协议 存储选型 消费关系处理 实现事务 防丢/防重 批量/异步与性能   强烈推荐这篇文章,从设计的角度来思考消息队列的各种问题,阅读源码只是理解设计的最终实现,只有知道了设计的思路阅读源码才会更加容易理解和更加容易吸收. 使用示例 生产端 public class Product { public static void main(String[] args) throws Excep

RocketMQ 消息队列简单部署

RocketMQ 是alibaba开源的消息队列. 本文使用的是开源版本v3.18 系统: centos6.x最小化安装 需要用到的软件包: jdk-7u67-linux-x64.tar.gz alibaba-rocketmq-3.1.8.tar.gz 开始安装 #tar xvf jdk-7u67-linux-x64.tar.gz -C /opt/ #tar xvf alibaba-rocketmq-3.1.8.tar.gz -C /opt/ #ln -s /opt/jdk1.7.0_67 /o

顺序队列代码

队列:一种特殊的线性表,其特性就是先进先出(FIFO),即从一端进从另一端出 队头:允许删除的一端  front 队尾:允许插入的一端  rear 如下图所示,出队和入队的流程: --------------------------------------------------------------------------------------------------------------------------------------------------- 可以发现,无论是出队还是

SpringBoot RabbitMQ 延迟队列代码实现

场景 用户下单后,如果30min未支付,则删除该订单,这时候就要可以用延迟队列 准备 利用rabbitmq_delayed_message_exchange插件: 首先下载该插件:https://www.rabbitmq.com/community-plugins.html 然后把该插件放到rabbitmq安装目录plugins下: 进入到sbin目录下,执行"rabbitmq-plugins.bat enable rabbitmq_delayed_message_exchange";