Java Thread系列(九)Master-Worker模式

Java Thread系列(九)Master-Worker模式

Master-Worker模式是常用的并行设计模式.

一、Master-Worker 模式核心思想

Master-Worker 系统由两个角色组成,Master 和 Worker,Master 负责接收和分配任务,Worker 负责处理子任务。任务处理过程中,Master 还负责监督任务进展和 Worker 的健康状态;Master 将接收 Client 提交的任务,并将任务的进展汇总反馈给 Client。各角色关系如下图:

二、Master-Worker 实现

(1) Master

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

public class Master {
    //1. 应该有一个容器存放任务列表,这个容器需要支持高并发操作
    private ConcurrentLinkedDeque<Task> taskQueue = new ConcurrentLinkedDeque<Task>();

    //2. 应该有一个容器存放worker
    private HashMap<String, Thread> workers = new HashMap<String, Thread>();

    //3. 应该有一个容器存放结果集,这个容器需要支持高并发操作
    private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();

    //4. 构造函数
    public Master (Worker worker, int threadCount) {
        //将任务列表和结果集传递给worker
        worker.setTaskQueue(taskQueue);
        worker.setResultMap(resultMap);
        //初始化worder列表
        for (int i = 0; i < threadCount; i++) {
            workers.put("worker-" + i, new Thread(worker));
        }
    }

    public Master (Worker worker) {
        this(worker, Runtime.getRuntime().availableProcessors());
    }

    //5. 提交任务
    public void submit (Task task) {
        taskQueue.add(task);
    }

    //6. 执行方法 开启所有的线程
    public void execute () {
        for(Map.Entry<String, Thread> me : workers.entrySet()) {
            me.getValue().start();
        }
    }

    //7. 判断是否执行完毕
    public boolean isComplete () {
        for(Map.Entry<String, Thread> me : workers.entrySet()) {
            if (me.getValue().getState() != Thread.State.TERMINATED)
                return false;
        }
        return true;
    }

    //8. 处理结果集
    public int getResult () {
        int ret = 0;
        for(Map.Entry<String, Object> me : resultMap.entrySet()) {
            ret += (int)me.getValue();
        }
        return ret;
    }
}

(2) Worker

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

public class Worker implements Runnable {

    private ConcurrentLinkedDeque<Task> taskQueue;
    private ConcurrentHashMap<String, Object> resultMap;

    @Override
    public void run() {
        while (true) {
            Task task = taskQueue.poll();
            if (task == null) break;

            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //返回结果集
            resultMap.put(Integer.toString(task.getId()), handle(task));
        }
    }

    private Object handle(Task task) {
        return task.getCount();
    }

    public void setTaskQueue(ConcurrentLinkedDeque<Task> taskQueue) {
        this.taskQueue = taskQueue;
    }

    public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
        this.resultMap = resultMap;
    }
}

(3) Task

public class Task {
    private int id;
    private String name;
    private int count;

    public Task() {}
    public Task(int id, String name, int count) {
        this.id = id;
        this.name = name;
        this.count = count;
    }
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getCount() {
        return count;
    }
    public void setCount(int count) {
        this.count = count;
    }
    @Override
    public String toString() {
        return "Task{" + "id=" + id + ", name=‘" + name + ‘\‘‘ +
                ", count=" + count + ‘}‘;
    }
}

(4) 测试

Master master = new Master(new Worker(), 1);

for (int i = 1; i <= 100; i++) {
    master.submit(new Task(i, "task-" + i ,i));
}

master.execute();

long t1 = System.currentTimeMillis();
while (true) {
    if (master.isComplete()) {
        long t = System.currentTimeMillis() - t1;
        System.out.printf("执行结果:%s;执行时间:%s", master.getResult(), t);
        break;
    }
}


每天用心记录一点点。内容也许不重要,但习惯很重要!

原文地址:https://www.cnblogs.com/binarylei/p/8999712.html

时间: 2024-10-07 17:30:33

Java Thread系列(九)Master-Worker模式的相关文章

Java Thread系列(十)Future 模式

Java Thread系列(十)Future 模式 Future 模式适合在处理很耗时的业务逻辑时进行使用,可以有效的减少系统的响应时间,提高系统的吞吐量. 一.Future 模式核心思想 如下的请求调用过程时序图.当 call 请求发出时,需要很长的时间才能返回.左边的图需要一直等待,等返回数据后才能继续其他操作:而右边的 Future 模式的图中客户端则无需等到可以做其他的事情.服务器段接收到请求后立即返回结果给客户端,这个结果并不是真实的结果(是虚拟的结果),也就 是先获得一个假数据,然后

Java Thread系列(四)线程通信

Java Thread系列(四)线程通信 一.传统通信 public static void main(String[] args) { //volatile实现两个线程间数据可见性 private volatile static List list = new ArrayList(); Thread t1 = new Thread(new Runnable() { // (1) public void run() { try { for(int i = 0; i <10; i++){ list

Java Thread系列(一)线程创建

Java Thread系列(一)线程创建 Java 中创建线程主要有三种方式:继承 Thread.实现 Runnable 接口.使用 ExecutorService.Callable.Future 实现由返回结果的多线程. 一.继承 Thread 类创建线程类 public class MyThread extends Thread { public void run() { for (int i = 0; i < 10000; i++) { System.out.println("线程一

Java Thread系列(三)线程安全

Java Thread系列(三)线程安全 一.什么是线程安全 线程安全概念:当多个线程访问某一个类(对象或方法)时,这个类始终都能表现出正确的行为,那么这个类(对象或方法)就是线程安全的. 线程安全来说,需要满足以下两个特性: 原子性 可见性 public class MyThread extends Thread { private int count = 5; //synchronized加锁 同步锁 public /*synchronized*/ void run () { System.

Java Thread系列(五)synchronized

Java Thread系列(五)synchronized synchronized锁重入 关键字 synchronized 拥有锁重入的功能,也就是在使用 synchronized 时,当线程等到一个对象的锁后,再次请求此对象时可以再次得到该对象的锁.出现异常时释放锁. synchronized异常 synchronized代码块 使用 synchronized 声明的方法在某些情况下是有弊端的,比如A线程调用同步的方法执行一个很长时间的任务,那么B线程就必须等待比较长的时间才能执行,这样的情况

Java设计模式系列之责任链模式

责任链模式 责任链模式是一种对象的行为模式.在责任链模式里,很多对象由每一个对象对其下家的引用而连接起来形成一条链.请求在这个链上传递,直到链上的某一个对象决定处理此请求.发出这个请求的客户端并不知道链上的哪一个对象最终处理这个请求,这使得系统可以在不影响客户端的情况下动态地重新组织和分配责任.Tomcat中的Filter就是使用了责任链模式,创建一个Filter除了要在web.xml文件中做相应配置外,还需要实现javax.servlet.Filter接口. 为了方便理解,责任链模式直接用马士

Java设计模式系列之中介者模式

中介者模式(Mediator)的定义 用一个中介对象来封装一系列的对象交互.中介者使各对象不需要显式地相互引用,从而使其耦合松散,而且可以独立地改变它们之间的交互. 中介者模式(Mediator)的适用性 1.一组对象以定义良好但是复杂的方式进行通信,产生的相互依赖关系结构混乱且难以理解. 2.一个对象引用其他很多对象并且直接与这些对象通信,导致难以复用该对象. 3.想定制一个分布在多个类中的行为,但又不想生成太多的子类. 中介者模式(Mediator)的参与者 1.Mediator 中介者定义

Java设计模式系列之装饰者模式

装饰者模式的定义 动态地将责任附加到对象上,若要扩展功能,装饰者提供了比继承更有弹性的替代方案  装饰者模式的UML类图 原文地址:https://www.cnblogs.com/ysw-go/p/10325870.html

java并发系列(九)-----ConcurrentHashMap原理分析(JDK1.7)

数据结构 ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成.Segment实际继承自可重入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色:HashEntry则用于存储键值对数据.一个ConcurrentHashMap里包含一个Segment数组,每个Segment里包含一个HashEntry数组,我们称之为table,每个HashEntry是一个链表结构的元素. 面试常问: 1. ConcurrentHashMap