rocketmq 消息队列 (一) 初步了解

消息队列概述

队列的本质
  • 一次RPC变成两次 RPC
  • 内容转储
  • 选择合适的时机投递

    队列设计重点

  • RPC 通信协议
  • 存储选型
  • 消费关系处理
  • 实现事务
  • 防丢/防重
  • 批量/异步与性能   强烈推荐这篇文章,从设计的角度来思考消息队列的各种问题,阅读源码只是理解设计的最终实现,只有知道了设计的思路阅读源码才会更加容易理解和更加容易吸收。

使用示例

生产端
public class Product {
    public static void main(String[] args) throws Exception {
        // 实例化消息生产者Producer
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 设置NameServer的地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动Producer实例
        producer.start();
        for (int i = 0; i < 100; i++) {
            // 创建消息,并指定Topic,Tag和消息体
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            // 发送消息到一个Broker
            SendResult sendResult = producer.send(msg);
            // 通过sendResult返回消息是否成功送达
            System.out.printf("%s%n", sendResult);
        }
        // 如果不再发送消息,关闭Producer实例。
        producer.shutdown();
    }
}
消费端
public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

        // 设置NameServer的地址
        consumer.setNamesrvAddr("localhost:9876");

        // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
        consumer.subscribe("TopicTest", "*");
        // 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

RocketMQ名词解释

  • NameServer:在系统中 是做命名服务,更新和发现 broker服务。就像dubbo 中的 zk一样的角色。主要作用为消息生产和消费消费者提供关于主题Topic 的路由消息
  • Broker-Master:broker 消息主机服务器。
  • Broker-Slave: broker 消息从机服务器。
  • Producer: 消息生产者。
  • Consumer: 消息消费者。

角色之间如何工作

  Broker 消息服务器在启动时向所有NameServer注册,NameServer启动定时任务,监视broker 是否存活,producer在发送消息前先从NameServer获取Broker服务器地址列表,然后根据负载算法从列表中选择一台消息服务器进行消息发送。

RocketMQ启动

  我们先来看一下启动的脚本

start mqnamesrv.cmd
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

  可以知道实际就是运行两个cmd 文件,启动的相关类有 : BrokerStartup 和 NamesrvStartup 。 后面的文章我们将会介绍关于这两个类

总结

  文章从设计思路入手来了解消息队列,后面的文章我们将通过阅读源码落地设计思路。

参考资料

  • http://gd-rus-public.cn-hangzhou.oss-pub.aliyun-inc.com/attachment/201604/08/20160408165024/RocketMQ_design.pdf
  • https://erik1288.github.io/
  • https://erik1288.github.io/2017/11/03/RocketMQ-Message-send-and-persistence/
  • https://zhuanlan.zhihu.com/p/21649950 (美团,设计一个消息队列)
  • https://segmentfault.com/a/1190000015301449 (消息队列的面试题)

原文地址:https://www.cnblogs.com/Benjious/p/11634432.html

时间: 2024-11-10 10:41:08

rocketmq 消息队列 (一) 初步了解的相关文章

RocketMQ 消息队列单机部署及使用

转载请注明来源:http://blog.csdn.net/loongshawn/article/details/51086876 相关文章: <RocketMQ 消息队列单机部署及使用> < java编写简单消息队列.实现高德坐标变形服务> 0 RocketMQ简单介绍 0.1 介绍 RocketMQ是一个消息中间件. 消息中间件中有两个角色:消息生产者和消息消费者.RocketMQ里相同有这两个概念.消息生产者负责创建消息并发送到RocketMQ服务器.RocketMQ服务器会将

RocketMq消息队列使用

最近在看消息队列框架 ,alibaba的RocketMQ单机支持1万以上的持久化队列,支持诸多特性, 目前RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景 比kafka还是有过之无不及,其实kafka文档很丰富 但RocketMQ网上的文章太少,找不到相关的操作教程 于是研究了下源码 做个单机操作的教程,如果你也对此有兴趣不妨共同研究 下载源码的地址 https://github.com/alibaba/RocketMQ/relea

RocketMQ 消息队列简单部署

RocketMQ 是alibaba开源的消息队列. 本文使用的是开源版本v3.18 系统: centos6.x最小化安装 需要用到的软件包: jdk-7u67-linux-x64.tar.gz alibaba-rocketmq-3.1.8.tar.gz 开始安装 #tar xvf jdk-7u67-linux-x64.tar.gz -C /opt/ #tar xvf alibaba-rocketmq-3.1.8.tar.gz -C /opt/ #ln -s /opt/jdk1.7.0_67 /o

消息队列ActiveMQ初步

安装ActiveMQ 官网地址:http://activemq.apache.org/ 解压后基本目录结构: bin存放的是脚本文件 conf存放的是基本配置文件 data存放的是日志文件 docs存放的是说明文档 examples存放的是简单的实例 lib存放的是activemq所需jar包 webapps用于存放项目的目录 进入 bin 目录: ./activemq star # 启动activeMQ服务 ./activemq stop # 关闭activeMQ服务 ActiveMQ 默认启

消息队列常用应用场景介绍

消息队列作为分布式系统中重要的组件,可以解决应用耦合,异步消息,流量削锋等系列问题 实现高性能,高可用,可伸缩和最终一致性架构 使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ 消息队列应用场景 以下介绍消息队列在实际应用中常用的使用场景. 1 异步处理 场景说明:用户注册后,需要发注册邮件和注册短信.传统的做法有两种 1.串行的方式:2.并行方式 (1)串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信.以上三个任

分布式消息队列RocketMQ部署与监控

========================================================================================== 一.RocketMQ简介 ========================================================================================== RocketMQ是一款分布式.队列模型的消息中间件,具有以下特点: 1.支持严格的消息顺序: 2.支持Topi

RabbitMQ,Apache的ActiveMQ,阿里RocketMQ,Kafka,ZeroMQ,MetaMQ,Redis也可实现消息队列,RabbitMQ的应用场景以及基本原理介绍,RabbitMQ基础知识详解,RabbitMQ布曙

消息队列及常见消息队列介绍 2017-10-10 09:35操作系统/客户端/人脸识别 一.消息队列(MQ)概述 消息队列(Message Queue),是分布式系统中重要的组件,其通用的使用场景可以简单地描述为: 当不需要立即获得结果,但是并发量又需要进行控制的时候,差不多就是需要使用消息队列的时候. 消息队列主要解决了应用耦合.异步处理.流量削锋等问题. 当前使用较多的消息队列有RabbitMQ.RocketMQ.ActiveMQ.Kafka.ZeroMQ.MetaMq等,而部分数据库如Re

滴滴出行基于RocketMQ构建企业级消息队列服务的实践

小结: 1. https://mp.weixin.qq.com/s/v6NM3UgX-qTI7yO1QPCJrw 滴滴出行基于RocketMQ构建企业级消息队列服务的实践 原创: 江海挺 阿里巴巴中间件 2018-11-01 原文地址:https://www.cnblogs.com/yuanjiangw/p/10780829.html

MQ选型对比ActiveMQ,RabbitMQ,RocketMQ,Kafka 消息队列框架选哪个?

最近研究消息队列,发现好几个框架,搜罗一下进行对比,说一下选型说明: 1)中小型软件公司,建议选RabbitMQ.一方面,erlang语言天生具备高并发的特性,而且他的管理界面用起来十分方便.不考虑rocketmq和kafka的原因是,一方面中小型软件公司不如互联网公司,数据量没那么大,选消息中间件,应首选功能比较完备的,所以kafka排除.RocketMQ也很不错,只是没有RabbitMQ出来的早,文档和网上的资料没有RabbitMQ多,但也是很不错,RocketMQ是阿里出品,现在阿里已经把