分布式场景下如何保证消息队列实现最终一致性

考虑一个分布式场景中一个常见的场景:服务A执行某个数据库操作成功后,会发送一条消息到消息队列,现在希望只有数据库操作执行成功才发送这条消息。下面是一些常见的作法:

1. 先执行数据库操作,再发送消息

public void purchaseOrder() {
    orderDao.save(order);
    messageQueue.send(message);
}

有可能order新增成功,发送消息失败。最终形成不一致状态。

2. 先发送消息,再执行数据库操作

public void purchaseOrder() {
    messageQueue.send(message);
    orderDao.save(order);
}

有可能消息发送成功,而order新增失败,从而形成不一致状态。

3. 在数据库事务中,先发送消息,再执行数据库操作

@Transactional public void purchaseOrder() {
    messageQueue.send(message);
    orderDao.save(order);
}

这里同样无法保证一致性。如果数据库操作成功,然而消息已经发送了,无法进行回滚。

4. 在数据库事务中,先执行数据库操作,再发送消息

@Transactional public void purchaseOrder() {
    orderDao.save(order);
    messageQueue.send(message);
}

这种方案成功与否,取决于消息队列是否拥有应答机制和事务机制。

应答机制表示producer发送消息后,消息队列能够返回response从而证明消息是否插入成功。

如果消息队列拥有应答机制,将上面的代码改写为:

@Transactional public void purchaseOrder() {
    orderDao.save(order); try{
        kafkaProducer.send(message).get();
    } catch(Exception e) throw new RuntimeException("Fail to send message");
    }

这段代码表示如果发送发收到消息队列错误的response,就抛出一个RuntimeException。那么消息发送失败,能够造成数据库操作的回滚。这个方案看似可行,然而存在这样一种情况,如果消息发送成功,而消息队列由于网络原因没有即时返回response,此时消息发送方由于没有及时收到应答从而认为消息发送失败了,因此消息发送方的数据库事务回滚了,然而消息的确已经插入成功,从而造成了最终不一致性。

上面的不一致性可以通过消息的事务机制解决。

事务机制表示消息队列中的消息是否拥有状态,从而决定消费者是否消费该条消息。

Alibaba旗下的开源消息队列RocketMQ以高可用性闻名,它是最早支持事务消息的消息队列。Kafka从版本0.11开始也支持了事务机制。

RoketMQ的事务机制是将消息标记为Prepared状态或者Confirmed状态。处于Prepared状态的消息对consumer不可见。

而Kafka通过Transaction Marker将消息标记为Uncommited或Commited状态。Consumer通过配置isolation-levelread_committedread_uncommitted来决定对哪种类型的消息可见。

5. 消息队列不支持事务消息

如果消息队列不支持事务消息,那么我们的解决方案是,新增一张message表,并开启一个定时任务扫描这张message表,将所有状态为prepared的message发送给消息队列,发送成功后,将message状态置为confirmed。

代码如下:

@Transactional public void purchaseOrder() {
    orderDao.save(order);
    messageService.save(message);
}

此时插入order和插入message的逻辑处于同一个数据库事务,通过后台的定时程序不断扫描message表,因此一定能够保证消息被成功投递到消息消费方。

这个方案存在的一个问题是,有可能后台任务发送消息成功后宕机了,从而没有来得及将已发送的message状态置为confirmed。因此下一次扫描message表时,会重复发送该条消息。这就是at least once delivery

由于at least once delivery的特性,consumer有可能收到重复的数据。此时可以在consumer端建立一张message_consume表,来判断消息是否已经消费过,如果已经消费过,那么就直接丢弃该消息。



本文的重点是你有没有收获与成长,其余的都不重要,希望读者们能谨记这一点。同时我经过多年的收藏目前也算收集到了一套完整的学习资料,包括但不限于:分布式架构、高可扩展、高性能、高并发、Jvm性能调优、Spring,MyBatis,Nginx源码分析,Redis,ActiveMQ、、Mycat、Netty、Kafka、Mysql、Zookeeper、Tomcat、Docker、Dubbo、Nginx等多个知识点高级进阶干货,希望对想成为架构师的朋友有一定的参考和帮助

需要更详细架构师技能思维导图和以下资料的可以加一下技术交流分享群:“708 701 457”免费获取


原文地址:https://blog.51cto.com/14230003/2408803

时间: 2024-08-05 11:03:54

分布式场景下如何保证消息队列实现最终一致性的相关文章

分布式场景下Kafka消息顺序性的思考

如果业务中,对于kafka发送消息异步消费的场景,在业务上需要实现在消费时实现顺序消费, 利用kafka在partition内消息有序的特点,消息消费时的有序性. 1.在发送消息时,通过指定partition hash 2.consumer 消费消息时,需要使用亲缘性线程池进行消费,才能实现消息的基本有序.否则即使通过发送时指定partition,在消费端由于线程池的异步消费,消息之间的处理都是并发进行的,消息就会被打乱. 上面的方式基本可以实现消息的消费顺序性,除了在极端场景下,比如: 1.进

还没弄懂分布式场景下数据一致性问题?一文教你轻松解决!

文章纲要 此次分享的缘由 目前分布式事务问题是怎么解决的 行业中有什么解决方案 这些解决方案分别有什么优缺点 别人是怎么做的 我们可以怎么来做 此次分享的缘由 支付重构 考虑支付重构的时候,自然想到原本属于一个本地事务中的处理,现在要跨应用了要怎么处理.拿充值订单举个栗子吧,假设:原本订单模块和账户模块是放在一起的,现在需要做服务拆分,拆分成订单服务,账户服务.原本收到充值回调后,可以将修改订单状态和增加金币放在一个mysql事务中完成的,但是呢,因为服务拆分了,就面临着需要协调2个服务才能完成

【转】MySQL乐观锁在分布式场景下的实践

背景 在电商购物的场景下,当我们点击购物时,后端服务就会对相应的商品进行减库存操作.在单实例部署的情况,我们可以简单地使用JVM提供的锁机制对减库存操作进行加锁,防止多个用户同时点击购买后导致的库存不一致问题. 但在实践中,为了提高系统的可用性,我们一般都会进行多实例部署.而不同实例有各自的JVM,被负载均衡到不同实例上的用户请求不能通过JVM的锁机制实现互斥. 因此,为了保证在分布式场景下的数据一致性,我们一般有两种实践方式:一.使用MySQL乐观锁:二.使用分布式锁. 本文主要介绍MySQL

关于MQ的几件小事(二)如何保证消息队列的高可用

1.RabbitMQ的高可用 RabbitMQ基于主从模式实现高可用.RabbitMQ有三种模式:单机模式,普通集群模式,镜像集群模式. (1)单机模式: 单机模式就是demo级别的,生产中不会有人使用. (2)普通集群模式 普通集群模式就是在多台机器上启动多个rabbitmq实例,每个机器启动一个.但是创建的queue只会放在一个rabbitmq实例上面,但是其他的实例都同步了这个queue的元数据.在你消费的时候,如果连接到了另一个实例,他会从拥有queue的那个实例获取消息然后再返回给你.

消费端如何保证消息队列MQ的有序消费

消息无序产生的原因 消息队列,既然是队列就能保证消息在进入队列,以及出队列的时候保证消息的有序性,显然这是在消息的生产端(Producer),但是往往在生产环境中有多个消息的消费端(Consumer),尽管消费端在拉取消息时是有序的,但各个消息由于网络等方面原因无法保证在各个消费端中处理时有序. 场景分析 先后两次修改了商品信息,消息A和消息B先后同步写入MySQL,接着异步写入消息队列中发送消息,此时消息队列生产端(Producer)按时序先后发出了A和B两条消息(消息A先发出,消息B后发出)

阿里Java面试题剖析:在高并发的情况下如何保证消息的顺序性?

面试原题 如何保证消息的顺序性? 面试官心理分析 其实这个也是用 MQ 的时候必问的话题,第一看看你了不了解顺序这个事儿?第二看看你有没有办法保证消息是有顺序的?这是生产系统中常见的问题. 面试题剖析 我举个例子,我们以前做过一个 mysql binlog 同步的系统,压力还是非常大的,日同步数据要达到上亿,就是说数据从一个 mysql 库原封不动地同步到另一个 mysql 库里面去(mysql -> mysql).常见的一点在于说比如大数据 team,就需要同步一个 mysql 库过来,对公司

Linux下进程间通信之消息队列

消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法.每个数据块都被认为是一个有类型,接受者进程接受的数据块可以有不同的类型值.我们可以通过发送消息来避免命名管道的同步和阻塞问题.消息队列是基于消息的,且消息队列的读取不一定是先入先出.每个消息的最大长度是有上限的,每个消息队列总的字节数是有上限的,系统上消息队列的总数也有一个上限. 具体步骤如下: 1):创建新消息队列或取得已存在消息队列 2):向队列读.写消息 3):设置消息队列属性 comm.h comm.c sever.c cli

linux下,采用消息队列实现进程通信 (待续)

消息队列 是进程间通信的一种方式,它不像管道那样是基于字节流,而是基于消息. 也就是说,消息队列不一定是先进先出了. 这次我用消息队列实现一个简单的进程间通信程序,让两个进程server和client进行通信 (由于要准备学校的期末考试,先贴代码和运行结果,原理分析之后补上) 程序结构: Makefile: comm.h & comm.c: msg_server.h & msg_client.h: msg_server.c: msg_client.c 运行结果: 先启动服务器: (错误13

架构师成长之路:如何保证消息队列的高可用

问题一:描述一下 JVM 的内存区域 程序计数?(PC,Program Counter Register).在 JVM 规范中,每个线程都有它自己的程序计数?,并且任何时间一个线程都只有一个方法在执行,也就是所谓的当前方法.程序计数?会存储当前线程正在执行的 Java 方法的 JVM 指令地址:或者,如果是在执行本地方法,则是未指定值(undefined). Java 虚拟机栈(Java Virtual Machine Stack),早期也叫 Java 栈.每个线程在创建时都会创建一个虚拟机栈,