java线程学习之Master-Worker模式

Masker-Worker的核心思想是有两类进程(Masker进程和Worker进程)协作完成任务。Masker进程负责接收和分配任务,Worker负责处理子任务,当各个Worker子进程完成任务后会将结果返回给Masker,由Masker做归纳和总结。其好处在于能将一个大任务分解成若干个小任务,并行执行,从而提供系统的吞吐量。

这个模型主要用于主线程可以分为若干子线程的情形,各子线程之间不会相互影响。

举个例子,这个例子是创建20个Worker去处理100个任务,每个任务是在0-1000的范围内随即一个整数,最后将这些数字相加。分别有Main,Masker,Worker,Task这四个类

 1 import java.util.Random;
 2
 3 public class Main {
 4
 5     public static void main(String[] args) {
 6
 7         Master master = new Master(new Worker(), 20);//生成20个work去处理这个任务
 8
 9         Random r = new Random();
10         for(int i = 1; i <= 100; i++){
11             Task t = new Task();
12             t.setId(i);
13             t.setPrice(r.nextInt(1000));
14             master.submit(t);
15         }
16         master.execute();
17         long start = System.currentTimeMillis();
18
19         while(true){
20             if(master.isComplete()){
21                 long end = System.currentTimeMillis() - start;
22                 int priceResult = master.getResult();
23                 System.out.println("最终结果:" + priceResult + ", 执行时间:" + end);
24                 break;
25             }
26         }
27
28     }
29 }

Main

 1 import java.util.HashMap;
 2 import java.util.Map;
 3 import java.util.concurrent.ConcurrentHashMap;
 4 import java.util.concurrent.ConcurrentLinkedQueue;
 5
 6 public class Master {
 7
 8     //1 有一个盛放任务的容器
 9     private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>();
10
11     //2 需要有一个盛放worker的集合
12     private HashMap<String, Thread> workers = new HashMap<String, Thread>();
13
14     //3 需要有一个盛放每一个worker执行任务的结果集合
15     private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();
16
17     //4 构造方法
18     public Master(Worker worker , int workerCount){
19         worker.setWorkQueue(this.workQueue);    //因为是消费workQueue里面的数据,所以workQueue放进去
20         worker.setResultMap(this.resultMap);    //消费完之后要把结果集返回给Masker,所以要有resultMap应的引用
21
22         for(int i = 0; i < workerCount; i ++){
23             this.workers.put(Integer.toString(i), new Thread(worker));//创建一个线程并对它起个名字用i来表示
24         }
25
26     }
27
28     //5 需要一个提交任务的方法
29     public void submit(Task task){
30         this.workQueue.add(task);
31     }
32
33     //6 需要有一个执行的方法,启动所有的worker方法去执行任务
34     public void execute(){
35         for(Map.Entry<String, Thread> me : workers.entrySet()){
36             me.getValue().start();    //循环这已经装好的Works,让它们都起动起来
37         }
38     }
39
40     //7 判断是否运行结束的方法
41     public boolean isComplete() {
42         for(Map.Entry<String, Thread> me : workers.entrySet()){//循环这写Works线程,判断其状态
43             if(me.getValue().getState() != Thread.State.TERMINATED){
44                 return false;
45             }
46         }
47         return true;
48     }
49
50     //8 计算结果方法
51     public int getResult() {
52         int priceResult = 0;
53         for(Map.Entry<String, Object> me : resultMap.entrySet()){
54             priceResult += (Integer)me.getValue();
55         }
56
57         return priceResult;
58
59     }
60
61
62     

Masker

 1 import java.util.concurrent.ConcurrentHashMap;
 2 import java.util.concurrent.ConcurrentLinkedQueue;
 3
 4 public class Worker implements Runnable {
 5
 6     private ConcurrentLinkedQueue<Task> workQueue;
 7     private ConcurrentHashMap<String, Object> resultMap;
 8
 9     public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) {
10         this.workQueue = workQueue;
11     }
12
13     public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
14         this.resultMap = resultMap;
15     }
16
17     @Override
18     public void run() {
19         while(true){
20             Task input = this.workQueue.poll();//不断地从Mask的workQueue将任务取出来
21             if(input == null) break;   //为空的话证明它已经消费完了
22             Object output = handle(input);//否则交给handle方法进行处理
23             this.resultMap.put(Integer.toString(input.getId()), output);//将任务的id,和结果集放到resultMap里
24         }
25     }
26
27     private Object handle(Task input) {
28         Object output = null;
29         try {
30             //处理任务的耗时。。 比如说进行操作数据库。。。
31             Thread.sleep(500);
32             output = input.getPrice();//得到处理完的结果
33         } catch (InterruptedException e) {
34             e.printStackTrace();
35         }
36         return output;
37     }
38
39
40
41 }

Worker

 1 public class Task {
 2
 3     private int id;
 4     private int price ;
 5     public int getId() {
 6         return id;
 7     }
 8     public void setId(int id) {
 9         this.id = id;
10     }
11     public int getPrice() {
12         return price;
13     }
14     public void setPrice(int price) {
15         this.price = price;
16     }
17
18 }

Task

运行结果为:

执行时间是每个任务休眠0.5s乘以100个任务,除以20个Worker,为2.5s。创建的Worker数并不是越大越好,因为创建Worker也需要花费时间。

原文地址:https://www.cnblogs.com/songlove/p/10858082.html

时间: 2024-11-06 19:39:55

java线程学习之Master-Worker模式的相关文章

Java线程学习经典例子-读写者演示

Java线程学习最经典的例子-读写者,主要用到Thread相关知识如下: -         线程的start与run -         线程的休眠(sleep) -         数据对象加锁(synchronized) -         数据对象的等待与释放(wait and notify) 程序实现: -ObjectData数据类对象,通过synchronized关键字实现加锁,在线程读写者中使用. -ConsumerThread消费者线程,读取数据对象中count值之后,通知生产者

黑马程序与----java线程学习

线程,有时被称为轻量级进程(Lightweight Process,LWP),是程序执行流的最小单元.一个标准的线程由线程ID,当前指令指针(PC),寄存器集合和堆栈组成.另外,线程是进程中的一个实体,是被系统独立调度和分派的基本单位,线程自己不拥有系统资源,只拥有一点儿在运行中必不可少的资源,但它可与同属一个进程的其它线程共享进程所拥有的全部资源.一个线程可以创建和撤消另一个线程,同一进程中的多个线程之间可以并发执行.由于线程之间的相互制约,致使线程在运行中呈现出间断性.线程也有就绪.阻塞和运

Java线程学习笔记(二) 线程的异常捕捉

线程异常的捕捉: 正常的情况下,我们在main()方法里是捕捉不到线程的异常的,例如以下代码: public class ExceptionThread implements Runnable{ @Override public void run() { throw new NullPointerException(); } public static void main(String[] args) { ExecutorService executorService = Executors.n

Java线程学习笔记(一)

一.线程的创建方式: 老掉牙的话题了,继承 java.lang.Thread父类或者实现Runnalbe接口,这里就提一句: class Thread implements Runnable Thread也是继承了Runnable接口的,Runnable才是大哥. 重写run(),run()里放的都是具体的业务,包括对线程的具体操作. class Thread1 implements Runnable { int i; Thread1(int i) { this.i = i; } @Overri

java 线程学习。

一.操作系统中线程和进程的概念 现在的操作系统是多任务操作系统.多线程是实现多任务的一种方式. 进程是指一个内存中运行的应用程序,每个进程都有自己独立的一块内存空间,一个进程中可以启动多个线程.比如在Windows系统中,一个运行的exe就是一个进程. 线程是指进程中的一个执行流程,一个进程中可以运行多个线程.比如java.exe进程中可以运行很多线程.线程总是属于某个进程,进程中的多个线程共享进程的内存. "同时"执行是人的感觉,在线程之间实际上轮换执行. 二.Java中的线程 使用

Java多线程学习笔记--生产消费者模式

实际开发中,我们经常会接触到生产消费者模型,如:Android的Looper相应handler处理UI操作,Socket通信的响应过程.数据缓冲区在文件读写应用等.强大的模型框架,鉴于本人水平有限目前水平只能膜拜,本次只能算学习笔记,为了巩固自己对Java多线程常规知识点的理解,路过大神还望能指导指导.下面一段代码是最常规的生产者消费者的例子: package com.zhanglei.demo; import java.util.ArrayList; import java.util.List

java线程学习-Thread.currentTread().getName()和this.getName()的区别

很久没有写java程序了,由于为了改变目前的状况,打算花两天时间学习一下java的线程开发和高并发. 线程开发使用thread类,或者runnable接口,而且thread类也是实现了runnable接口的. 先来个小甜点作为开始,如下为创建多个线程,并且同时处理. package firstThreadStudy; public class MyThread extends Thread { public MyThread(){ System.out.println("MyThread cur

Java设计模式学习记录-简单工厂模式、工厂方法模式

前言 之前介绍了设计模式的原则和分类等概述.今天开启设计模式的学习,首先要介绍的就是工厂模式,在介绍工厂模式前会先介绍一下简单工厂模式,这样由浅入深来介绍. 简单工厂模式 做法:创建一个工厂(方法或类)用来制造对象. 当一个人想要用手机的时候,需要自己创建手机然后来使用. 如下: public class IphoneX { public IphoneX(){ System.out.println("##### 制造iphoneX #####"); } } public class I

java线程学习(一)

1.简介 java基础知识部分线程创建的三种方式.线程执行的样例. 代码地址:http://git.oschina.net/blue_phantom/javaj 包位置:package com.bluej.javaj.thread.first; 2.创建线程 继承Thread类.实现Runnale接口.实现Callable接口. 1 /** 2 * 创建线程方式一 3 * @author xingdongjie 4 * 5 */ 6 class CreateThread1 extends Thr