Master和worker模式

让和hadoop的设计思想是一样的,Master负责分配任务和获取任务的结果,worker是真正处理业务逻辑的。

使用ConcurrentLikedQueue去承载所有的任务,因为会有多个worker会并发修改这个队列。

public class Task {

    private int id;
    private int price ;
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public int getPrice() {
        return price;
    }
    public void setPrice(int price) {
        this.price = price;
    } 

}
public class Master {

    //1 有一个盛放任务的容器
    private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>();

    //2 需要有一个盛放worker的集合
    private HashMap<String, Thread> workers = new HashMap<String, Thread>();

    //3 需要有一个盛放每一个worker执行任务的结果集合
    private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();

    //4 构造方法
    public Master(Worker worker , int workerCount){
        worker.setWorkQueue(this.workQueue);
        worker.setResultMap(this.resultMap);

        for(int i = 0; i < workerCount; i ++){
            this.workers.put(Integer.toString(i), new Thread(worker));
        }

    }

    //5 需要一个提交任务的方法
    public void submit(Task task){
        this.workQueue.add(task);
    }

    //6 需要有一个执行的方法,启动所有的worker方法去执行任务
    public void execute(){
        for(Map.Entry<String, Thread> me : workers.entrySet()){
            me.getValue().start();
        }
    }

    //7 判断是否运行结束的方法
    public boolean isComplete() {
        for(Map.Entry<String, Thread> me : workers.entrySet()){
            if(me.getValue().getState() != Thread.State.TERMINATED){
                return false;
            }
        }
        return true;
    }

    //8 计算结果方法
    public int getResult() {
        int priceResult = 0;
        for(Map.Entry<String, Object> me : resultMap.entrySet()){
            priceResult += (Integer)me.getValue();
        }
        return priceResult;
    }
}
public class Worker implements Runnable {

    private ConcurrentLinkedQueue<Task> workQueue;
    private ConcurrentHashMap<String, Object> resultMap;

    public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) {
        this.workQueue = workQueue;
    }

    public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
        this.resultMap = resultMap;
    }

    @Override
    public void run() {
        while(true){
            Task input = this.workQueue.poll();
            if(input == null) break;
            Object output = handle(input);
            this.resultMap.put(Integer.toString(input.getId()), output);
        }
    }

    private Object handle(Task input) {
        Object output = null;
        try {
            //处理任务的耗时。。 比如说进行操作数据库。。。
            Thread.sleep(500);
            output = input.getPrice();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return output;
    }

}
public class Main {

    public static void main(String[] args) {

        Master master = new Master(new Worker(), 20);

        Random r = new Random();
        for(int i = 1; i <= 100; i++){
            Task t = new Task();
            t.setId(i);
            t.setPrice(r.nextInt(1000));
            master.submit(t);
        }
        master.execute();
        long start = System.currentTimeMillis();

        while(true){
            if(master.isComplete()){
                long end = System.currentTimeMillis() - start;
                int priceResult = master.getResult();
                System.out.println("最终结果:" + priceResult + ", 执行时间:" + end);
                break;
            }
        }

    }
}
时间: 2024-10-06 11:56:02

Master和worker模式的相关文章

nginx源码分析--master和worker进程模型

一.Nginx整体架构 正常执行中的nginx会有多个进程,最基本的有master process(监控进程,也叫做主进程)和woker process(工作进程),还可能有cache相关进程. 一个较为完整的整体框架结构如图所示: 二.核心进程模型 启动nginx的主进程将充当监控进程,而由主进程fork()出来的子进程则充当工作进程. nginx也可以单进程模型执行,在这种进程模型下,主进程就是工作进程,没有监控进程. Nginx的核心进程模型框图如下: master进程 监控进程充当整个进

Spark技术内幕:Client,Master和Worker 通信源码解析

Spark的Cluster Manager可以有几种部署模式: Standlone Mesos YARN EC2 Local 在向集群提交计算任务后,系统的运算模型就是Driver Program定义的SparkContext向APP Master提交,有APP Master进行计算资源的调度并最终完成计算.具体阐述可以阅读<Spark:大数据的电花火石!>. 那么Standalone模式下,Client,Master和Worker是如何进行通信,注册并开启服务的呢? 1. node之间的IP

Spark技术内幕:Client,Master和Worker 通信源代码解析

Spark的Cluster Manager能够有几种部署模式: Standlone Mesos YARN EC2 Local 在向集群提交计算任务后,系统的运算模型就是Driver Program定义的SparkContext向APP Master提交,有APP Master进行计算资源的调度并终于完毕计算.具体阐述能够阅读<Spark:大数据的电花火石!>. 那么Standalone模式下,Client.Master和Worker是怎样进行通信,注冊并开启服务的呢? 1. node之间的RP

spark 源码理解2 进一步窥探Master、Worker通信机制

上一篇文章 spark 源码理解1 从spark启动脚本开始 是分析执行start_all.sh时,集群中启动了哪些进程,下面我们再深入一点看看这些进程都是做什么用的,它们之间又是如何通信的? 一.Master进程的启动 Master进程,它主要负责对Worker.Driver.App等资源的管理并与它们进行通信,这篇文章中我打算着重讲一下它与Worker的通信,其它的部分放在以后的章节再加以描述. spark-daemon.sh start org.apache.spark.deploy.ma

yum安装apache后更改worker模式

进入linux命令模式,输入 #yum install httpd                    //安装apache #vi  /etc/sysconfig/httpd            //修改文件,打开worker模式 将#号去掉 重启apache,命令如下 #service httpd restart 报错如下: [root@localhost ~]# service httpd restartStopping httpd:                          

Apache的prefork模式和worker模式

prefork模式这个多路处理模块(MPM)实现了一个非线程型的.预派生的web服务器,它的工作方式类似于Apache 1.3.它适合于没有线程安全库,需要避免线程兼容性问题的系统.它是要求将每个请求相互独立的情况下最好的MPM,这样若一个请求出现问题就不会影响到其他请求. 这个MPM具有很强的自我调节能力,只需要很少的配置指令调整.最重要的是将MaxClients设置为一个足够大的数值以处理潜在的请求高峰,同时又不能太大,以致需要使用的内存超出物理内存的大小. worker模式此多路处理模块(

Spark分析之Master、Worker以及Application三者之间如何建立连接

Master.preStart(){ webUi.bind() context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut) //定时任务检测是否有DEAD WORKER需要移除 case CheckForWorkerTimeOut => { timeOutDeadWorkers() } /** Check for, and remove, any timed-out

Eureka 系列(06)消息广播(下):TaskDispacher 之 Acceptor - Worker 模式

Eureka 系列(06)消息广播(下):TaskDispacher 之 Acceptor - Worker 模式 [TOC] 0. Spring Cloud 系列目录 - Eureka 篇 Eureka 消息广播主要分三部分讲解: 服务器列表管理:PeerEurekaNodes 管理了所有的 PeerEurekaNode 节点. 消息广播机制分析:PeerAwareInstanceRegistryImpl 收到客户端的消息后,第一步:先更新本地注册信息:第二步:遍历所有的 PeerEureka

Spark的Master和Worker集群启动的源码分析

基于spark1.3.1的源码进行分析 spark master启动源码分析 1.在start-master.sh调用master的main方法,main方法调用 def main(argStrings: Array[String]) { SignalLogger.register(log) val conf = new SparkConf val args = new MasterArguments(argStrings, conf) val (actorSystem, _, _, _) =