50.RocketMQ (quickstart)

1.订阅消息

/**
 * Copyright (C) 2010-2013 Alibaba Group Holding Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.rocketmq.example.quickstart;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;

/**
 * Consumer,订阅消息
 */
public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        consumer.setNamesrvAddr("192.168.1.16:9876;192.168.1.17:9876");
        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTest", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();

        System.out.println("Consumer Started.");
    }
}

2.生产消息

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

/**
 * Producer,发送消息
 *
 */
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("192.168.1.16:9876;192.168.1.17:9876");
        producer.start();

        for (int i = 0; i < 1000; i++) {
            try {
                Message msg = new Message("TopicTest", // topic
                    "TagA", // tag
                    ("Hello RocketMQ " + i).getBytes()// body
                );
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            }
            catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        producer.shutdown();
    }
}
时间: 2025-01-15 19:20:04

50.RocketMQ (quickstart)的相关文章

Excel动画教程50例(三)

Excel动画教程50例(三) 31.Excel自定输入数据下拉列表 32.Excel正确输入身份证号码 33.Excel数据排序操作 34.Excel数据表格中如何将姓名信息按笔画排列 35.Excel数据格式设置 37.Excel内置序列批量填充 38.Excel模版的保存和调用 39.Excel监视窗口 40.Excel中行列隐藏操作 41.Excel工作簿加密保存 42.Excel公式引用方式转换 43.Excel中特殊符号的输入 44.Excel中文日期格式 45.Excel工作表的移

2017河南省考——必备时政热词50个(八)

46.[解放军"五大战区"] 东部战区.南部战区.西部战区.北部战区.中部战区. 47.[解放军"五大军种"] 陆军.海军.空军.火箭军.战略支援部队. 48.[人类命运共同体] 指在追求本国利益时兼顾他国合理关切,在谋求本国发展中促进各国共同发展.人类命运共同体这一全球价值观包含相互依存的国际权力观.共同利益观.可持续发展观和全球治理观. 49.[一带一路] 一带一路是"丝绸之路经济带"和"21世纪海上丝绸之路"的简称. 5

JAVA经典算法50题(转)

转载请注明出处:http://blog.csdn.net/l1028386804/article/details/51097928 JAVA经典算法50题 [程序1]   题目:古典问题:有一对兔子,从出生后第3个月起每个月都生一对兔子,小兔子长到第四个月后每个月又生一对兔子,假如兔子都不死,问每个月的兔子总数为多少?1.程序分析:兔子的规律为数列1,1,2,3,5,8,13,21.... [java] view plain copy public class Demo01 { public s

RocketMQ(二)集群配置

Broker集群部署方式主要有以下几种:(Slave 不可写,但可读) 单个Master 这种方式风险较大,一旦Broker 重启或者宕机时,会导致整个服务不可用,不建议线上环境使用. 多Master模式 一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master. 优点:配置简单,单个Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由于RAID10 磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同

洛古最简单50题解(31-40)

做为一名新手,首先要过一过题,找找成就感.(大佬略过).下面附上洛古最简单50题(大佬略过).以及最麻烦 AC代码,至少AC了. NO.41 P2676 超级书架 #include<iostream> #include<algorithm> using namespace std; int main() { ????int i,n,b,ans,m; ????int a[100001]; ????cin>>n>>b; ????for(int i=0;i<

RocketMQ(四)——消息重试

文章目录 一. Producer端重试 二. Consumer端重试 1. Exception 2. Timeout总结 对于MQ,可能存在各种异常情况,导致消息无法最终被Consumer消费掉,因此就有了消息失败重试机制.很显示,消息重试分为2种:Producer端重试和Consumer端重试. 一. Producer端重试 生产者端的消息失败,也就是Producer往MQ上发消息没有发送成功,比如网络抖动导致生产者发送消息到MQ失败.这种消息失败重试我们可以手动设置发送失败重试的次数,看一下

RocketMQ(六)——Order Message(顺序消息)

生产者端消费者端运行效果补充RocketMQ提供了3种模式的Producer:NormalProducer(普通).OrderProducer(顺序).TransactionProducer(事务),对应的分别是普通消息.顺序消息和事务消息.在前面的博客当中,涉及的都是NormalProducer,调用传统的send方法,消息是无序的.接下来,看看顺序消费.模拟这样一个场景,如果一个用户完成一个订单需要3条消息,比如订单的创建.订单的支付.订单的发货,很显然,同一个用户的订单消息必须要顺序消费,

RocketMQ(七)——Transaction Message(事务消息)

分布式事务 通过MQ解决分布式事务的思路 1) 业务和消息生成耦合在一起 2) 业务和消息解耦 RocketMQ 中的事务消息 1) 目前RMQ3.2.6中事务消息的实现原理及存在的问题 2) 问题解决思路 本文介绍RocketMQ提供的第三种类型的消息——Transaction Message(事务消息).在说事务消息之前,我们先来说说分布式事务的那些事! 分布式事务 什么是分布式事务,我的理解是一半事务.怎么说,比如有2个异构系统,A异构系统要做T1,B异构系统要做T2,要么都成功,要么都失

深入理解RocketMQ(四)--消息存储

一.MQ存储分类 文件系统:RocketMQ/Kafka/RabbitMQ 关系型数据库DB:ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式来做消息持久化 分布式KV存储:ZeroMQ 对比: 存储效率, 文件系统>分布式KV存储>关系型数据库DB 易于实现和快速集成,关系型数据库DB>分布式KV存储>文件系统,但是性能会下降很多 二.RocketMQ存储概要 (一)存储文件 rocketmq |--store |-commitlog |      |-0