如何维护消息消费的幂等性

幂等性:一个请求,不管重复来多少次,结果是不会改变的。

每个消息都会有唯一的消息 id。
1)、先查再保存
每次保存数据的时候,都先查一下,如果数据存在了那么就不保存。这个情况是并发不高的情况。

2)、业务表添加约束条件
如果你的数据库将来都不会分库分表,那么可以在业务表字段加上唯一约束条件(UNIQUE),这样相同的数据就不会保存为多份。

3)、添加消息表
再数据库里面,添加一张消息消费记录表,表字段加上唯一约束条件(UNIQUE),消费完之后就往表里插入一条数据。因为加了唯一约束条件,第二次保存的时候,MySQL 就会报错,就插入不进去;通过数据库可以限制重复消费。

4)、使用 Redis
如果你的系统是分布式的,又做了分库分表,那么可以使用 Redis 来做记录,把消息 id 存在 Redis 里,下次再有重复消息 id 在消费的时候,如果发现 Redis 里面有了就不能进行消费。

5)、高并发下
如果你的系统并发很高,那么可以使用 Redis 或者 zookeeper 的分布式对消息 id 加锁,然后使用上面的几个方法进行幂等性控制

原文地址:https://www.cnblogs.com/zk-blog/p/12364844.html

时间: 2024-11-03 09:18:52

如何维护消息消费的幂等性的相关文章

如何保证消息不被重复消费?(如何保证消息消费时的幂等性)

首先就是比如rabbitmq.rocketmq.kafka,都有可能会出现消费重复消费的问题,正常.因为这问题通常不是mq自己保证的,是给你保证的.然后我们挑一个kafka来举个例子,说说怎么重复消费吧. kafka实际上有个offset的概念,就是每个消息写进去,都有一个offset,代表他的序号,然后consumer消费了数据之后,每隔一段时间,会把自己消费过的消息的offset提交一下,代表我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的offset来继续消费吧. 但是凡事总

关于RocketMQ消息消费与重平衡的一些问题探讨

其实最好的学习方式就是互相交流,最近也有跟网友讨论了一些关于 RocketMQ 消息拉取与重平衡的问题,我姑且在这里写下我的一些总结. 关于 push 模式下的消息循环拉取问题 之前发表了一篇关于重平衡的文章:「Kafka 重平衡机制」,里面有说到 RocketMQ 重平衡机制是每隔 20s 从任意一个 Broker 节点获取消费组的消费 ID 以及订阅信息,再根据这些订阅信息进行分配,然后将分配到的信息封装成 pullRequest 对象 pull 到 pullRequestQueue 队列中

RocketMQ源码解析-消息消费

RocketMQ源码解析-消息消费 1.消费者相关类 2.消费者的启动 3.消息的拉取 4.消费者的负载均衡 5.消息的消费 6.消费进度管理 看了很多遍的代码,还是决定动手把记录下来,梳理一下整体结构和实现细节,给大家一个参考,写的不好的地方请多多指教 RocketMQ中消息的消费分为2种方式,一种是pull模式,一种为push模式(基于pull模式实现),大部分的业务场合下业界用的比较多的是push模式,一句话你没有特殊需求就用push,push模式可以达到准实时的消息推送 那什么时候可以用

消息消费队列和索引文件的更新

ConsumeQueue,IndexFile需要及时更新,否则无法及时被消费,根据消息属性查找消息也会出现较大延迟. mq通过开启一个线程ReputMessageService来准时转发commitLog文件更新事件,相应的任务处理器根据转发的消息及时更新ConsumeQueue,IndexFile文件 DefaultMessageStore#start ReputMessageService线程每执行一次任务推送休息1毫秒旧继续尝试推送消息到消息消费队列和索引文件. 返回reputFromOf

mq消息消费

消息消费以组的的模式开展: 一个消费组内可以包含多个消费者,每一个消费组可订阅多个主题: 消费组之间有集群模式与广播模式两种消费模式:集群模式-主题下的同一条消息只允许被其中一个消费者消费.广播模式-主题下的同一条消息将被集群内的所有消费者消费一次.集群模式下消息队列负载机制遵循一个通用的思想:一个消息队列同一时间只允许被一个消息消费者消费,一个消费者可以消费多个消息队列. 消息服务器与消费者之间的消息传送也有两种方式 推模式,拉模式:拉模式-消费端主动发起啦消息请求.推模式-消息到达服务器后,

MQ解决消息重发--做到幂等性

一.MQ消息发送 1.发送端MQ-client(消息生产者:Producer)将消息发送给MQ-server: 2.MQ-server将消息落地: 3.MQ-server回ACK给MQ-client(Producer): 4.MQ-server将消息发送给消息接受端MQ-client(消息消费者:Customer): 5.MQ-client(Customer)消费接受到消息后发送ACK给MQ-server: 6.MQ-server将落地消息删除 二.消息重复发送原因 为了保证消息必达,MQ使用了

基于loghub的消息消费延迟监控

我们可以把loghub当作一个消息中间件来使用.如果能知道当前的消费进度,自然好了,否则消费情况一无所知,总是有点慌! loghub消费分两种情况,一是普通消费,二是消费组消费: 消费组消费,loghub服务端会记录消费情况,这时可以通过调用服务端API进行偏移信息查询. 普通消费则不同,需要自行维护偏移量,即只有自己知道偏移信息,自己处理延迟.我们主要讨论这种情况. 一. 消费loghub数据的样例如下: // 普通消费 private static void consumeDataFromS

ActiveMQ(4)---ActiveMQ原理分析之消息消费

消费端消费消息的原理 我们通过上一节课的讲解,知道有两种方法可以接收消息,一种是使用同步阻塞的MessageConsumer#receive方法.另一种是使用消息监听器MessageListener.这里需要注意的是,在同一个session下,这两者不能同时工作,也就是说不能针对不同消息采用不同的接收方式.否则会抛出异常.至于为什么这么做,最大的原因还是在事务性会话中,两种消费模式的事务不好管控 消费端消费消息源码分析 ActiveMQMessageConsumer.receive消费端同步接收

记一次生产kafka消息消费的事故

事故背景: 我们公司与合作方公司有个消息同步的需求,合作方是消息生产者,我们是消息消费者,他们通过kafka给我们推送消息,我们实时接收,然后进行后续业务处理.昨天上午,发现他们推送过来的广场门店信息我们都没有消费,导致我们系统和他们系统数据不一致,从而导致无法提单,无法出报表(报表有误)等各种问题 排查过程: (1)因为coco身体不适,上午请假去医院了,所以这个问题就转给我们team的专门运维的同事了,电话大概给他说明了代码路径,可惜,半天下来仍然无果,看着微信群里他发的消息,我有点抓狂,根