消息拉取

疑问:PullRequest何时添加?

PullMessageService提供延迟添加与立即添加2种方式

疑问:PullRequest是在什么时候创建的呢?

1.上上图中 PullRequest pullRequest = this.pullRequestQueue.take(); this.pullMessage(pullRequest);mq根据PullRequest拉取任务执行完一次消息拉取任务之后,又将PullRequest对象放入到pullRequestQueue,第二个是在RebalancceImpl中创建。

PullMessageService只有在拿到PullRequest对象时才会执行拉取任务,看一下PullRequest的属性:

看下this.pullMessage的实现:

ProcessQueue实现机制:

ProcessQueue是MessageQueue在消费端的重现,快照。PullMessageService从消息服务器默认每次拉取32条消息,按消息的队列偏移量顺序存放在ProcessQueue中,PullMessageService之后将消息提交到消费者消费线程池,消息成功消费后从ProcessQueue中移除。

消息拉取基本流程:

原文地址:https://www.cnblogs.com/lccsblog/p/12240113.html

时间: 2024-08-03 09:15:39

消息拉取的相关文章

Kafka consumer消息的拉取及偏移的管理

消费者拉取消息并处理主要有4个步骤: 获取消费者所拉取分区的偏移位置OffsetFetchRequest(新的消息是从偏移位置开始的) 创建FetchReqeust,生成Map<Node, FetchRequest>,以消费者所拉取消息的节点为key来分组,所消费的TopicPartition的数据为value,并放入到unsent队列 调用poll方法实际发送请求给相应的node,如果返回成功,在onSecuss方法中,消息被保存在completedFetches中 从completedFe

RocketMQ 拉取消息-文件获取

看完了上一篇的<RocketMQ 拉取消息-通信模块>,请求进入PullMessageProcessor中,接着 PullMessageProcessor.processRequest(final ChannelHandlerContext ctx, RemotingCommand request)方法中调用了: final GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessag

解决 MySQL 比如我要拉取一个消息表中用户id为1的前10条最新数据

我们都知道,各种主流的社交应用或者阅读应用,基本都有列表类视图,并且都有滑到底部加载更多这一功能, 对应后端就是分页拉取数据.好处不言而喻,一般来说,这些数据项都是按时间倒序排列的,用户只关心最新的动态,而不关心几个月甚至几年前消息,所以后端返回给客户端的数据是不会一次性传递全部内容的(不仅耗费流量,而且还给服务器带来巨大压力). 举个例就说MySQL,它已经给我们提供了相应的语句来支持这一功能,那就是limit关键字.比如我要拉取一个消息表中用户id为1的前10条最新数据,SQL语句如下: s

kafka 消费者拉取消息

本文只跟踪消费者拉取消息的流程.对于 java 客户端, kafka 的生产者和消费者复用同一个网络 io 类 NetworkClient. 入口在 KafkaConsumer#pollOnce 中,抽出主要步骤: // 构造 FetchRequest 请求,将请求对象放入 unsent 集合,等待发送 fetcher.sendFetches(); // 取出 unsent 中的请求,调用 NetworkClient#send,NetworkClinet#poll client.poll(pol

Android图片的拉取与缓存

Anroid应用中经常会有从网上拉取图片的需求,拉图片说简单也很好做,说难也是很费力的,虽然网上的方案很多,开源框架也不少,但具体的实现还是得看需求.下面分享一下我在项目中用到的两种拉图片方案. 1. 少量图片 如果图片少量,使用框架就显得冗余,直接下载就更简洁一些. public static boolean downloadImage(String url, String savePath) { LogUtil.v(TAG, "url=" + url + "; savep

Apollo 3 定时/长轮询拉取配置的设计

前言 如上图所示,Apollo portal 更新配置后,进行轮询的客户端获取更新通知,然后再调用接口获取最新配置.不仅仅只有轮询,还有定时更新(默认 5 分钟一次).目的就是让客户端能够稳定的获取到最新的配置. 一起来看看他的设计. 核心代码 具体的类是 RemoteConfigRepository,每一个 Config -- 也就是 namespace 都有一个 RemoteConfigRepository 对象,表示这个 Config 的远程配置仓库,可以利用这个仓库请求远程服务,得到配置

nova虚拟机启动拉取image的过程

这里只关注Nova virt的spawn函数,glance.nova后端为ceph nova/virt/libvirt/driver.py    def spawn(self, context, instance, image_meta, injected_files,               admin_password, network_info=None, block_device_info=None):         image_meta = objects.ImageMeta.f

拉取种子用户的4种方法

本文和大家分享的主要是网络运营中拉取种子用户的几种方法,一起来看看吧,希望对运营新人有所帮助吧. 比较新的产品,我觉得没有用户来,这种情况怎么处理? 第一步要先了解你的目标用户是谁?很多同学并没有想清楚自己真正想要拉哪些人,他们是谁?到底在哪里?所以我列了三个问题,希望大家真正做用户运营工作之前,先考虑好这三个问题,该怎么去回答. 第1个问题,他们到底是谁? · 第2个问题,谁对这些人有影响力? · 第3个问题,有影响力的这些人又在哪里? 第一个问题:他们是谁,指的是你要先思考你的目标用户是谁?

删除本地分支,并重新拉取远程分支复制到本地

1. 删除本地分支 假设想要删除本地的分支temp,并且当前处在temp分支上,首先需要切换到别的分支(假设切换到develop分支): git checkout develop //切换到develop分支 git branch //此时处在develop分支 git branch -D temp //删除本地temp分支 2.重新拉取远程仓库代码,并自动创建分支 git fetch 会将远程代码的更新(commit)拉取到本地. git fetch origin temp:temp //拉取