之前的文章说到,运行时,程序=代码+数据。那么并发编程就可以有两种策略,代码并发和数据并发。
代码并发
代码并发的前提是,我们的代码,准确点说应该是计算,是可以被分割的,分割成一小块一小块的计算,而且不互相依赖。抽象示例如下,
class WorkerTask implements Runnable {
private Data data;
private SplittedCode code;
private CountDownLatch latch;
public WorkerTask(Data data, SplittedCode code, CountDownLatch latch) {
this.data = data;
this.code = code;
this.latch = latch;
}
@Override
public void run() {
try {
code.run(data);
} finally {
latch.countDown();
}
}
}
class LeaderTask implements Runnable {
private Data data;
private SplittedCode code;
public LeaderTask(Data data, SplittedCode code) {
this.data = data;
this.code = code;
}
@Override
public void run() {
SplittedCode[] codes = code.split();
CountDownLatch latch = new CountDownLatch(codes.length);
for (SplittedCode code : codes) {
LWThreadPoolProvider.workerPool().submit(
new WorkerTask(data, code, latch));
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace(); // TODO
}
}
}
public class LWExecutor implements Executor {
@Override
public void exec(Code code, Data data) {
if(code instanceof SplittedCode) {
LWThreadPoolProvider.leaderPool().submit(
new LeaderTask(data, (SplittedCode)code));
}
}
}
采用分级线程池,将整体计算提交给到LeaderPool
,LeaderPool
对计算进行拆分,拆成一个个小的计算提交给WorkerPool
。两个线程池采用不同的RejectedExecutionHandler
。
public static ExecutorService leaderPool() {
if(LEADER_POOL == null) {
LEADER_POOL = new ThreadPoolExecutor(
getConfig().getLeaderPoolCoreSize(),
getConfig().getLeaderPoolMaxSize(),
getConfig().getLeaderThreadKeepAliveSeconds(),
TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(getConfig().getLeaderTaskQueueSize()),
new ThreadPoolExecutor.AbortPolicy());
}
return LEADER_POOL;
}
public static ExecutorService workerPool() {
if(WORKER_POOL == null) {
WORKER_POOL = new ThreadPoolExecutor(
getConfig().getWorkerPoolCoreSize(),
getConfig().getWorkerPoolMaxSize(),
getConfig().getWorkerThreadKeepAliveSeconds(),
TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>(getConfig().getWorkerTaskQueueSize()),
new ThreadPoolExecutor.CallerRunsPolicy());
}
return WORKER_POOL;
}
数据并发
类似的,数据并发就是拆分数据,
class PagedTask implements Runnable {
private Code code;
private PagedData data;
public PagedTask(int taskIndex, int pageSize, Code code, PagedData data) {
this.code = code;
this.data = data.subData(taskIndex*pageSize, (taskIndex+1)*pageSize);
}
@Override
public void run() {
code.run(data);
}
}
public class PagedExecutor implements Executor {
private int taskNum;
private int pageSize;
private ExecutorService executor;
public PagedExecutor(int taskNum, int pageSize, int threads) {
this.taskNum = taskNum;
this.pageSize = pageSize;
executor = Executors.newFixedThreadPool(threads);
}
@Override
public void exec(Code code, Data data) {
if(data instanceof PagedData) {
for(int taskIndex = 0; taskIndex < taskNum; taskIndex++) {
PagedTask task = new PagedTask(taskIndex, pageSize,
code, (PagedData)data);
executor.submit(task);
}
}
}
}
将数据拆分,分配给不同的task,每个task有自己的taskIndex
,所有task并发执行。
简单了点,轻拍^_^
时间: 2024-11-17 23:25:26