RocketMQ(2)

1. 消费端集群消费(负载均衡)

 示例代码:

/**
 * Producer,发送消息
 *
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("message_producer");
        producer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
        producer.start();

        for (int i = 0; i < 100; i++) {
            try {
                Message msg = new Message("TopicTest",// topic
                    "Tag1",// tag
                    ("Hello RocketMQ " + i).getBytes()// body
                        );
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            }
            catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        producer.shutdown();
    }
}

/**
 * Consumer,订阅消息
 */
public class Consumer1 {

    public Consumer1() {
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
            consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.subscribe("TopicTest", "Tag1||Tag2||Tag3");
            consumer.registerMessageListener(new Listener());
            consumer.start();
        } catch (MQClientException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    class Listener implements MessageListenerConcurrently {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            try {
                for (MessageExt msg : msgs) {
                    String topic = msg.getTopic();
                    String msgBody = new String(msg.getBody(), "utf-8");
                    String tags = msg.getTags();
                    System.out.println("收到消息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags);

                    System.out.println("======暂停=====");
                    Thread.sleep(60000);
                }
            } catch (Exception e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

    }

    public static void main(String[] args) throws InterruptedException, MQClientException {
        Consumer1 consumer1 = new Consumer1();
        System.out.println("Consumer1 Started.");
    }
}

/**
 * Consumer,订阅消息
 */
public class Consumer2 {

    public Consumer2() {
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
            consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
            consumer.subscribe("TopicTest", "Tag1||Tag2||Tag3");
            consumer.registerMessageListener(new Listener());
            consumer.start();
        } catch (MQClientException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    class Listener implements MessageListenerConcurrently {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            try {
                for (MessageExt msg : msgs) {
                    String topic = msg.getTopic();
                    String msgBody = new String(msg.getBody(), "utf-8");
                    String tags = msg.getTags();
                    System.out.println("收到消息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags);
                }
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

    }

    public static void main(String[] args) throws InterruptedException, MQClientException {
        Consumer2 consumer2 = new Consumer2();
        System.out.println("Consumer2 Started.");
    }
}

一个生产者,两个消费者,注意两个消费者的组名要一样。

先启动两个消费者(customer1,customer2),通过控制台查看如下:

再启动生产者生成100条消息,消费情况如下:

生成的100条消息被customer1和customer2平均的消费了。可以通过consumer.setAllocateMessageQueueStrategy去设置分配策略。

BTW:这是默认的模式,可以通过consumer.setMessageModel设置,MessageModel.CLUSTERING | MessageModel.BROADCASTING,如果是广播消费,则每个客户端都会收到生产端的所有消息

2.消息未响应会重发

代码示例:

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("message_producer");
        producer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
        producer.start();

        for (int i = 0; i < 1; i++) {
            try {
                Message msg = new Message("TopicTest",// topic
                    "Tag1",// tag
                    ("Hello RocketMQ " + i).getBytes()// body
                        );
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            }
            catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        producer.shutdown();
    }
}

public class Consumer1 {

    public Consumer1() {
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
            consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
            consumer.subscribe("TopicTest", "Tag1||Tag2||Tag3");
            consumer.registerMessageListener(new Listener());
            consumer.start();
        } catch (MQClientException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    class Listener implements MessageListenerConcurrently {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            try {
                for (MessageExt msg : msgs) {
                    String topic = msg.getTopic();
                    String msgBody = new String(msg.getBody(), "utf-8");
                    String tags = msg.getTags();
                    System.out.println("收到消息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags);

                    System.out.println("======暂停=====");
                    Thread.sleep(600000);
                }
            } catch (Exception e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

    }

    public static void main(String[] args) throws InterruptedException, MQClientException {
        Consumer1 consumer1 = new Consumer1();
        System.out.println("Consumer1 Started.");
    }
}

public class Consumer2 {

    public Consumer2() {
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("message_consumer");
            consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
            consumer.subscribe("TopicTest", "Tag1||Tag2||Tag3");
            consumer.registerMessageListener(new Listener());
            consumer.start();
        } catch (MQClientException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    class Listener implements MessageListenerConcurrently {

        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            try {
                for (MessageExt msg : msgs) {
                    String topic = msg.getTopic();
                    String msgBody = new String(msg.getBody(), "utf-8");
                    String tags = msg.getTags();
                    System.out.println("收到消息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags);
                }
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

    }

    public static void main(String[] args) throws InterruptedException, MQClientException {
        Consumer2 consumer2 = new Consumer2();
        System.out.println("Consumer2 Started.");
    }
}

先启动consumer1,再启动consumer2,最后启动producer

consumer1收到了消息,consumer2没有收到消息,这时把consumer1强制停止,也就是说consumer1不会给MQ返回响应,查看结果:

consumer2也收到消息了,说明在MQ没收到消费端响应的情况下,会重发消息。

3. 修改topic的队列数

默认的队列数是4个,可以从执行结果中看出:queueId都是0-3

细节可以看https://www.cnblogs.com/dyfh/p/4113677.html

可以增加设置producer.createTopic("TopicTest", "TopicTest", 8);

原文地址:https://www.cnblogs.com/lostyears/p/8582299.html

时间: 2024-08-30 07:41:34

RocketMQ(2)的相关文章

分布式开放消息系统(RocketMQ)的原理与实践

分布式消息系统作为实现分布式系统可扩展.可伸缩性的关键组件,需要具有高吞吐量.高可用等特点.而谈到消息系统的设计,就回避不了两个问题: 消息的顺序问题 消息的重复问题 RocketMQ作为阿里开源的一款高性能.高吞吐量的消息中间件,它是怎样来解决这两个问题的?RocketMQ 有哪些关键特性?其实现原理是怎样的? 关键特性以及其实现原理 一.顺序消息 消息有序指的是可以按照消息的发送顺序来消费.例如:一笔订单产生了 3 条消息,分别是订单创建.订单付款.订单完成.消费时,要按照顺序依次消费才有意

rocketmq 命令示例

http://www.360doc.com/content/16/0111/17/1073512_527143896.shtml http://www.cnblogs.com/marcotan/p/4256857.html RocketMQ常用命令 二.根据msgId查询消息 1.文档: 指令 queryMsgById 类路径 com.alibaba.rocketmq.tools.command.message.QueryMsgByIdSubCommand 参数 是否必填 说明 -i 是 msg

rocketmq安装与基本操作

如果不是因为政治原因,就rocketmq的社区活跃度.版本.特性和文档完善度,我是无论如何也不会使用rocketmq的. rocketmq严格意义上并不支持高可靠性,因为其持久化只支持异步,有另外一个线程flush,不支持配置同步刷新到磁盘.只能说多个节点宕机的概率很低很低,外加现在的服务器一般都是UPS. rocketmq官方提供了一份与activemq,kafka的特性对比(但没有包括与rabbitmq的比较).引用如下: Messaging Product Client SDK Proto

50.RocketMQ (quickstart)

1.订阅消息 /** * Copyright (C) 2010-2013 Alibaba Group Holding Limited * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at *

rocketmq源码分析3-consumer消息获取

使用rocketmq的大体消息发送过程如下: 在前面已经分析过MQ的broker接收生产者客户端发过来的消息的过程,此文主要讲述订阅者获取消息的过程,或者说broker是怎样将消息传递给消费者客户端的,即上面时序图中拉取消息(pull message)动作.. 1. 如何找到入口(MQ-broker端) 分析一个机制或者功能时,我们首先希望的是找到入口,前一篇我们是通过端口号方式顺藤摸瓜的方式找到了入口.但是此篇略微不同,涉及到consumer客户端与broker的两边分析,最终发现逻辑还是比较

RocketMQ源码学习--消息存储篇

1.序言 今天来和大家探讨一下RocketMQ在消息存储方面所作出的努力,在介绍RocketMQ的存储模型之前,可以先探讨一下MQ的存储模型选择. 2.MQ的存储模型选择 个人看来,从MQ的类型来看,存储模型分两种: 需要持久化(ActiveMQ,RabbitMQ,Kafka,RocketMQ) 不需要持久化(ZeroMQ) 本篇文章主要讨论持久化MQ的存储模型,因为现在大多数的MQ都是支持持久化存储,而且业务上也大多需要MQ有持久存储的能力,能大大增加系统的高可用性,下面几种存储方式: 分布式

rocketMq

1. 下载安装包,解压 https://github.com/alibaba/RocketMQ 2. cmd 3. 启动 mqnameserv C:\Users\Administrator>cd D:\download\alibaba-rocketmq\bin C:\Users\Administrator>mqnamesrv.exe -n 127.0.0.1:9876;192.168.0.1:9876 'mqnamesrv.exe' 不是内部或外部命令,也不是可运行的程序 或批处理文件. C:

转:Kafka、RabbitMQ、RocketMQ消息中间件的对比 —— 消息发送性能 (阿里中间件团队博客)

from: http://jm.taobao.org/2016/04/01/kafka-vs-rabbitmq-vs-rocketmq-message-send-performance/ 引言 分布式系统中,我们广泛运用消息中间件进行系统间的数据交换,便于异步解耦.现在开源的消息中间件有很多,前段时间我们自家的产品 RocketMQ (MetaQ的内核) 也顺利开源,得到大家的关注. 那么,消息中间件性能究竟哪家强? 带着这个疑问,我们中间件测试组对常见的三类消息产品(Kafka.RabbitM

RocketMQ与Kafka对比(18项差异)

转自:https://github.com/alibaba/RocketMQ/wiki/rmq_vs_kafka 淘宝内部的交易系统使用了淘宝自主研发的Notify消息中间件,使用Mysql作为消息存储媒介,可完全水平扩容,为了进一步降低成本,我们认为存储部分可以进一步优化,2011年初,Linkin开源了Kafka这个优秀的消息中间件,淘宝中间件团队在对Kafka做过充分Review之后,Kafka无限消息堆积,高效的持久化速度吸引了我们,但是同时发现这个消息系统主要定位于日志传输,对于使用在

RocketMQ 源码分析

RocketMQ 源码分析 RocketMQ 的设计思想来自于Kafka,在具体设计时体现了自己的选择和需求,具体差别可以看RocketMQ与Kafka对比(18项差异).接下来记录下自己阅读源码的一些探索. RocketMQ的整体架构如下,可以看到各个组件充当的角色,Name Server 负责维护一些全局的路由信息:当前有哪些broker,每个Topic在哪个broker上等; Broker具体处理消息的存储和服务:生产者和消费者是消息的源头和归宿. 在知道各个角色的基本位置后,就该让程序跑