java并发编程之Master-Worker模式

Master-Worker模式适合在一个任务可以拆分成多个小任务来进行的情况下使用。

package cn.fcl.masterworker;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
public class Master {
private Queue queue = new ConcurrentLinkedQueue();
private Map<String, Thread> threadMap = new HashMap<String, Thread>();
private Map<Object, Object> result = new ConcurrentHashMap<Object, Object>();
public Master(Worker worker, int count) {
worker.setQueue(queue);
worker.setResult(result);
for(int i = 0; i < count; i++) {
threadMap.put(String.valueOf(i), new Thread(worker));
}
}
public void submit(Object obj) {
queue.add(obj);
}
public void execute() {
for(Map.Entry<String, Thread> thread : threadMap.entrySet()) {
thread.getValue().start();
}
}
public Map<Object, Object> getResult() {
return result;
}
public void setResult(Map<Object, Object> result) {
this.result = result;
}
public boolean isComplete() {
for(Map.Entry<String, Thread> thread : threadMap.entrySet()) {
if(thread.getValue().getState() != Thread.State.TERMINATED) {
return false;
}
}
return true;
}
}
package cn.fcl.masterworker;
import java.util.Map;
import java.util.Queue;
public abstract class Worker implements Runnable {
private Queue queue;
private Map<Object, Object> result;
public void run() {
while(true) {
Object obj = queue.poll();
if(obj == null) {
break;
}
result.put(obj, handle(obj));
}
}
public abstract Object handle(Object obj);
public Queue getQueue() {
return queue;
}
public Map<Object, Object> getResult() {
return result;
}
public void setQueue(Queue queue) {
this.queue = queue;
}
public void setResult(Map<Object, Object> result) {
this.result = result;
}
}
package cn.fcl.masterworker;
public class PlusWorker extends Worker{
@Override
public Object handle(Object obj) {
Integer value = (Integer) obj;
return value * value * value;
}
}
package cn.fcl.masterworker;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
public class main {
public static void main(String[] args) {
Master master = new Master(new PlusWorker(), 5);
for(int i = 0; i < 100; i++) {
master.submit(i);
}
master.execute();
Map<Object, Object> result = master.getResult();
int re = 0;
while(result.size() > 0 || !master.isComplete()) {
Set<Object> sets = result.keySet();
Object key = null;
for(Object o : sets) {
key = o;
break;
}
if(key == null) {
continue;
}
re += Integer.parseInt(result.get(key).toString());
result.remove(key);
}
System.out.println(re);
}
}

java并发编程之Master-Worker模式

时间: 2024-08-24 19:42:35

java并发编程之Master-Worker模式的相关文章

java并发编程之future模式

1.当你想并发去执行一段代码,但是还想获取这段代码的返回结果,那么future多线程模式就可以派上用场了,代码实现如下. public class Client { public Data request() { final FutureData futureData = new FutureData(); new Thread(new Runnable() { @Override public void run() { futureData.setRealData(new RealData()

java并发编程之Guarded Suspention

当客户端请求速度远远大于服务端的处理速度,这时候就非常适合使用Guarded Suspention模式 package cn.fcl.guardendSuspension; import java.util.ArrayList; import java.util.List; public class RequestQueue { private List<Integer> integers = new ArrayList<Integer>(); public synchronize

Java并发编程之volatile的理解

Java并发编程之volatile关键字的理解 Java中每个线程都有自己的工作内存,类比于处理器的缓存,线程的工作内存中保存了被该线程使用到的变量的主内存的拷贝.线程读写变量都是直接在自己的工作内存中进行的,而何时刷新数据(指将修改的结果更新到主存或者把主存的变量读取覆盖掉工作内存中的值)是不确定的. volatile关键字是修饰字段的关键字,貌似是JDK1.5之后才有的,在多线程编程中,很大的几率会用到这个关键字,volatile修饰变量后该变量有这么一种效果:线程每一次读该变量都是直接从主

Java并发编程之ConcurrentHashMap

ConcurrentHashMap ConcurrentHashMap是一个线程安全的Hash Table,它的主要功能是提供了一组和HashTable功能相同但是线程安全的方法.ConcurrentHashMap可以做到读取数据不加锁,并且其内部的结构可以让其在进行写操作的时候能够将锁的粒度保持地尽量地小,不用对整个ConcurrentHashMap加锁. ConcurrentHashMap的内部结构 ConcurrentHashMap为了提高本身的并发能力,在内部采用了一个叫做Segment

Java并发编程之set集合的线程安全类你知道吗

Java并发编程之-set集合的线程安全类 Java中set集合怎么保证线程安全,这种方式你知道吗? 在Java中set集合是 本篇是<凯哥(凯哥Java:kagejava)并发编程学习>系列之<并发集合系列>教程的第二篇: 本文主要内容:Set集合子类底层分别是什么?基于底层为什么set的子类可以存放一个数据?怎么解决set线程安全问题? 一:Set集合子类 Set的三个子类分别是:HaseSet.TreeSet.LinkedHashSet.这三个都是线程不安全的.那么这三个子类

Java并发编程之Condition

1.使用synchronized中的等待和唤醒实现消费者和生产者模式 /** * 使用Synchronized实现消费者生产者模式 */ public class SynchronizedDemo { static List<Integer> list = new ArrayList<Integer>(); private static int maxNum = 5; // 消费者 private void Consumer(String name){ synchronized (

java并发编程之volatile关键字

1.volatile的作用 一个线程共享变量(类的成员变量.类的静态成员变量等)被volatile修饰之后,就具有以下作用: 1)并发中的变量可见性(不同线程对该变量进行操作时的可见性),即一个线程修改了某个变量的值,则该新值对其他线程立即可见(可立即访问新值/立即强制写入主存): 2)禁止指令重排(包括java编译器和CPU运行时指令重排序): 3)禁用缓存(java虚拟机规范)---子线程的工作内存(包括了CPU缓存). 2.相关概念 2.1)指令重排序: (1)java编译器运行时指令重排

Java并发编程之Phaser类

Phaser这个类的使用场景为N个线程分阶段并行的问题.有这么一个任务为"做3道题",每个学生一个进程,5个学生可以并行做,这个就是常规的并发,但是如果加一个额外的 限制条件,必须等所有人都做完类第一题,才能开始做第二题,必须等所有人都做完了第二题,才能做第三题,这个问题就转变成了分阶段并发的问题,最适合用Phaser来解题,下面给出源代码,大家可以自己尝试: MyPhaser.java import java.util.concurrent.Phaser; public class

Java并发编程之ConcurrentHashMap原理分析

前言: 集合是编程中最常用的数据结构.而谈到并发,几乎总是离不开集合这类高级数据结构的支持.比如两个线程需要同时访问一个中间临界区(Queue),比如常会用缓存作为外部文件的副本(HashMap).这篇文章主要分析jdk1.5的3种并发集合类型(concurrent,copyonright,queue)中的ConcurrentHashMap,让我们从原理上细致的了解它们,能够让我们在深度项目开发中获益非浅. 在tiger之前,我们使用得最多的数据结构之一就是HashMap和Hashtable.大