Disrunptor多生产者多消费者模型讲解

多生产者多消费者模拟需求:
1、创建100个订单生产者,每个生产者生产100条订单,总共会生产10000条订单,由3个消费者进行订单消费处理。
2、100个订单生产者全部创建完毕,再一起生产消费订单数据

1、定义事件

package com.ljq.multi;

public class Order {
	private String id;// ID
	private String name;
	private double price;// 金额

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public double getPrice() {
		return price;
	}

	public void setPrice(double price) {
		this.price = price;
	}

}

  

2、生产者

package com.ljq.multi;

import com.lmax.disruptor.RingBuffer;

/**
 * 生产者
 *
 * @author Administrator
 *
 */
public class Producer {
	private final RingBuffer<Order> ringBuffer;

	public Producer(RingBuffer<Order> ringBuffer) {
		this.ringBuffer = ringBuffer;
	}

	/**
	 * onData用来发布事件,每调用一次就发布一次事件 它的参数会用过事件传递给消费者
	 */
	public void onData(String data) {
		// 可以把ringBuffer看做一个事件队列,那么next就是得到下面一个事件槽
		long sequence = ringBuffer.next();
		try {
			// 用上面的索引取出一个空的事件用于填充(获取该序号对应的事件对象)
			Order order = ringBuffer.get(sequence);
			// 获取要通过事件传递的业务数据
			order.setId(data);
		} finally {
			// 发布事件
			// 注意,最后的 ringBuffer.publish 方法必须包含在 finally 中以确保必须得到调用;如果某个请求的
			// sequence 未被提交,将会堵塞后续的发布操作或者其它的 producer。
			ringBuffer.publish(sequence);
		}
	}

}

  

3、消费者

package com.ljq.multi;

import java.util.concurrent.atomic.AtomicInteger;

import com.lmax.disruptor.WorkHandler;

public class Consumer implements WorkHandler<Order>{

	private String consumerId;

	private static AtomicInteger count = new AtomicInteger(0);

	public Consumer(String consumerId){
		this.consumerId = consumerId;
	}

	@Override
	public void onEvent(Order order) throws Exception {
		count.incrementAndGet();
		System.out.println("当前消费者: " + this.consumerId + ",消费信息:" + order.getId() + ",count:" + getCount());
	}

	public int getCount(){
		return count.get();
	}

}

  

4、生产者消费者启动类

package com.ljq.multi;

import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WorkerPool;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.ProducerType;

/**
 * 	disrunptor 多生产者多消费者
 *
 * @author Administrator
 *
 */
public class Main {

	public static void main(String[] args) throws Exception {

		//创建ringBuffer
		RingBuffer<Order> ringBuffer =
				RingBuffer.create(ProducerType.MULTI,
						new EventFactory<Order>() {
				            @Override
				            public Order newInstance() {
				                return new Order();
				            }
				        },
				        1024 * 1024,
						new YieldingWaitStrategy());

		SequenceBarrier barriers = ringBuffer.newBarrier();

		//创建3个消费者消费订单数据
		Consumer[] consumers = new Consumer[3];
		for(int i = 0; i < consumers.length; i++){
			consumers[i] = new Consumer("c" + i);
		}

		WorkerPool<Order> workerPool =
				new WorkerPool<Order>(ringBuffer,
						barriers,
						new IntEventExceptionHandler(),
						consumers);

        ringBuffer.addGatingSequences(workerPool.getWorkerSequences());
        workerPool.start(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));  

        final AtomicInteger count = new AtomicInteger(0);
        final CountDownLatch latch = new CountDownLatch(1);
        //100个生产者
        for (int i = 0; i < 100; i++) {
        	final Producer p = new Producer(ringBuffer);
        	new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						System.out.println(count.incrementAndGet());
						latch.await(); //等待,直到100个生产者创建完成,再一起生产消费订单数据

					} catch (InterruptedException e) {
						e.printStackTrace();
					}

					//每个生产者生产100条订单
					for(int j = 0; j < 100; j ++){
						p.onData(UUID.randomUUID().toString());
					}
				}
			}).start();
        }
        System.out.println("---------------开始生产-----------------");

        latch.countDown(); //生产者全部创建完毕,开始生产订单

	}

	static class IntEventExceptionHandler implements ExceptionHandler {
	    public void handleEventException(Throwable ex, long sequence, Object event) {}
	    public void handleOnStartException(Throwable ex) {}
	    public void handleOnShutdownException(Throwable ex) {}
	}
}

  

时间: 2024-11-10 14:41:05

Disrunptor多生产者多消费者模型讲解的相关文章

Java线程学习整理--4---一个简单的生产者、消费者模型

 1.简单的小例子: 下面这个例子主要观察的是: 一个对象的wait()和notify()使用情况! 当一个对象调用了wait(),那么当前掌握该对象锁标记的线程,就会让出CPU的使用权,转而进入该对象的等待池中等待唤醒,这里说明一下,每一个对象都有一个独立的等待池和锁池! 等待池:上述的wait()后的线程会进入等待池中,处于下图线程声明周期(简单示意图) 中的这个状态,等待池中的线程任然具有对象的锁标记,但是处于休眠状态,不是可运行状态! 当该对象调用notify方法之后,就会在等待池中系统

并发无锁队列学习(单生产者单消费者模型)

1.引言 本文介绍单生产者单消费者模型的队列.根据写入队列的内容是定长还是变长,分为单生产者单消费者定长队列和单生产者单消费者变长队列两种.单生产者单消费者模型的队列操作过程是不需要进行加锁的.生产者通过写索引控制入队操作,消费者通过读索引控制出队列操作.二者相互之间对索引是独享,不存在竞争关系.如下图所示: 2.单生产者单消费者定长队列 这种队列要求每次入队和出队的内容是定长的,即生产者写入队列和消费者读取队列的内容大小事相同的.linux内核中的kfifo就是这种队列,提供了读和写两个索引.

Java并发协作——生产者、消费者模型

概述 对于多线程程序来说,生产者和消费者模型是非常经典的模型.更加准确的说,应该叫"生产者-消费者-仓库模型".离开了仓库,生产者.消费者就缺少了共用的存储空间,也就不存在并非协作的问题了. 示例 定义一个场景.一个仓库只允许存放10件商品,生产者每次可以向其中放入一个商品,消费者可以每次从其中取出一个商品.同时,需要注意以下4点: 1.  同一时间内只能有一个生产者生产,生产方法需要加锁synchronized. 2.  同一时间内只能有一个消费者消费,消费方法需要加锁synchro

生产者、消费者模型

转载地址:http://blog.csdn.net/snow_5288/article/details/72794306 一.概念引入 日常生活中,每当我们缺少某些生活用品时,我们都会去超市进行购买,那么,你有没有想过,你是以什么身份去的超市呢?相信大部分人都会说自己是消费者,确实如此,那么既然我们是消费者,又是谁替我们生产各种各样的商品呢?当然是超市的各大供货商,自然而然地也就成了我们的生产者.如此一来,生产者有了,消费者也有了,那么将二者联系起来的超市又该作何理解呢?诚然,它本身是作为一座交

单生产者-多消费者模型中遇到的问题

(1)      原始代码 最近使用单生产者-多消费者模型是遇到一个问题,以前既然都没有想到过.生产者线程的代码如下,基本功能就是接收到一个连接之后创建一个Socket对象并放到list中等待处理. void DataManager::InternalStart() { server_socket_ = new ServerSocket(); if (!server_socket_->SetAddress(NetworkUtil::GetIpAddress().c_str(), 9091)) {

并发协作:多线程中的生产者与消费者模型

对于多线程程序来说,不管任何编程语言,生产者和消费者模型都是最经典的.就像学习每一门编程语言一样,Hello World!都是最经典的例子. 实际上,准确说应该是“生产者-消费者-仓储”模型,离开了仓储,生产者消费者模型就显得没有说服力了. 对于此模型,应该明确一下几点: 1.生产者仅仅在仓储未满时候生产,仓满则停止生产. 2.消费者仅仅在仓储有产品时候才能消费,仓空则等待. 3.当消费者发现仓储没产品可消费时候会通知生产者生产. 4.生产者在生产出可消费产品时候,应该通知等待的消费者去消费.

开启子进程的两种方式,孤儿进程与僵尸进程,守护进程,互斥锁,IPC机制,生产者与消费者模型

开启子进程的两种方式 # # # 方式一: # from multiprocessing import Process # import time # # def task(x): # print('%s is running' %x) # time.sleep(3) # print('%s is done' %x) # # if __name__ == '__main__': # # Process(target=task,kwargs={'x':'子进程'}) # p=Process(tar

守护进程,互斥锁,IPC,队列,生产者与消费者模型

小知识点:在子进程中不能使用input输入! 一.守护进程 守护进程表示一个进程b 守护另一个进程a 当被守护的进程结束后,那么守护进程b也跟着结束了 应用场景:之所以开子进程,是为了帮助主进程完成某个任务,然而,如果主进程认为自己的事情一旦做完了就没有必要使用子进程了,就可以将子进程设置为守护进程 例如:在运行qq的过程,开启一个进程,用于下载文件,然而文件还没有下载完毕,qq就退出了,下载任务也应该跟随qq的退出而结束. from multiprocessing import Process

Go语言编程:使用条件变量Cond和channel通道实现多个生产者和消费者模型

如题,使用条件变量Cond和channel通道实现多个生产者和消费者模型.Go语言天生带有C语言的基因,很多东西和C与很像,但是用起来 绝对比C语言方便.今天用Go语言来实现下多消费者和生产者模型.如果对C语言的多生产者和消费者模型感兴趣的可以看Linux系统编程:使用mutex互斥锁和条件变量实现多个生成者和消费者模型 代码实现代码实现用了Cond条件变量和channel通道. package main import ( "fmt" "math/rand" &qu