我所理解的并发#2:两种策略

之前的文章说到,运行时,程序=代码+数据。那么并发编程就可以有两种策略,代码并发和数据并发。

代码并发

代码并发的前提是,我们的代码,准确点说应该是计算,是可以被分割的,分割成一小块一小块的计算,而且不互相依赖。抽象示例如下,

class WorkerTask implements Runnable {

    private Data data;
    private SplittedCode code;

    private CountDownLatch latch;

    public WorkerTask(Data data, SplittedCode code, CountDownLatch latch) {
        this.data = data;
        this.code = code;
        this.latch = latch;
    }

    @Override
    public void run() {
        try {
            code.run(data);
        } finally {
            latch.countDown();
        }
    }

}
class LeaderTask implements Runnable {

    private Data data;
    private SplittedCode code;

    public LeaderTask(Data data, SplittedCode code) {
        this.data = data;
        this.code = code;
    }

    @Override
    public void run() {
        SplittedCode[] codes = code.split();
        CountDownLatch latch = new CountDownLatch(codes.length);
        for (SplittedCode code : codes) {
            LWThreadPoolProvider.workerPool().submit(
                    new WorkerTask(data, code, latch));
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();  // TODO
        }
    }

}
public class LWExecutor implements Executor {
    @Override
    public void exec(Code code, Data data) {
        if(code instanceof SplittedCode) {
            LWThreadPoolProvider.leaderPool().submit(
                    new LeaderTask(data, (SplittedCode)code));
        }
    }
}

采用分级线程池,将整体计算提交给到LeaderPoolLeaderPool对计算进行拆分,拆成一个个小的计算提交给WorkerPool。两个线程池采用不同的RejectedExecutionHandler

    public static ExecutorService leaderPool() {
        if(LEADER_POOL == null) {
            LEADER_POOL = new ThreadPoolExecutor(
                    getConfig().getLeaderPoolCoreSize(),
                    getConfig().getLeaderPoolMaxSize(),
                    getConfig().getLeaderThreadKeepAliveSeconds(),
                    TimeUnit.SECONDS,
                    new LinkedBlockingDeque<Runnable>(getConfig().getLeaderTaskQueueSize()),
                    new ThreadPoolExecutor.AbortPolicy());
        }
        return LEADER_POOL;
    }

    public static ExecutorService workerPool() {
        if(WORKER_POOL == null) {
            WORKER_POOL = new ThreadPoolExecutor(
                    getConfig().getWorkerPoolCoreSize(),
                    getConfig().getWorkerPoolMaxSize(),
                    getConfig().getWorkerThreadKeepAliveSeconds(),
                    TimeUnit.SECONDS,
                    new LinkedBlockingDeque<Runnable>(getConfig().getWorkerTaskQueueSize()),
                    new ThreadPoolExecutor.CallerRunsPolicy());
        }
        return WORKER_POOL;
    }

数据并发

类似的,数据并发就是拆分数据,

class PagedTask implements Runnable {

    private Code code;
    private PagedData data;

    public PagedTask(int taskIndex, int pageSize, Code code, PagedData data) {
        this.code = code;
        this.data = data.subData(taskIndex*pageSize, (taskIndex+1)*pageSize);
    }

    @Override
    public void run() {
        code.run(data);
    }

}
public class PagedExecutor implements Executor {

    private int taskNum;
    private int pageSize;
    private ExecutorService executor;

    public PagedExecutor(int taskNum, int pageSize, int threads) {
        this.taskNum = taskNum;
        this.pageSize = pageSize;
        executor = Executors.newFixedThreadPool(threads);
    }

    @Override
    public void exec(Code code, Data data) {
        if(data instanceof PagedData) {
            for(int taskIndex = 0; taskIndex < taskNum; taskIndex++) {
                PagedTask task = new PagedTask(taskIndex, pageSize,
                        code, (PagedData)data);
                executor.submit(task);
            }
        }
    }
}

将数据拆分,分配给不同的task,每个task有自己的taskIndex,所有task并发执行。



简单了点,轻拍^_^

时间: 2024-08-25 03:48:37

我所理解的并发#2:两种策略的相关文章

java中应对高并发的两种策略

目的:提高可用性 通过ExecutorService实现队列泄洪 //含有20个线程的线程池 private ExecutorService executorService = Executors.newFixedThreadPool(20); 将有并发压力的下游代码放入到线程池的submit方法中,如下: //同步调用线程池的submit方法 简单实现 拥塞窗口为20的队列泄洪(一个服务器同一时间只能处理20个请求,其他请求队列等待)Future<Object> future = execu

4.2分别使用循环和递归两种策略求二项式从c(n,k);

//4.2分别使用循环和递归两种策略求二项式从c(n,k); //以for循环运行c(n,k) #include <iostream> using namespace std; int sum(int m); int main() { int n,k; int sum_n,sum_k,sum_i,sum_n_k; cout<<"请输入C(n,k)中的n值:"<<endl; cin>>n; cout<<"请输入C(n,

Spring声明式事务配置的两种策略SpringAop和Bean后处理器的代理BeanNameAutoProxyCreator

Spring的事务配置有两种:1编程式事务管理配置:2声明式事务管理配置.下面介绍两种声明式事务的配置,声明式事务相比于编程式事务代码耦合更低,无序书写任何事务管理的先关代码.两种声明式事务配置策略分别是:SpringAop事务管理和Bean后处理器的代理BeanNameAutoProxyCreator管理事务. 1.SpringAop事务管理配置 1.1.配置数据源: <bean id="pycDataSource" class="com.mchange.v2.c3p

两种高性能I/O设计模式(Reactor/Proactor)的比较

综述 这篇文章探讨并比较两种用于TCP服务器的高性能设计模式. 除了介绍现有的解决方案,还提出了一种更具伸缩性,只需要维护一份代码并且跨平台的解决方案(含代码示例),以及其在不同平台上的微调. 此文还比较了java.c#.c++对各自现有以及提到的解决方案的实现性能. 系统I/O 可分为阻塞型, 非阻塞同步型以及非阻塞异步型[1.2]. 阻塞型I/O意味着控制权只到调用操作结束了才会回到调用者手里. 结果调用者被阻塞了, 这段时间了做不了任何其它事情. 更郁闷的是,在等待IO结果的时间里,调用者

【转】Reactor与Proactor两种模式区别

转自:http://www.cnblogs.com/cbscan/articles/2107494.html 两种IO多路复用方案:Reactor and Proactor 一般情况下,I/O 复用机制需要事件分享器(event demultiplexor [1, 3]). 事件分享器的作用,即将那些读写事件源分发给各读写事件的处理者,就像送快递的在楼下喊: 谁的什么东西送了, 快来拿吧.开发人员在开始的时候需要在分享器那里注册感兴趣的事件,并提供相应的处理者(event handlers),或

struts2+spring的两种整合方式

借助于Spring插件(Struts2-spring-plugin-XXX.jar),我们可以非常简单地完成Spring和Struts2的整合,这种整合包括让Action自动装配Spring容器中的Bean,以及让Spring管理应用中的Action两种方式,不管采用哪种方式,完成Struts2和Spring的整合都是非常简单的,而且差别不大.一旦在Web应用中安装了Spring插件,即可充分利用该插件提供的功能: 1,可以通过Spring来创建所有的Action,Interceptor和Res

nginx(4)-负载均衡的5种策略及原理

nginx的upstream目前支持的5种方式的分配 1.轮询(默认)每个请求按时间顺序逐一分配到不同的后端服务器,如果后端服务器down掉,能自动剔除. upstream backserver { server 192.168.0.14; server 192.168.0.15; } 2.指定权重指定轮询几率,weight和访问比率成正比,用于后端服务器性能不均的情况. upstream backserver { server 192.168.0.14 weight=8; server 192

《一种策略融合的跨语言文本情感倾向判别方法》论文学习笔记(大一下)

现象:因特网资源呈现多语言化和跨语言的特点,给普通用户获取非母语网络信息造成障碍. 目标:整合多语言倾向信息,以通用的数据形式让用户了解多语言数据对某个对象的评价. 针对跨语言情感倾向分类任务,提出两种跨语言情感倾向分析策略: 半监督框架的跨言情感倾向判别方法(双语协同文本情感倾向判别框架) 关键:跨语言一致文本 材料:源语言数据集和目标语言数据集 目标:利用源语言数据集的情感倾向标签,预测目标语言数据集中未标注样本点的情感倾向标签,即学习跨语言函数. 方法:将情感倾向一致样本点作为载体,采用半

[AngularJS面面观] 12. scope中的watch机制---第三种策略$watchCollection

如果你刚刚入门angular,你或许还在惊叹于angular的双向绑定是多么的方便,你也许在庆幸未来的前端代码中再也不会出现那么多繁琐的DOM操作了. 但是,一旦你的应用程序随着业务的复杂而复杂,你就会发现你手头的那些angular的知识似乎开始不够用了.为什么绑定的数据没有生效?为什么应用的速度越来越慢?为什么会出现莫名其妙的infinite digest异常?所以你开始尝试进阶,尝试弄清楚在数据绑定这个现象后面到底发生了什么. 相信能顺着前面数十篇文章看到这里的同学们,一定对angular是