1、RocketMQ部署架构
所有IP都是127.0.0.1,其中NameServer一个,Broker一个,Producer一个,Consumer一个
2、 RocketMQ环境搭建
(1).下载安装包 下载地址:https://github.com/alibaba/RocketMQ/releases
(2).解压,并进入bin目录
(3).启动name server
(4).启动broker
(5).启动完成
3、 RocketMQ调用使用JAR:
4、 RocketMQ一些概念
RocketMQ以Topic来管理不同应用的消息。对于生产者而言,发送消息是,需要指定消息的Topic,对于消费者而言,在启动后,需要订阅相应的Topic,然后可以消费相应的消息。Topic是逻辑上的概念,在物理实现上,一个Topic由多个Queue组成,采用多个Queue的好处是可以将Broker存储分布式化,提高系统性能。
RocketMQ中,producer将消息发送给Broker时,需要制定发送到哪一个队列中,默认情况下,producer会轮询的将消息发送到每个队列中(所有broker下的Queue合并成一个List去轮询)。
对于consumer而言,会为每个consumer分配固定的队列(如果队列总数没有发生变化),consumer从固定的队列中去拉取没有消费的消息进行处理。
Producer
Producer端(属于client)的逻辑概述:
producer端的逻辑都比较简单,将消息发送到某个Queue中即可,具体发送到那个Queue可以由用户控制(MessageQueueSelector接口),默认情况下,将轮询方式选择Queue。在producer端,会从NameServer将所有Broker的Topic及对应的Queue信息(即:TopicRoute信息)拉取到本地,然后根据(brokerName, queueId)组建成一个List。因此在MessageQueueSelector,可以看到所有的Queue信息。
RocketMQ将topic的消息以多个Queue来管理,使得其较为容易的就可以进行水平扩展,提供系统吞吐力。这样分布带来的问题,就是从全局上不能做到顺序性(很多时候也并不需要全局上的顺序性)。
RocketMQ提到支持顺序消息,实际上是指基于Queue级别的顺序。用户将某些需要满足顺序的一批消息(比如电商某个订单号的一系列后续操作、比如数据库的某个主键的insert、delete、update等操作)发送到固定的某个Queue中,则从这个Queue消费消息的consumer,针对这一批消息是顺序消费。
问题1:针对顺序消息的队列,是否可以做到不停服务下的集群动态扩展?
Consumer
consumer逻辑稍微复杂一点。初步思考,consumer端至少需要处理:
(1) 消息的获取
(2) offset(消费进度)的管理与存储
(3) 集群消费模式下,Queue的分配问题(rebalance)
RocketMQ对外提供了两种不同形式的Consumer:PushConsumer和PullConsumer。顾名思义,对于PullConsumer而言,用户需要主动调用相应的接口去拉取未消费的消息。对于PushConsumer而言,用户提供消息处理的CallBack,有未曾消费的消息时,会主动回调这个CallBack来处理消息。虽从用户角度而言,Consumer存在主动(pull)和被动(push),但RocketMQ本身的broker端仅仅保存所有的消息,并不负责push消息,因此PushConsumer的底层实现也是有一个长连接主动去broker上拉取未消费的消息,然后回调用户的callback逻辑。
5、 RocketMQ如何使用
A. 生产者
B. 消费者
C. 控制器中调用
D. 启动消费者任务。注意:切记不可以在每次发送消息时,都调用start方法
6、 RocketMQ工作原理