Java Master-Worker模式实现

Master-Worker模式简介

Master-Worker模式是非常经典的常用的一个并行计算模式,它的核心思想是2类进程协作工作:Master进程和Worker进程。Master负责接收客户端请求,分配任务;Worker负责具体处理任务。当各个Worker处理完任务后,统一将结果返回给Master,由Master进行整理和总结。其好处是能够将一个大JOB分解成若干小JOB,并行执行,从而提高系统的吞吐量。比如流行的Web Server,如Nginx,Apache HTTP都存在这种Master-Worker工作模式;离线分布式计算框架Hadoop的JobTracker和TaskTracker,实时流计算框架Strom的Nimbus和Supervisor都涉及到这种思想。那么下面我们来具体分析下Java Master-Worker模式的实现。

Master-Worker模式分析

我们重点分析下Master,Worker这2个角色。

Master

Master需要接受Client端提交过来的任务Task,而且还得将Task分配给Worker进行处理,因此Master需要一个存储来存放Task。那么采用哪种存储集合呢?首先来说,需要支持并发的集合类,因为多个Worker间可能存在任务竞争,因此我们需要考虑java.util.concurrent包下的集合。这里可以考虑采用非阻塞的ConcurrentLinkedQueue。

Master需要清楚的知道各个Woker的基本信息,如是否各个Worker都运行完毕,因此Master端需要保存Worker的信息,可以采用Map存储。

由于最后各个Worker都会上报运行结果,Master端需要有一个存储结果的Map,可以采用支持并发的ConcurrentHashMap。

Worker

Worker需要持有Master端的任务Task集合的引用,因为Worker需要从里面拿取Task。

同上,Worker需要持有Master端的存储结果的引用。

综上,我们可以得到如下:

我们可以进一步细化,Master/Worker应该提供什么操作?

Master:

  1. 通过构造方法以初始化workers
  2. 应该提供submit(Task)方法接受Client端提交过来的任务
  3. start()让workers开始处理任务
  4. 提供isComplete()判断各个worker的状态,是否都处理完毕
  5. 提供getResult()给客户端返回结果

Worker:

  1. Worker本质上就是Runnable,提供run()
  2. 负责处理业务逻辑的handle()

Java Master-Worker代码实现

Task

public class Task {

    private long id;
    private String name;

    public Task(long id, String name) {
        this.id = id;
        this.name = name;
    }

    public long getId() {
        return id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

}

Worker

public class Worker implements Runnable {

    private long id;
    private String name;

    private ConcurrentLinkedQueue<Task> workQueue;

    private ConcurrentHashMap<Long,Object> results;

    public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) {
        this.workQueue = workQueue;
    }

    public void setResults(ConcurrentHashMap<Long, Object> results) {
        this.results = results;
    }

    public Worker(long id, String name) {
        this.id = id;
        this.name = name;
    }

    @Override
    public void run() {

        while(true){

            Task task = workQueue.poll();

            if(task == null){
                break;
            }

            long start = System.currentTimeMillis();
            long result = handle(task);

            this.results.put(task.getId(),result);

            System.out.println(this.name + " handle " + task.getName() + " success . result is " + result + " cost time : " + (System.currentTimeMillis() - start));
        }

    }

    /**
     * 负责处理具体业务逻辑
     * @param task
     * @return
     */
    private long handle(Task task) {

        //这里只是模拟下,在真实环境也许是查询数据库,也许是查缓存等
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return new Random().nextLong();
    }
}

Master

public class Master {

    private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>();

    private Map<Long,Thread> workers = new HashMap<Long, Thread>();

    private ConcurrentHashMap<Long,Object> results = new ConcurrentHashMap<Long, Object>();

    public Master(int num){

        for(int i = 0 ; i < num ; i++){

            Worker worker = new Worker(i,"worker-" + i);
            worker.setResults(results);
            worker.setWorkQueue(workQueue);

            workers.put(Long.valueOf(i),new Thread(worker));
        }

    }

    public void submit(Task task){
        workQueue.add(task);
    }

    public void start(){

        for (Map.Entry<Long,Thread> entry : workers.entrySet()){

            entry.getValue().start();
        }

    }

    public boolean isComlepte(){

        for(Map.Entry<Long,Thread> entry : workers.entrySet()){

            if(entry.getValue().getState() != Thread.State.TERMINATED){
                return false;
            }

        }

        return true;
    }

    public long getSumResult(){

        long value = 0;
        for(Map.Entry<Long,Object> entry : results.entrySet()){

            value = value + (Long)entry.getValue();

        }
        return value;
    }
}

Main

public class Main {

    public static void main(String[] args) {

        Master master = new Master(10);

        for(int i = 0 ; i < 10 ; i++){

            Task task = new Task(i,"task-" + i);

            master.submit(task);
        }

        long start = System.currentTimeMillis();
        master.start();

        while(true){

            if(master.isComlepte()){

                System.out.println("sum result is " + master.getSumResult() + " . cost time : " + (System.currentTimeMillis() - start));
                break;
            }
        }

    }

}

运行结果

总结

在单线程的时候,处理一个Task需要500ms,那么处理10个Task需要5S,如果采用Master-Worker这种并行模型,可以大大缩短计算处理时间。

时间: 2024-08-24 07:06:17

Java Master-Worker模式实现的相关文章

Master和worker模式

让和hadoop的设计思想是一样的,Master负责分配任务和获取任务的结果,worker是真正处理业务逻辑的. 使用ConcurrentLikedQueue去承载所有的任务,因为会有多个worker会并发修改这个队列. public class Task { private int id; private int price ; public int getId() { return id; } public void setId(int id) { this.id = id; } publi

Java多线程Master-Worker模式

Java多线程Master-Worker模式,多适用于需要大量重复工作的场景中. 例如:使用Master-Worker计算0到100所有数字的立方的和 1.Master接收到100个任务,每个任务需要0到100中每个数字的立方,这里为了效果,每个任务再sleep一秒, Master需要将这些任务放到一个支持高并发的非阻塞队列queue中如:ConcurrentLinkedQueue<E>. 2.Master创建10个worker去执行这100个任务,并准备一个支持高并发且线程安全的hashMa

Java装饰者模式

Java装饰者模式简介 一.假设有一个Worker接口,它有一个doSomething方法,Plumber和Carpenter都实现了Worker接口,代码及关系如下: 1.Worker.java package decorator; public interface Worker { public void doSomething(); } 2.Plumber.java public class Plumber implements Worker { @Override public void

大数据技术之_16_Scala学习_11_客户信息管理系统+并发编程模型 Akka+Akka 网络编程-小黄鸡客服案例+Akka 网络编程-Spark Master Worker 进程通讯项目

第十五章 客户信息管理系统15.1 项目的开发流程15.2 项目的需求分析15.3 项目的界面15.4 项目的设计-程序框架图15.5 项目的功能实现15.5.1 完成 Customer 类15.5.2 完成显示主菜单和退出软件功能15.5.3 完成显示客户列表的功能15.5.4 完成添加客户的功能15.5.5 完成删除客户的功能15.5.6 完善退出确认功能15.5.7 完善删除确认功能15.5.8 完成修改客户的功能第十六章 并发编程模型 Akka16.1 Akka 的介绍16.2 Acto

Java的MVC模式简介

Java的MVC模式简介 MVC(Model View Control)模型-视图-控制器 首先我们需要知道MVC模式并不是javaweb项目中独有的,MVC是一种软件工程中的一种软件架构模式,把软件系统分为三个基本部分:模型(Model).视图(View)和控制器(Controller),即为MVC.它是一种软件设计的典范, 一.MVC与模板概念的理解 MVC本来是存在于Desktop程序中的,M是指数据模型,V是指用户界面,C则是控制器.使用MVC的目的是将M和V的实现代码分离,从而使同一个

Java Web开发模式

一 Java Web开发模式的变迁 1 最初的Java web服务器端编程技术是Servlet,利用Servlet就可以开发出一个Web应用程序. 2 为了解决Servlet缺陷,SUN推出了JSP技术.但是开发人员又走向了另一个极端就是完全放弃了Servlet. 在JSP页面混合使用HTML标记和java代码编写的脚本元素标记来开发Web程序.采用这种方法虽然可以编写JSP页面变得简单,直观,然而,他只适合于业务流程简单,系统规模较小的应用系统. 如果系统较大的话,就会出现两个严重的缺点: 1

mongodb之master/slave模式

### mongodb的主从配置(不带auth认证) ### 注意事项: - 服务器节点之前时间要同步 - 开启防火墙的一定要允许通过相关端口 - 开启selinux的也要进行设置 - 建立双击互信模式最好不过 ### master配置文件 - 添加了master = true 这一行即可 - 多网卡机器bind_ip这一行尽可能写成一个具体地址(最好写内网地址),因为slave上是根据这个同步的 [[email protected] ~]# sed -e '/^#/d;/^$/d' /etc/

mongodb之master/slave模式 + auth

## 主从带认证: 主服务器和从服务器必须开启安全认证:--auth, 主服务器和从服务器的admin数据库中必须有全局用户, 然后主服务器的local数据库和从服务器的local数据均有名为repl且密码相同的用户名. 注:local:本地数据库 这个数据库不会同步,主要存放同步的信息.在MongoDB2.0.2版本测试时,从服务器的admin数据库中没有全局用户时也能进行复制(Deven:我们就是采用这个方式, 从服务器admin数据库没有建立用户),尽管admin中无用户,客户端连接此服务

Java多线程Future模式

Java多线程Future模式有些类似于Ajax的异步请求Future模式的核心在于:去除了主函数的等待时间,并使得原本需要等待的时间段可以用于处理其他业务逻辑 假设服务器的处理某个业务,该业务可以分成AB两个过程,并且AB两个过程之间不需要彼此的返回结果 A过程需要1秒钟,B过程需要2秒钟,主线程其他操作2秒钟按照正常编写,程序大概需要执行5秒如果按照Future模式只需要执行2秒(取其中运行时间最久的线程的运行时间) Future模式的核心实现在于两个方面 1.多线程运行 主线程采用多线的方

java 动态代理模式

一.相关类及其方法:java.lang.reflect.Proxy,Proxy 提供用于创建动态代理类和实例的静态方法.newProxyInstance()返回一个指定接口的代理类实例,该接口可以将方法调用指派到指定的调用处理程序(详见api文档) java.lang.reflect.InvocationHandler,InvocationHandler 是代理实例的调用处理程序 实现的接口.invoke()在代理实例上处理方法调用并返回结果.在与方法关联的代理实例上调用方法时,将在调用处理程序