如何保证消息不被重复消费?(如何保证消息消费时的幂等性)

首先就是比如rabbitmq、rocketmq、kafka,都有可能会出现消费重复消费的问题,正常。因为这问题通常不是mq自己保证的,是给你保证的。然后我们挑一个kafka来举个例子,说说怎么重复消费吧。

kafka实际上有个offset的概念,就是每个消息写进去,都有一个offset,代表他的序号,然后consumer消费了数据之后,每隔一段时间,会把自己消费过的消息的offset提交一下,代表我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的offset来继续消费吧。

但是凡事总有意外,比如我们之前生产经常遇到的,就是你有时候重启系统,看你怎么重启了,如果碰到点着急的,直接kill进程了,再重启。这会导致consumer有些消息处理了,但是没来得及提交offset,尴尬了。重启之后,少数消息会再次消费一次。

其实重复消费不可怕,可怕的是你没考虑到重复消费之后,怎么保证幂等性。

给你举个例子吧。假设你有个系统,消费一条往数据库里插入一条,要是你一个消息重复两次,你不就插入了两条,这数据不就错了?但是你要是消费到第二次的时候,自己判断一下已经消费过了,直接扔了,不就保留了一条数据?

一条数据重复出现两次,数据库里就只有一条数据,这就保证了系统的幂等性

幂等性,我通俗点说,就一个数据,或者一个请求,给你重复来多次,你得确保对应的数据是不会改变的,不能出错。

那所以第二个问题来了,怎么保证消息队列消费的幂等性?

其实还是得结合业务来思考,我这里给几个思路:

(1)比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update一下好吧

(2)比如你是写redis,那没问题了,反正每次都是set,天然幂等性

(3)比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的id,类似订单id之类的东西,然后你这里消费到了之后,先根据这个id去比如redis里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个id写redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。

还有比如基于数据库的唯一键来保证重复数据不会重复插入多条,我们之前线上系统就有这个问题,就是拿到数据的时候,每次重启可能会有重复,因为kafka消费者还没来得及提交offset,重复数据拿到了以后我们插入的时候,因为有唯一键约束了,所以重复数据只会插入报错,不会导致数据库中出现脏数据

原文地址:https://www.cnblogs.com/qingmuchuanqi48/p/11124068.html

时间: 2024-08-29 13:51:19

如何保证消息不被重复消费?(如何保证消息消费时的幂等性)的相关文章

Kafka在高并发的情况下,如何避免消息丢失和消息重复?kafka消费怎么保证数据消费一次?数据的一致性和统一性?数据的完整性?

1.kafka在高并发的情况下,如何避免消息丢失和消息重复? 消息丢失解决方案: 首先对kafka进行限速, 其次启用重试机制,重试间隔时间设置长一些,最后Kafka设置acks=all,即需要相应的所有处于ISR的分区都确认收到该消息后,才算发送成功 消息重复解决方案: 消息可以使用唯一id标识 生产者(ack=all 代表至少成功发送一次) 消费者 (offset手动提交,业务逻辑成功处理后,提交offset) 落表(主键或者唯一索引的方式,避免重复数据) 业务逻辑处理(选择唯一主键存储到R

RocketMQ(消息重发、重复消费、事务、消息模式)

RocketMQ基础:https://github.com/apache/rocketmq/tree/rocketmq-all-4.5.1/docs/cn 分布式消息系统作为实现分布式系统可扩展.可伸缩性的关键组件,需要具有高吞吐量.高可用等特点.而谈到消息系统的设计,就回避不了两个问题: 消息的顺序问题 消息的重复问题 RocketMQ作为阿里开源的一款高性能.高吞吐量的消息中间件,它是怎样来解决这两个问题的?RocketMQ 有哪些关键特性?其实现原理是怎样的? 关键特性以及其实现原理 一.

消息队列如何处理重复消息

一.消息重复现象 在 MQTT 协议中,给出了三种传递消息时能够提供的服务质量标准: At most once:最多一次,这种情况会丢失部分数据,一般日志收集这种对数据不严格的可以使用 At least once:最少一次,这种会导致一条消息重复发送 Exactly once:正好一次,一条消息只会被消费一次 RocketMQ,Rabbit MQ,Kafka都是使用的At least once,虽然消息会重复,但不会丢失.不使用Exactly once这种呢,是因为这种每次发送前发送都要检查这条

Kafka创建&查看topic,生产&消费指定topic消息

启动zookeeper和Kafka之后,进入kafka目录(安装/启动kafka参考前面一章:https://www.cnblogs.com/cici20166/p/9425613.html) 1.创建Topic 1)运行命令: ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test1 2181 是zookeeper 端口 图示为创建成

系统学习消息队列分享(六) 如何确保消息不会丢失?

对于刚刚接触消息队列的同学,最常遇到的问题,也是最头痛的问题就是丢消息了.对于大部分业务系统来说,丢消息意味着数据丢失,是完全无法接受的. 其实,现在主流的消息队列产品都提供了非常完善的消息可靠性保证机制,完全可以做到在消息传递过程中,即使发生网络中断或者硬件故障,也能确保消息的可靠传递,不丢消息. 绝大部分丢消息的原因都是由于开发者不熟悉消息队列,没有正确使用和配置消息队列导致的.虽然不同的消息队列提供的 API 不一样,相关的配置项也不同,但是在保证消息可靠传递这块儿,它们的实现原理是一样的

消息队列入门(一)关于消息队列

什么是消息队列 消息是指在两个独立的系统间传递的数据,这两个系统可以是两台计算机,也可以是两个进程. 消息可以非常简单,可以是简单的字符串,也可以是保存了数据持久化的各种类型的文档集合. 队列是在消息的传输过程中的通道,是保存消息的容器,根据不同的情形,可以有先进先出,优先级队列等区别 . 为什么使用消息队列 个人觉得消息队列主要的意义是解耦和异步处理,以及在高并发场景下平滑短时间内大量的服务请求. 消息队列不仅被用于系统内部组件之间的通信,同时也被用于系统跟其它服务之间的交互. 消息队列的使用

ZeroMQ接口函数之 :zmq_msg_copy - 把一个消息的内容复制到另一个消息中

ZeroMQ 官方地址 :http://api.zeromq.org/4-1:zmq_msg_copy zmq_msg_copy(3)   ØMQ Manual - ØMQ/3.2.5 Name zmq_msg_copy - 把一个消息的内容复制到另一个消息中 Synopsis int zmq_msg_copy (zmq_msg_t *dest, zmq_msg_t *src); Description zmq_msg_copy()函数会将src指定的消息对象中的内容复制到dest指定的消息对象

MFC 消息映射表和虚函数实现消息映射到底谁的效率高

深入浅出MFC对于虚函数实现方式的缺点,它指出:虚函数耗费大量内存,系统最终将被这些额外负担拖垮.    但是现在对于容量巨大的白菜价格的内存来说,这种额外负担是否已经过时了呢~?    书中提到,虚函数表中的每一个项目都是一个函数指针,价值4字节,如果基类的虚函数表有100项 (MFC里面的消息数量是否在这个数量级?),经过十层继承,开支散叶,总共需要耗费多少内存?    我粗略地算了下,不知道这种计算方法是否正确,4Byte*100项=400Byte.如果CCmdTarget中定义100个消

消息队列技术终结者(四)—消息消费者以何种方式接收消息

消息消费者可以同步接收消息,也可以异步接收消息,一般而言,采用异步的方式接受消息优于采用同步的方式接受消息,体现在:        1.异步方式创建的网络流量比较小,单向推送消息并使之通过管道进入消息监听器.管道操作支持将多条消息聚合为一个网络调用.        2.异步方式使用线程比较少.异步方式在消息接收者不活动期间不使用线程.同步方式的消息接收者在接收调用期间内使用线程,结果线程可能会长时间保持空闲,尤其是该调用中指定了阻塞超时.        3.使用异步方式可以规避阻塞服务器上运行的