该模式的好处是,将大任务拆解成若干小任务并并行执行,从而提高系统吞吐量。
定义Worker进程,负责处理实际任务。
/*具体工作对象*/static abstract class Worker<T, R> implements Runnable { private static final UtilsLog lg = UtilsLog.getLogger(Worker.class); protected Queue<T> workQueue;//持有Master的任务队列 protected Map<String, R> resultMap;//用于存储结果集,key为任务对应的唯一标识符 public void setWorkQueue(Queue<T> workQueue) { this.workQueue = workQueue; } public void setResultMap(Map<String, R> resultMap) { this.resultMap = resultMap; } public abstract R handler(T entity); @Override public void run() { while (true) { T childWork = workQueue.poll(); if (childWork == null) { lg.e("已经没有任务在队列中等待执行"); break; } //处理子任务 R result = handler(childWork); resultMap.put(Integer.toString(childWork.hashCode()), result); } }}
定义Master进程,负责接收和分配任务。Master会在提交任务的同时立即返回结果集,由于此处的getResultMap属于引用传递,因此属性resultMap的修改会同步至业务层。
public static class Master<T, R> { private static final UtilsLog lg = UtilsLog.getLogger(Master.class); protected Queue<T> workQueue;//用于存储任务集 protected Map<String, Thread> threadMap;//存储执行任务的线程集 protected Map<String, R> resultMap;//存储相关结果 @TargetApi(Build.VERSION_CODES.LOLLIPOP) public Master(Worker<T, R> work, int threadCount) { workQueue = new ConcurrentLinkedDeque<T>(); threadMap = new HashMap<>(); resultMap = new HashMap<>(); work.setWorkQueue(workQueue); work.setResultMap(resultMap); for (int i = 0; i < threadCount; i++) { threadMap.put(Integer.toString(i), new Thread(work, "thread tag with " + Integer.toString(i))); } } //是否所有的子任务都结束了 public boolean isComplete() { for (Map.Entry<String, Thread> entry : threadMap.entrySet()) { if (entry.getValue().getState() != Thread.State.TERMINATED) { return false; } } return true; } public Map<String, R> getResultMap() { return resultMap; } public Master addJob(T job) { workQueue.add(job); return this; } public void execute() { for (Map.Entry<String, Thread> entry : threadMap.entrySet()) { entry.getValue().start(); } }}
在业务层调用方案如下,
Master<Integer, Integer> master = new Master<>(new Worker<Integer, Integer>() { @Override public Integer handler(Integer entity) { int max = 50, min = 0; UtilsThread.sleepIgnoreInteruptedException(new Random().nextInt(max) % (max - min + 1) + min);//随机模拟耗时操作 lg.e("执行handler程序 with value:" + entity); return entity * entity; } }, 3); int jobCount = 10;//任务数 for (int i = 0; i < jobCount; i++) { master.addJob(i); } master.execute(); Map<String, Integer> resultMap = master.getResultMap(); while (true) { int resultMapSize = resultMap.size();// lg.e("并行执行结果集中已有数据量:" + resultMapSize);//此处resultMap持有Master中结果集的引用,因此在线程不断执行的过程中不断刷新结果姐,会连带导致这里值的改变 if (master.isComplete()) { break; } }
执行结果如下,
另外,也应注意到,在执行handler方式,到结果集中写入数据是有延时的,这在开发中需要格外注意,务必使用master.isComplete判断任务完成状况。
时间: 2024-10-09 20:22:35