Master-Worker模式简介
Master-Worker模式是非常经典的常用的一个并行计算模式,它的核心思想是2类进程协作工作:Master进程和Worker进程。Master负责接收客户端请求,分配任务;Worker负责具体处理任务。当各个Worker处理完任务后,统一将结果返回给Master,由Master进行整理和总结。其好处是能够将一个大JOB分解成若干小JOB,并行执行,从而提高系统的吞吐量。比如流行的Web Server,如Nginx,Apache HTTP都存在这种Master-Worker工作模式;离线分布式计算框架Hadoop的JobTracker和TaskTracker,实时流计算框架Strom的Nimbus和Supervisor都涉及到这种思想。那么下面我们来具体分析下Java Master-Worker模式的实现。
Master-Worker模式分析
我们重点分析下Master,Worker这2个角色。
Master
Master需要接受Client端提交过来的任务Task,而且还得将Task分配给Worker进行处理,因此Master需要一个存储来存放Task。那么采用哪种存储集合呢?首先来说,需要支持并发的集合类,因为多个Worker间可能存在任务竞争,因此我们需要考虑java.util.concurrent包下的集合。这里可以考虑采用非阻塞的ConcurrentLinkedQueue。
Master需要清楚的知道各个Woker的基本信息,如是否各个Worker都运行完毕,因此Master端需要保存Worker的信息,可以采用Map存储。
由于最后各个Worker都会上报运行结果,Master端需要有一个存储结果的Map,可以采用支持并发的ConcurrentHashMap。
Worker
Worker需要持有Master端的任务Task集合的引用,因为Worker需要从里面拿取Task。
同上,Worker需要持有Master端的存储结果的引用。
综上,我们可以得到如下:
我们可以进一步细化,Master/Worker应该提供什么操作?
Master:
- 通过构造方法以初始化workers
- 应该提供submit(Task)方法接受Client端提交过来的任务
- start()让workers开始处理任务
- 提供isComplete()判断各个worker的状态,是否都处理完毕
- 提供getResult()给客户端返回结果
Worker:
- Worker本质上就是Runnable,提供run()
- 负责处理业务逻辑的handle()
Java Master-Worker代码实现
Task
public class Task { private long id; private String name; public Task(long id, String name) { this.id = id; this.name = name; } public long getId() { return id; } public void setId(long id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } }
Worker
public class Worker implements Runnable { private long id; private String name; private ConcurrentLinkedQueue<Task> workQueue; private ConcurrentHashMap<Long,Object> results; public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) { this.workQueue = workQueue; } public void setResults(ConcurrentHashMap<Long, Object> results) { this.results = results; } public Worker(long id, String name) { this.id = id; this.name = name; } @Override public void run() { while(true){ Task task = workQueue.poll(); if(task == null){ break; } long start = System.currentTimeMillis(); long result = handle(task); this.results.put(task.getId(),result); System.out.println(this.name + " handle " + task.getName() + " success . result is " + result + " cost time : " + (System.currentTimeMillis() - start)); } } /** * 负责处理具体业务逻辑 * @param task * @return */ private long handle(Task task) { //这里只是模拟下,在真实环境也许是查询数据库,也许是查缓存等 try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } return new Random().nextLong(); } }
Master
public class Master { private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>(); private Map<Long,Thread> workers = new HashMap<Long, Thread>(); private ConcurrentHashMap<Long,Object> results = new ConcurrentHashMap<Long, Object>(); public Master(int num){ for(int i = 0 ; i < num ; i++){ Worker worker = new Worker(i,"worker-" + i); worker.setResults(results); worker.setWorkQueue(workQueue); workers.put(Long.valueOf(i),new Thread(worker)); } } public void submit(Task task){ workQueue.add(task); } public void start(){ for (Map.Entry<Long,Thread> entry : workers.entrySet()){ entry.getValue().start(); } } public boolean isComlepte(){ for(Map.Entry<Long,Thread> entry : workers.entrySet()){ if(entry.getValue().getState() != Thread.State.TERMINATED){ return false; } } return true; } public long getSumResult(){ long value = 0; for(Map.Entry<Long,Object> entry : results.entrySet()){ value = value + (Long)entry.getValue(); } return value; } }
Main
public class Main { public static void main(String[] args) { Master master = new Master(10); for(int i = 0 ; i < 10 ; i++){ Task task = new Task(i,"task-" + i); master.submit(task); } long start = System.currentTimeMillis(); master.start(); while(true){ if(master.isComlepte()){ System.out.println("sum result is " + master.getSumResult() + " . cost time : " + (System.currentTimeMillis() - start)); break; } } } }
运行结果
总结
在单线程的时候,处理一个Task需要500ms,那么处理10个Task需要5S,如果采用Master-Worker这种并行模型,可以大大缩短计算处理时间。