LMAX Disrutpor—多生产者多消费者中,消息复制分发的高性能实现

解决的问题

当我们有多个消息的生产者线程,一个消费者线程时,他们之间如何进行高并发、线程安全的协调?

很简单,用一个队列。

当我们有多个消息的生产者线程,多个消费者线程,并且每一条消息需要被所有的消费者都消费一次(这就不是一般队列,只消费一次的语义了),该怎么做?

这时仍然需要一个队列。但是:

1. 每个消费者需要自己维护一个指针,知道自己消费了队列中多少数据。这样同一条消息,可以被多个人独立消费。

2. 队列需要一个全局指针,指向最后一条被所有生产者加入的消息。消费者在消费数据时,不能消费到这个全局指针之后的位置——因为这个全局指针,已经是代表队列中最后一条可以被消费的消息了。

3. 需要协调所有消费者,在消费完所有队列中的消息后,阻塞等待。

4. 如果消费者之间有依赖关系,即对同一条消息的消费顺序,在业务上有固定的要求,那么还需要处理谁先消费,谁后消费同一条消息的问题。

总而言之,如果有多个生产者,多个消费者,并且同一条消息要给到所有的消费者都去处理一下,需要做到以上4点。这是不容易的。

LMAX Disruptor,正是这种场景下,满足以上4点要求的单机跨线程消息传递、分发的开源、高性能实现。

这里有一篇英文的Disruptor介绍好文:https://github.com/LMAX-Exchange/disruptor/wiki/Introduction

关键概念

1. RingBuffer

应用需要传递的消息在Disrutpor中称为Event(事件)。

RingBuffer是Event的数组,实现了阻塞队列的语义:

如果RingBuffer满了,则生产者会阻塞等待。

如果RingBuffer空了,则消费者会阻塞等待。

2. Sequence

在上文中,我提到“每个消费者需要自己维护一个指针”。这里的指针就是一个单调递增长整数(及其基于CAS的加法、获取操作),称为Sequence。

除了每个消费这需要维护一个指针外,RingBuffer自身也要维护一个全局指针(如上一节第2点所提到的),记录最后一条可以被消费的消息。这个全局指针就在下图红框中。

生产场景实现

生产者往RingBuffer中发送一条消息(RingBuffer.publish())时:

1. 生产者的私有sequence会+1

2. 检查生产者的私有sequence与RingBuffer中Event个数的关系。如果发现Event数组满了(下图红框中的判断),则阻塞(下图绿框中的等待)。

3. RingBuffer会在Event数组中(sequencer+1) % BUFFER_SIZE的地方,放入Event。这里的取模操作,就体现了Event数组用到最后,则回到头部继续放,所谓“Ring“ Buffer的轮循复用语义。

消费场景实现

消费者从RingBuffer循环队列中获取一条消息时:

1. 从消费者私有Sequence,可以知道它自己消费到了RingBuffer队列中的哪一条消息。

2. 从RingBuffer的全局指针Sequence,可以知道RingBuffer中最后一条没有被消费的消息在什么位置。

3. N = (RuingBuffer的全局指针Sequence - 消费者私有Sequence),就是当前消费者,还可以消费多少Event。

4. 如果以上差值N为0,说明当前消费者已经消费过RingBuffer中的所有消息了。那么当前消费者会阻塞。等待生产者加入更多的消息:

以上代码中,红框中的availableSequence就是RingBuffer的全局指针Sequence。绿框中的sequence是当前消费者的私有sequence。

如果这个判断为true,说明RingBuffer中最新一条可以被消费的Event,已经被当前消费者消费过了。那么就会调用apployWaitMethod()阻塞,等待生产者产生更多的Event。

5. 如果RingBuffer中,还有可以被当前消费者消费的Event,即N > 0,

那么消费者,会一口气获取所有可以被消费的N个Event。即下图中的while循环,直到N个Event都被消费才退出。这种一口气消费尽量多的Event,是高性能的体现。

从RingBuffer中每获取一个Event,都会回调绿框中的eventHandler——这是应用注册的Event处理方法,执行应用的Event消费业务逻辑。

  

  最后,上图中的sequence.set(availableSequence),会把当前消费者的私有Sequence更新到RingBuffer的全局Sequence。表示RingBuffer中所有的Event都已经消费掉了。

高性能的实现细节

无锁

无锁就没有锁竞争。当生产者、消费者线程数很高时,意义重大。所以,

往大里说,每个消费者维护自己的Sequence,基本没有跨线程共享的状态。

往小里说,Sequence的加法是CAS实现的。

  • 当生产者需要判断RingBuffer是否已满时,用CAS比较原先RingBuffer的Event个数,和假定放入新Event后Event的个数。
  • 如果CAS返回false,说明在判断期间,别的生产者加入了新Event;或者别的消费者拿走了Event。那么当前判断无效,需要重新判断。这就是常见的 do { ... } while (false == CAS(oldVal, newVal))。——都是套路:)

对象的复用

JVM运行时,一怕创建大对象,二怕创建很多小对象。这都会导致JVM堆碎片化、对象元数据存储的额外开销大。这是高性能Java应用的噩梦。

为了解决第二点“很多小对象”,主流开源框架都会自己维护、复用对象池。LMAX Disruptor也不例外。

生产者不是创建新的Event对象,放入到RingBuffer中。而是从RingBuffer中取出一个已有的Event对象,更新它所指向的业务数据,来代表一个逻辑上的新Event。

所以LMAX Disruptor的生产者API,用起来有些麻烦——分为三步,一是下图绿框中取出一个已有的、已经被所有人消费过的Event对象,二是下图红框中更新这个Event对象所指向的业务数据,三是下图蓝框中标记这个Event对象为逻辑上的新Event。

总结

https://github.com/LMAX-Exchange/disruptor/wiki/Introduction 这篇文章对Disruptor基本概念已经介绍得很清楚了。

但是,我觉得,入门介绍结合源码去咀嚼,才会比较sexy,朋友们会深入理解。其实也不难,关键是找出源码中的核心部分。

篇幅所限,本文对于Disruptor的高级功能没有解释,比如处理多个消费者之间的依赖关系。有机会补充。

时间: 2024-08-28 10:10:57

LMAX Disrutpor—多生产者多消费者中,消息复制分发的高性能实现的相关文章

RabbitMQ基础概念详解(一)——环境配置及模拟生产者和消费者简单消息发送

一.简介: RabbitMq 是实现了高级消息队列协议(AMQP)的开源消息代理中间件.消息队列是一种应用程序对应用程序的通行方式,应用程序通过写消息,将消息传递于队列,由另一应用程序读取 完成通信.而作为中间件的 RabbitMq 无疑是目前最流行的消息队列之一. AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计.消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然.

微服务实战(四):落地微服务架构到直销系统(将生产者与消费者接入消息总线)

前一篇文章我们已经完成了基于RabbitMq实现的的消息总线,这篇文章就来看看生产者(订单微服务)与消费者(经销商微服务)如何接入消息总线实现消息的发送与消息的接收处理. 定义需要发送的消息: 下单消息要被发送到消息总线,并被经销商微服务的处理器处理.经销商微服务处理时,需要知道要对哪个经销商处理多少的PV值与电子币余额.这些信息就是事件消息需要承载的重要信息. public class OrderCreatedProcessDealerEvent:BaseEvent { public deci

Disruptor多个消费者不重复处理生产者发送过来的消息

1.定义事件事件(Event)就是通过 Disruptor 进行交换的数据类型. package com.ljq.disruptor; import java.io.Serializable; /** * 定义事件数据,本质是个普通JavaBean * * @author jqlin */ @SuppressWarnings("serial") public class LongEvent implements Serializable { private long value; pu

在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

什么是生产者消费者模式 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题.生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力. 什么是生产者消费者模式 生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题.生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待

Kafka-常用术语(消息、生产者、消费者、集群、broker解释)

Kafka-常用术语(消息.生产者.消费者.集群.broker解释) 消息和批次 kafka的数据单元被称为消息.类似于数据库表中的一行数据. 消息由字节数组组成,所以对于kafka来说,消息里的数据没有特别的格式或含义. 消息可以有一个可选的元数据,也就是键,键也是一个字节数组,当消息以一种可控的方式写入不同的分区时,会用到键.最简单的例子就是为键生成一个一致性散列值,然后使用散列值对主题分区数进行取模,为消息选取分区.这样可以保证具有相同键的消息总是被写到相同的分区上. 为了提高效率,消息被

使用JUC并发工具包的Lock和Condition,实现生产者和消费者问题中的有界缓存

JDK5.0之前,用java实现生产者和消费者的唯一方式就是使用synchronized内置锁和wait/notify条件通知机制.JDK5.0之后提供了显示锁Lock和条件队列Condition,与内置锁和内置条件队列相对应,但是显示的锁和条件队列,功能更强大,更灵活.此外JDK5.0之后还提供了大量很有用的并发工具类,如BlockingQueue等,基于这些数据结构,能够方便.快速.高效的构建自己应用需要的效果.这里我们简单使用下显示锁和条件队列,来模拟有界缓存的实现,功能类似于JDK内置的

Java中的生产者、消费者问题

Java中的生产者.消费者问题描述: 生产者-消费者(producer-consumer)问题, 也称作有界缓冲区(bounded-buffer)问题, 两个进程共享一个公共的固定大小的缓冲区(仓库). 其中一个是生产者, 用于将产品放入仓库: 另外一个是消费者, 用于从仓库中取出产品消费. 问题出现在当仓库已经满了, 而此时生产者还想向其中放入一个新的产品的情形, 其解决方法是让生产者此时进行等待, 等待消费者从仓库中取走了一个或者多个产品后再去唤醒它. 同样地, 当仓库已经空了, 而消费者还

多线程生产者、消费者模式中,如何停止消费者?多生产者情况下对“毒丸”策略的应用。

生产者.消费者模式是多线程中的经典问题.通过中间的缓冲队列,使得生产者和消费者的速度可以相互调节. 对于比较常见的单生产者.多消费者的情况,主要有以下两种策略: 通过volatile boolean producerDone =false 来标示是否完成.生产者结束后标示为true, 消费者轮询这个变量来决定自己是否退出. 这种方式对producerDone产生比较大的争用,实现起来也有诸多问题需要考虑. 比较经典的"毒丸"策略,生产者结束后,把一个特别的对象:"毒丸&quo

linux中的线程同步:生产者、消费者问题

#include <stdio.h> #include <semaphore.h> #include <unistd.h> #include <stdlib.h> #include <pthread.h> #define BUFFER_COUNT 5 int Buffer[BUFFER_COUNT]; //指针数组 int front = 0; int tail = 0; sem_t SemProd; sem_t SemCon; void* pr