RocketMQ实战(二)

在上一篇《RocketMQ实战(一)》中已经为大家初步介绍了下RocketMQ以及搭建了双Master环境,接下来继续为大家介绍!

Quick Start

写一个简单的生产者、消费者,带大家快速体验RocketMQ~

Maven配置:

生产者:

消费者:

无论生产者、消费者都必须给出GroupName,而且具有唯一性!

生产到哪个Topic的哪个Tag下,消费者也是从Topic的哪个Tag进行消费,可见这个Tag有点类似于JMS Selector机制,即实现消息的过滤。

生产者、消费者需要设置NameServer地址。

这里,采用的是Consumer Push的方式,即设置Listener机制回调,相当于开启了一个线程。以后为大家介绍Consumer Pull的方式。

我们看一下运行结果:

仔细看看生产者结果输出,你会发现,有的消息发往broker-a,有的在broker-b上,自动实现了消息的负载均衡!

这里消费消息是没有什么顺序的,以后我们在来谈消息的顺序性。

我们再来看一看管控台:

在多Master模式中,如果某个Master进程挂了,显然这台broker将不可用,上面的消息也将无法消费,要知道开源版本的RocketMQ是没有提供切换程序,来自动恢复故障的,因此在实际开发中,我们一般提供一个监听程序,用于监控Master的状态。

在ActiveMQ中,生产消息的时候会提供是否持久化的选择,但是对于RocketMQ而言,消息是一定会被持久化的!

上面的消费者采用的是Push Consumer的方式,那么监听的Listener中的消息List到底是多少条呢?虽然提供了API,如consumer.setConsumeMessageBatchMaxSize(10),实际上即使设置了批量的条数,但是注意了,是最大是10,并不意味着每次batch的都是10,只有在消息有挤压的情况下才有可能。而且Push Consumer的最佳实践方式就是一条条的消费,如果需要batch,可以使用Pull Consumer。

务必保证先启动消费者进行Topic订阅,然后在启动生产者进行生产(否则极有可能导致消息的重复消费,重复消费,重复消费!重要的事情说三遍!关于消息的重复问题后续给大家介绍~)。而且在实际开发中,有时候不会批量的处理消息,而是原子性的,单线程的去一条一条的处理消息,这样就是实时的在处理消息。(批量的处理海量的消息,可以考虑Kafka)

初步了解消息失败重试机制

消息失败,无非涉及到2端:从生产者端发往MQ的失败;消费者端从MQ消费消息的失败;

生产者端的失败重试

生产者端的消息失败:比如网络抖动导致生产者发送消息到MQ失败。

上图代码示例的处理手段是:如果该条消息在1S内没有发送成功,那么重试3次。

消费者端的失败重试

消费者端的失败,分为2种情况,一个是timeout,一个是exception

timeout,比如由于网络原因导致消息压根就没有从MQ到消费者上,在RocketMQ内部会不断的尝试发送这条消息,直至发送成功为止!(比如集群中一个broker失败,就尝试另一个broker)

exception,消息正常的到了消费者,结果消费者发生异常,处理失败了。这里涉及到一些问题,需要我们思考下,比如,消费者消费消息的状态有哪些定义?如果失败,MQ将采取什么策略进行重试?假设一次性批量PUSH了10条,其中某条数据消费异常,那么消息重试是10条呢,还是1条呢?而且在重试的过程中,需要保证不重复消费吗?

消息消费的状态,有2种,一个是成功(CONSUME_SUCCESS),一个是失败&稍后重试(RECONSUME_LATER)

在启动broker的过程中,可以观察下日志,你会发现RECONSUME_LATER的策略。

如果消费失败,那么1S后再次消费,如果失败,那么5S后,再次消费,......直至2H后如果消费还失败,那么该条消息就会终止发送给消费者了!

RocketMQ为我们提供了这么多次数的失败重试,但是在实际中也许我们并不需要这么多重试,比如重试3次,还没有成功,我们希望把这条消息存储起来并采用另一种方式处理,而且希望RocketMQ不要在重试呢,因为重试解决不了问题了!这该如何做呢?

我们先来看一下一条消息MessageExt对象的输出:

MessageExt [queueId=0, storeSize=137, queueOffset=0, sysFlag=0, bornTimestamp=1492213846916, bornHost=/192.168.99.219:50478, storeTimestamp=1492213846981, storeHost=/192.168.99.121:10911, msgId=C0A8637900002A9F0000000000000000, commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest2, flag=0, properties={TAGS=TagA, WAIT=true, MAX_OFFSET=3, MIN_OFFSET=0}, body=16]]

注意到reconsumeTimes属性,这个属性就代表消息重试的次数!来看一段代码:

注意了,对于消费消息而言,存在2种指定的状态(成功 OR 失败重试),如果一条消息在消费端处理没有返回这2个状态,那么相当于这条消息没有达到消费者,势必会再次发送给消费者!也即是消息的处理必须有返回值,否则就进行重发。

天然的消息负载均衡及高效的水平扩展机制

对于RocketMQ而言,通过ConsumeGroup的机制,实现了天然的消息负载均衡!通俗点来说,RocketMQ中的消息通过ConsumeGroup实现了将消息分发到C1/C2/C3/......的机制,这意味着我们将非常方便的通过加机器来实现水平扩展!

我们考虑一下这种情况:比如C2发生了重启,一条消息发往C3进行消费,但是这条消息的处理需要0.1S,而此时C2刚好完成重启,那么C2是否可能会收到这条消息呢?答案是肯定的,也就是consume broker的重启,或者水平扩容,或者不遵守先订阅后生产消息,都可能导致消息的重复消费!关于去重的话题会在后续中予以介绍!

至于消息分发到C1/C2/C3,其实也是可以设置策略的。

集群消费 AND 广播消费

RocketMQ的消费方式有2种,在默认情况下,就是集群消费,也就是上面提及的消息的负载均衡消费。另一种消费模式,是广播消费。广播消费,类似于ActiveMQ中的发布订阅模式,消息会发给Consume Group中的每一个消费者进行消费。

OK,到这里,本期的RocketMQ就结束了,咱们下期见~

时间: 2024-10-08 16:40:49

RocketMQ实战(二)的相关文章

Python爬虫实战二之爬取百度贴吧帖子

大家好,上次我们实验了爬取了糗事百科的段子,那么这次我们来尝试一下爬取百度贴吧的帖子.与上一篇不同的是,这次我们需要用到文件的相关操作. 前言 亲爱的们,教程比较旧了,百度贴吧页面可能改版,可能代码不好使,八成是正则表达式那儿匹配不到了,请更改一下正则,当然最主要的还是帮助大家理解思路. 2016/12/2 本篇目标 1.对百度贴吧的任意帖子进行抓取 2.指定是否只抓取楼主发帖内容 3.将抓取到的内容分析并保存到文件 1.URL格式的确定 首先,我们先观察一下百度贴吧的任意一个帖子. 比如:ht

RocketMQ实战(一)

阿里巴巴有2大核心的分布式技术,一个是OceanBase,另一个就是RocketMQ.在实际项目中已经领教过RocketMQ的强大,本人计划写一个RocketMQ实战系列,将涵盖RocketMQ的简介,环境搭建,初步使用.API详解.架构分析.管理员集群操作等知识. What is RocketMQ? RocketMQ作为一款分布式的消息中间件(阿里的说法是不遵循任何规范的,所以不能完全用JMS的那一套东西来看它),经历了Metaq1.x.Metaq2.x的发展和淘宝双十一的洗礼,在功能和性能上

FastDFS安装使用实战二(配置篇)

FastDFS安装使用实战二(配置篇) Keywords:FastDFS.分布式文件系统.Ubuntu Author:soartju 转载请注明出处:http://soartju.iteye.com/blog/803524 FastDFS的配置文件在%FastDFS%/conf目录下,其中包括 Client.conf    客户端上传配置文件 Storage.conf    文件存储服务器配置文件 Tracker.conf    负责均衡调度服务器配置文件 http.conf        ht

Python机器学习实战<二>:机器学习概述

1.机器学习的真实含义是利用数据来彰显数据背后的真实含义. 2.机器学习的一般用例:人脸识别.手写数字识别.垃圾邮件过滤.产品推荐等等. 3.机器学习的主要任务是分类,即将实例数据划分到合适的分类中.另一项任务是回归,主要用于预测数值型数据.分类和回归属于监督学习,之所以称为监督学习,是因为这类算法必须知道预测什么,即目标的分类信息.另一种机器学习方式是无监督学习,此时数据没有类别信息,也没有给定的目标.在无监督学习中,将数据集合分成由类似对象组成的多个类成为聚类,将寻找数据统计值的过程称为密度

转 Python爬虫实战二之爬取百度贴吧帖子

静觅 » Python爬虫实战二之爬取百度贴吧帖子 大家好,上次我们实验了爬取了糗事百科的段子,那么这次我们来尝试一下爬取百度贴吧的帖子.与上一篇不同的是,这次我们需要用到文件的相关操作. 本篇目标 1.对百度贴吧的任意帖子进行抓取 2.指定是否只抓取楼主发帖内容 3.将抓取到的内容分析并保存到文件

二 Flask Project 实战 二

5 templates {{ and }} is an expression{% and %} denotes a control flow statement like if and for blocks defined here that will be overridden in the other templates{% extends 'base.html' %} tells Jinja that this template should replace the blocks from

Docker最全教程之Python爬网实战(二十一)

原文:Docker最全教程之Python爬网实战(二十一) Python目前是流行度增长最快的主流编程语言,也是第二大最受开发者喜爱的语言(参考Stack Overflow 2019开发者调查报告发布).笔者建议.NET.Java开发人员可以将Python发展为第二语言,一方面Python在某些领域确实非常犀利(爬虫.算法.人工智能等等),另一方面,相信我,Python上手完全没有门槛,你甚至无需购买任何书籍! 由于近期在筹备4.21的长沙开发者大会,耽误了不少时间.不过这次邀请到了腾讯资深技术

数据-第16课-栈的应用实战二

第16课-栈的应用实战二 1. 问题的提出 计算机的本质工作就是数学运算,那计算机可以读入字符串”9 + (3 - 1) *5 +8/2”并且计算值吗? 2. 后缀表达式 波兰科学家在20世纪50年代提出了一种将运算符放在数字后面的后缀表达式. 对应的,我们平时用的数学表达式叫做中缀表达式. 实例 5 + 3 => 5 3 + 1 + 2 * 3 => 1 2 3 * + 9 + ( 3–1 ) * 5 => 9 3 1–5 * + 中缀表达式符合人类的阅读和思维习惯:后缀表达式符合计算

数据--第20课-递归的应用实战二

第20课-递归的应用实战二 1. 递归与回溯 (1)递归在程序设计中也常用于需要回溯算法的场合. (2)回溯算法的基本思想. ① 从问题的某一种状态出发,搜索可以到达的所有状态. ② 当某个状态到达后,可向前回退,并继续搜索其它可达状态 ,并继续搜索其它可达状态. ③ 当所有状态都到达后,回溯算法结束. (3)程序设计中可利用函数的活动对象保存回溯算法的状态数据,因此可以利用递归完成回溯算法 2. 八皇后问题 在一个8×8国际象棋盘上,有8个皇后,每个皇后占一格:要求皇后间不会出现相互“攻击”的