Masker-Worker的核心思想是有两类进程(Masker进程和Worker进程)协作完成任务。Masker进程负责接收和分配任务,Worker负责处理子任务,当各个Worker子进程完成任务后会将结果返回给Masker,由Masker做归纳和总结。其好处在于能将一个大任务分解成若干个小任务,并行执行,从而提供系统的吞吐量。
这个模型主要用于主线程可以分为若干子线程的情形,各子线程之间不会相互影响。
举个例子,这个例子是创建20个Worker去处理100个任务,每个任务是在0-1000的范围内随即一个整数,最后将这些数字相加。分别有Main,Masker,Worker,Task这四个类
1 import java.util.Random; 2 3 public class Main { 4 5 public static void main(String[] args) { 6 7 Master master = new Master(new Worker(), 20);//生成20个work去处理这个任务 8 9 Random r = new Random(); 10 for(int i = 1; i <= 100; i++){ 11 Task t = new Task(); 12 t.setId(i); 13 t.setPrice(r.nextInt(1000)); 14 master.submit(t); 15 } 16 master.execute(); 17 long start = System.currentTimeMillis(); 18 19 while(true){ 20 if(master.isComplete()){ 21 long end = System.currentTimeMillis() - start; 22 int priceResult = master.getResult(); 23 System.out.println("最终结果:" + priceResult + ", 执行时间:" + end); 24 break; 25 } 26 } 27 28 } 29 }
Main
1 import java.util.HashMap; 2 import java.util.Map; 3 import java.util.concurrent.ConcurrentHashMap; 4 import java.util.concurrent.ConcurrentLinkedQueue; 5 6 public class Master { 7 8 //1 有一个盛放任务的容器 9 private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>(); 10 11 //2 需要有一个盛放worker的集合 12 private HashMap<String, Thread> workers = new HashMap<String, Thread>(); 13 14 //3 需要有一个盛放每一个worker执行任务的结果集合 15 private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>(); 16 17 //4 构造方法 18 public Master(Worker worker , int workerCount){ 19 worker.setWorkQueue(this.workQueue); //因为是消费workQueue里面的数据,所以workQueue放进去 20 worker.setResultMap(this.resultMap); //消费完之后要把结果集返回给Masker,所以要有resultMap应的引用 21 22 for(int i = 0; i < workerCount; i ++){ 23 this.workers.put(Integer.toString(i), new Thread(worker));//创建一个线程并对它起个名字用i来表示 24 } 25 26 } 27 28 //5 需要一个提交任务的方法 29 public void submit(Task task){ 30 this.workQueue.add(task); 31 } 32 33 //6 需要有一个执行的方法,启动所有的worker方法去执行任务 34 public void execute(){ 35 for(Map.Entry<String, Thread> me : workers.entrySet()){ 36 me.getValue().start(); //循环这已经装好的Works,让它们都起动起来 37 } 38 } 39 40 //7 判断是否运行结束的方法 41 public boolean isComplete() { 42 for(Map.Entry<String, Thread> me : workers.entrySet()){//循环这写Works线程,判断其状态 43 if(me.getValue().getState() != Thread.State.TERMINATED){ 44 return false; 45 } 46 } 47 return true; 48 } 49 50 //8 计算结果方法 51 public int getResult() { 52 int priceResult = 0; 53 for(Map.Entry<String, Object> me : resultMap.entrySet()){ 54 priceResult += (Integer)me.getValue(); 55 } 56 57 return priceResult; 58 59 } 60 61 62
Masker
1 import java.util.concurrent.ConcurrentHashMap; 2 import java.util.concurrent.ConcurrentLinkedQueue; 3 4 public class Worker implements Runnable { 5 6 private ConcurrentLinkedQueue<Task> workQueue; 7 private ConcurrentHashMap<String, Object> resultMap; 8 9 public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) { 10 this.workQueue = workQueue; 11 } 12 13 public void setResultMap(ConcurrentHashMap<String, Object> resultMap) { 14 this.resultMap = resultMap; 15 } 16 17 @Override 18 public void run() { 19 while(true){ 20 Task input = this.workQueue.poll();//不断地从Mask的workQueue将任务取出来 21 if(input == null) break; //为空的话证明它已经消费完了 22 Object output = handle(input);//否则交给handle方法进行处理 23 this.resultMap.put(Integer.toString(input.getId()), output);//将任务的id,和结果集放到resultMap里 24 } 25 } 26 27 private Object handle(Task input) { 28 Object output = null; 29 try { 30 //处理任务的耗时。。 比如说进行操作数据库。。。 31 Thread.sleep(500); 32 output = input.getPrice();//得到处理完的结果 33 } catch (InterruptedException e) { 34 e.printStackTrace(); 35 } 36 return output; 37 } 38 39 40 41 }
Worker
1 public class Task { 2 3 private int id; 4 private int price ; 5 public int getId() { 6 return id; 7 } 8 public void setId(int id) { 9 this.id = id; 10 } 11 public int getPrice() { 12 return price; 13 } 14 public void setPrice(int price) { 15 this.price = price; 16 } 17 18 }
Task
运行结果为:
执行时间是每个任务休眠0.5s乘以100个任务,除以20个Worker,为2.5s。创建的Worker数并不是越大越好,因为创建Worker也需要花费时间。
原文地址:https://www.cnblogs.com/songlove/p/10858082.html
时间: 2024-11-06 19:39:55