Java之多线程中的Master-Worker模式

该模式的好处是,将大任务拆解成若干小任务并并行执行,从而提高系统吞吐量。

定义Worker进程,负责处理实际任务。

/*具体工作对象*/static abstract class Worker<T, R> implements Runnable {    private static final UtilsLog lg = UtilsLog.getLogger(Worker.class);    protected Queue<T> workQueue;//持有Master的任务队列    protected Map<String, R> resultMap;//用于存储结果集,key为任务对应的唯一标识符

public void setWorkQueue(Queue<T> workQueue) {        this.workQueue = workQueue;    }

public void setResultMap(Map<String, R> resultMap) {        this.resultMap = resultMap;    }

public abstract R handler(T entity);

@Override    public void run() {        while (true) {            T childWork = workQueue.poll();            if (childWork == null) {                lg.e("已经没有任务在队列中等待执行");                break;            }            //处理子任务            R result = handler(childWork);            resultMap.put(Integer.toString(childWork.hashCode()), result);        }    }}

定义Master进程,负责接收和分配任务。Master会在提交任务的同时立即返回结果集,由于此处的getResultMap属于引用传递,因此属性resultMap的修改会同步至业务层。

public static class Master<T, R> {    private static final UtilsLog lg = UtilsLog.getLogger(Master.class);    protected Queue<T> workQueue;//用于存储任务集    protected Map<String, Thread> threadMap;//存储执行任务的线程集    protected Map<String, R> resultMap;//存储相关结果

@TargetApi(Build.VERSION_CODES.LOLLIPOP)    public Master(Worker<T, R> work, int threadCount) {        workQueue = new ConcurrentLinkedDeque<T>();        threadMap = new HashMap<>();        resultMap = new HashMap<>();

work.setWorkQueue(workQueue);        work.setResultMap(resultMap);        for (int i = 0; i < threadCount; i++) {            threadMap.put(Integer.toString(i), new Thread(work, "thread tag with " + Integer.toString(i)));        }    }

//是否所有的子任务都结束了    public boolean isComplete() {        for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {            if (entry.getValue().getState() != Thread.State.TERMINATED) {                return false;            }        }        return true;    }

public Map<String, R> getResultMap() {        return resultMap;    }

public Master addJob(T job) {        workQueue.add(job);        return this;    }

public void execute() {        for (Map.Entry<String, Thread> entry : threadMap.entrySet()) {            entry.getValue().start();        }    }}

在业务层调用方案如下,

        Master<Integer, Integer> master = new Master<>(new Worker<Integer, Integer>() {            @Override            public Integer handler(Integer entity) {                int max = 50, min = 0;                UtilsThread.sleepIgnoreInteruptedException(new Random().nextInt(max) % (max - min + 1) + min);//随机模拟耗时操作                lg.e("执行handler程序 with value:" + entity);                return entity * entity;            }        }, 3);        int jobCount = 10;//任务数        for (int i = 0; i < jobCount; i++) {            master.addJob(i);        }        master.execute();

Map<String, Integer> resultMap = master.getResultMap();        while (true) {            int resultMapSize = resultMap.size();//            lg.e("并行执行结果集中已有数据量:" + resultMapSize);//此处resultMap持有Master中结果集的引用,因此在线程不断执行的过程中不断刷新结果姐,会连带导致这里值的改变            if (master.isComplete()) {                break;            }        }

执行结果如下,

另外,也应注意到,在执行handler方式,到结果集中写入数据是有延时的,这在开发中需要格外注意,务必使用master.isComplete判断任务完成状况。



来自为知笔记(Wiz)

时间: 2024-10-09 20:22:35

Java之多线程中的Master-Worker模式的相关文章

Java之多线程中的Future模式

应用场景:线程A需要线程B的执行结果,但没必要一直等待线程B执行完,这个时候可以先拿到未来的Future对象,等线程B执行完再来取真实结果. 定义RealData真实数据类,其构造函数很慢,是用户最后需要使用的数据, static class RealData<T> { protected T result; public RealData(T result) { this.result = result; } public T getResult() { return result; } }

在JAVA和android中常用的单列模式

在很多开发中,项目为了节约资源,都把一个类的构造函数变为私有化,这样整个项目中就不能创建多个实例,这样的方法我们称为单例模式 现在通过代码来简介下这个单例模式: 在新建一个java项目后,创建一个实体类User.java,和测试类,main.java 代码如下: 1 public class User { 2 private static User user; 3 4 //添加该实例的属性 5 private String name; 6 private String sex; 7 privat

在 Java 的多线程中,如何去判断给定的一个类是否是线程安全的(另外:synchronized 同步是否就一定能保证该类是线程安全的。)

同步代码块和同步方法的区别:同步代码块可以传入任意对象,同步方法中 如果多个线程检查的都是一个新的对象,不同的同步锁对不同的线程不具有排他性,不能实现线程同步的效果,这时候线程同步就失效了. 两者的区别主要体现在同步锁上面.对于实例的同步方法,因为只能使用this来作为同步锁,如果一个类中需要使用到多个锁,为了避免锁的冲突,必然需要使用不同的对象,这时候同步方法不能满足需求,只能使用同步代码块(同步代码块可以传入任意对象): 有几个原则的:程序次序规则:一个线程内,代码的执行会按照程序书写的顺序

多线程中的静态代理模式

静态代理的例子: 1 /** 2 * 1.真实角色 3 * 2.代理角色 4 * 3.共同实现的接口 5 * @author tele 6 * 7 */ 8 public class StaticProxy { 9 public static void main(String[] args) { 10 //创建真实角色,可使用接口 11 Marry you = new You(); 12 //创建代理角色,代理角色往往要添加新的方法,不建议使用接口创建 13 WeddingCompany com

Java的多线程实现生产/消费模式

Java的多线程实现生产/消费模式 在Java的多线程中,我们经常使用某个Java对象的wait(),notify()以及notifyAll() 方法实现多线程的通讯,今天就使用Java的多线程实现生产/消费模式,需求如下: 线程A ProductThread 继承Thread 实现生产数据 若线程共享的数据不为NULL,则生产线程进入等待状态 线程B CustomThread 继承Thread 实现消费数据(输出到控制台) 当线程共享数据为NULL的时候,进入等待状态 线程B 消费完数据之后,

Apache服务器中prefork和worker工作模式

一.多道处理模块MPM介绍 Apache HTTP 服务器被设计为一个功能强大,并且灵活的 web 服务器, 可以在很多平台与环境中工作.不同平台和不同的环境往往需要不同 的特性,或可能以不同的方式实现相同的特性最有效率.Apache 通过模块化的设计来适应各种环境.这种设计允许网站管理员通过在 编译时或运行时,选择哪些模块将会加载在服务器中,来选择服务器特性.        Apache 2.0 扩展此模块化设计到最基本的 web 服务器功能. 它提供了可以选择的多处理模块(MPM),用来绑定

java开发中的23中设计模式详解--大话设计模式

设计模式(Design Patterns) ——可复用面向对象软件的基础 设计模式(Design pattern)是一套被反复使用.多数人知晓的.经过分类编目的.代码设计经验的总结.使用设计模式是为了可重用代码.让代码更容易被他人理解.保证代码可靠性. 毫无疑问,设计模式于己于他人于系统都是多赢的,设计模式使代码编制真正工程化,设计模式是软件工程的基石,如同大厦的一块块砖石一样.项目中合理的运用设计模式可以完美的解决很多问题,每种模式在现在中都有相应的原理来与之对应,每一个模式描述了一个在我们周

java中的23中设计模式(转)

设计模式(Design Patterns) --可复用面向对象软件的基础 设计模式(Design pattern)是一套被反复使用.多数人知晓的.经过分类编目的.代码设计经验的总结.使用设计模式是为了可重用代码.让代码更容易被他人理解.保证代码可靠性. 毫无疑问,设计模式于己于他人于系统都是多赢的,设计模式使代码编制真正工程化,设计模式是软件工程的基石,如同大厦的一块块砖石一样.项目中合理的运用设计模式可以完美的解决很多问题,每种模式在现在中都有相应的原理来与之对应,每一个模式描述了一个在我们周

转 Java多线程中Sleep与Wait的区别

Java中的多线程是一种抢占式的机制,而不是分时机制.抢占式的机制是有多个线程处于可运行状态,但是只有一个线程在运行. 共同点: 1. 他们都是在多线程的环境下,都可以在程序的调用处阻塞指定的毫秒数,并返回. 2. wait()和sleep()都可以通过interrupt()方法 打断线程的暂停状态 ,从而使线程立刻抛出InterruptedException. 如果线程A希望立即结束线程B,则可以对线程B对应的Thread实例调用interrupt方法.如果此刻线程B正在wait/sleep/