Tiny并行计算框架之实现机理

这一篇呢,主要介绍其实现机理。  当然,秉承偶的一向的观点,让新手也能看得懂。 
首先看工作的接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public interface Work extends Serializable {
    /**
     * 返回工作类型,每个工作都有一个工作类型,包工头及工人只能处理同样类型的工作
     *
     * @return
     */
    String getType();

String getId();

/**
     * 返回后续步骤的工作,如果有,说明是复合工作,如果没有,说明是简单工作
     *
     * @return
     */
    Work getNextWork();

/**
     * 设置后续步骤工作
     *
     * @param nextWork 后续工作
     * @return 返回后续工作
     */
    Work setNextWork(Work nextWork);

/**
     * 是否需要序列化
     *
     * @return true表示工作永不丢失,false表示容器关闭即丢失
     */
    boolean isNeedSerialize();

/**
     * 设置是否需要进行序列化,如果要用到MQ,则需要设置为需要序列化
     *
     * @param needSerialize true表示工作永不丢失,false表示容器关闭即丢失
     */
    void setNeedSerialize(boolean needSerialize);

/**
     * 返回输入仓库
     *
     * @return
     */
    Warehouse getInputWarehouse();

/**
     * 设置输入仓库
     *
     * @param inputWarehouse
     */
    void setInputWarehouse(Warehouse inputWarehouse);

/**
     * 设置工作状态
     *
     * @param workStatus
     */
    void setWorkStatus(WorkStatus workStatus);

/**
     * 获取工作状态
     *
     * @return
     */
    WorkStatus getWorkStatus();
}

是不是很简单,它的实现也是同样简单:

1
2
3
4
5
6
7
8
public class WorkDefault implements Work {
    private String id;
    private String type;
    private Warehouse inputWarehouse;
    private boolean needSerialize = false;
    private Work nextStepWork = null;
    private WorkStatus workStatus = WorkStatus.WAITING;
}

基本上就是上述属性的set,get方法。 
工人的接口如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 工人,用于干具体的工作
* Created by luoguo on 14-1-8.
*/
public interface Worker extends ParallelObject {
    /**
     * 执行工作
     *
     * @return
     */
    Warehouse work(Work work) throws RemoteException;

/**
     * 是否接受工作
     * 即使是同样类型的工人,有可能对工作也挑三捡四,这里给了工人一定的灵活性
     *
     * @param work
     * @return true表示接受,false表示不接受
     */
    boolean acceptWork(Work work) throws RemoteException;

/**
     * 返回类型
     *
     * @return
     */
    String getType() throws RemoteException;

}

是不是也很简单? 
下面看看工头:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 包工头
* 包工头用于带着一组工人并完成对应的任务
* Created by luoguo on 14-1-8.
*/
public interface Foreman extends ParallelObject {
    /**
     * 返回执行哪种类型的工作任务
     *
     * @return
     */
    String getType() throws RemoteException;

/**
     * 开始干活以完成工作
     */
    Warehouse work(Work work, List<Worker> workerList) throws IOException;

/**
     * 设置工作合并器
     *
     * @param workCombiner
     */
    void setWorkCombiner(WorkCombiner workCombiner);

/**
     * 设置工作分解器
     *
     * @param workSplitter
     */
    void setWorkSplitter(WorkSplitter workSplitter);

}

下面看看职业介绍所,呵呵,这个就复杂些了(方法多了)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
/**
* 职介所
* 职介所是分布式处理的核心场所,所有工作相关的元素都要通过职介所进行关联
* Created by luoguo on 14-1-8.
*/
public interface JobCenter {
    String WORK_QUEUE = "WorkQueue";
    String FOREMAN = "Foreman";
    String WORKER = "Worker";

RmiServer getRmiServer();

void setRmiServer(RmiServer rmiServer);

/**
     * 注册工人
     *
     * @param worker
     */
    void registerWorker(Worker worker) throws RemoteException;

/**
     * 返回工作队列对象
     *
     * @return
     */
    WorkQueue getWorkQueue();

/**
     * 注消工人
     *
     * @param worker
     */
    void unregisterWorker(Worker worker) throws RemoteException;

/**
     * 注册一份工作,工作情况不需要马上关注。因此也就不用等待,马上返回可以进行其它处理
     * 如果有返回结果,可以通过异步方式,异步方式可以用后续工作的方式来指定
     *
     * @param work
     */
    void registerWork(Work work) throws IOException;

/**
     * 取消工作,在工作没有分配出去之前,可以从职介所注消工作,如果工作已经分配出去,则无法注消
     *
     * @param work
     */
    void unregisterWork(Work work) throws RemoteException;

/**
     * 返回指定工作的工作状态
     *
     * @param work
     * @return
     */
    WorkStatus getWorkStatus(Work work) throws RemoteException;

/**
     * 执行一项工作,期望同步得到结果或异常
     * 如果没有合适的工人或包工头进行处理,马上会抛出异常
     *
     * @param work
     */
    Warehouse doWork(Work work) throws IOException;

/**
     * 注册包工头
     *
     * @param foreman
     */
    void registerForeman(Foreman foreman) throws RemoteException;

/**
     * 注销包工头
     *
     * @param foreman
     */
    void unregisterForeMan(Foreman foreman) throws RemoteException;

/**
     * 返回具有某种类型的空闲且愿意接受工作的工人列表
     *
     * @return
     */
    List<Worker> getIdleWorkerList(Work work);

/**
     * 返回所有的工作列表
     *
     * @return
     */
    List<Work> getWorkList() throws RemoteException;

/**
     * 返回某种类型的某种状态的工作列表
     *
     * @return
     */
    List<Work> getWorkList(String type, WorkStatus workStatus) throws RemoteException;

/**
     * 返回组织某种工作的的空闲工头列表
     *
     * @param type
     * @return
     */
    List<Foreman> getIdleForeman(String type);

/**
     * 自动进行匹配,如果有匹配成功的,则予以触发执行
     */
    void autoMatch() throws IOException;

/**
     * 职业介绍所关门
     *
     * @throws RemoteException
     */
    void stop() throws RemoteException;
}

讲过了四个重要接口,现在说说实现思路:   工人、包工头都是无状态的。这有个好处是不管来多少工作,都可以进行处理;缺点是没有办法进行后续干预。 
综合来说,我还是觉得无状态的比有状态的更好。 
因为一开始我的实现是有状态的,甚至可以让包工头取消一个工作,包工头再让工人取消工作,但是这样会带来异常复杂的分布式状态维护。但是改成用无状态的模式,就方便多了。 
在职业介绍所,有两个重要的方法,一个是doWork,即立即执行一个工作,如果找不到合适的工头和工人,就会抛出异常;另外一个是autoMatch触发职业介绍所进行一次自动匹配。之所以没有做在职业介绍所内部开启线程是为了给外部提供更方便的控制。 
所以,其总体设计思想就是开个职业介绍所,工人或工头、工作就可以注册进来,由职业介绍所来撮合成合适的虚拟小组来达成任务。是不是很容易理解? 
下面就是所有的接口与实现类了: 

155218_thit_1245989.jpg (16.92 KB, 下载次数: 0)

下载附件

2015-5-27 22:14 上传

对于做并行开发的人员来说: 
职业介绍所,工人,工头都不用开发,框架自带的已经足够用了。开发人员只要开发工人和工作分解合并器即可。 
工人继承AbstractWorker之后,只有一个方法实现即可。工作分解一个方法,工作合并一个方法,其它的都交给Tiny并行计算框架吧。 
总结: 
此框架对并行计算的参与者进行了分解,分解为职业介绍所、工头、工人及工作,职业介绍所是中心,职业介绍所停止运行,将无法进行并行计算。工头、工人是可以在任意计算机的,工作是可以在任意节点添加的,由于框架提供了工作序列化功能,因此只要设置工作需要序列化是true,此工作将一直存在,直到被完成,利用此特性可以方便的实现简单的MQ。同时为方便业务实现,工人有抽象类,建议直接继承抽象类实现要关业务方法即可。 
注意:目前在执行过程中工人注销或停止服务,会导致整个工作执行失败,下次继续进行执行,后续拟改成考虑用其它工人继续工作。 
今天比较忙,写得比较匆忙,如果有不清楚的可以在下面提问。

时间: 2024-08-28 20:26:26

Tiny并行计算框架之实现机理的相关文章

Tiny并行计算框架之复杂示例

问题来源  非常感谢@doctorwho的问题: 假如职业介绍所来了一批生产汽车的工作,假设生产一辆汽车任务是这样的:搭好底盘.拧4个轮胎.安装发动机.安装4个座椅.再装4个车门.最后安装顶棚.之间有的任务是可以并行计算的(比如拧4个轮胎,安装发动机和安装座椅),有的任务有前置任务(比如先装好座椅,才能装车门和顶棚).让两组包工头组织两种类型的工作:将工人分成两种类型,即可并行计算的放在同一组内,由职业介绍所来控制A组包工头做完的任务交给B组包工头.中间环节的半成品保存到Warehouse中,是

Tiny并行计算框架之使用介绍Tiny并行计算框架之使用介绍

呵呵,昨天看到两新粉,一激动,就想着今天来写这篇文章.  其实一直在关注这个领域,但是一直没有信心来写,所以一直期望着有一个开源的来用. 看到了彭渊大师的淘宝分布式框架Fourinone介绍,确实有一种相见恨晚的感觉,于是就准备去研究一番,详细见本人的感想文章由fourinone初步学习想到的,确实来说,感觉到有一种啃不动的感觉,当然也可能是本人水平不足的原因所致.但是不管怎么说,促动了本人来写一个简单的并行计算框架. 在此引用本人的名言:“牛人的代码就是生手也一看就懂:生手的代码就是牛人来了也

【开源专访】Fourinone创始人彭渊:打造多合一的分布式并行计算框架

摘要:Fourinone是一个分布式并行计算框架,以轻量的方式提供了一个四合一的分布式框架功能以及简单易用的API,通过实现对多台计算机资源的统一利用,来获得强大的计算能力.本期我们采访了项目创始人彭渊. Fourinone(即Four-in-one,中文名字“四不像”)是一个分布式计算框架,提供了一个4合1的分布式框架功能(即整合了Hadoop.Zookeeper.MQ.分布式缓存的主要功能)和简单易用的编程API,实现对多台计算机CPU.内存.硬盘的统一利用,从而获取到强大计算能力去解决复杂

Fork/Join-Java并行计算框架

Java在JDK7之后加入了并行计算的框架Fork/Join,可以解决我们系统中大数据计算的性能问题.Fork/Join采用的是分治法,Fork是将一个大任务拆分成若干个子任务,子任务分别去计算,而Join是获取到子任务的计算结果,然后合并,这个是递归的过程.子任务被分配到不同的核上执行时,效率最高.伪代码如下: Result solve(Problem problem) { if (problem is small) directly solve problem else { split pr

高清视频:《公开课2》Tiny开源框架服务及CEP深入剖析

在Java开源框架中,如何定义Service规范?如何通过服务中心提供远程服务调用?如何根据用户的服务自动生成相关服务包装类?Java开源框架的服务体系是如何运行的?来吧!让我们一起来聆听<Tiny公开课2:Tiny服务及CEP深入剖析>! 高清视频下载地址:http://bbs.tinygroup.org/thread-1429-1-1.html 版权声明:本文为博主原创文章,未经博主允许不得转载.

jdk7 并行计算框架Fork/Join

故名思义,拆分fork+合并join.jdk1.7整合Fork/Join,性能上有大大提升. 思想:充分利用多核CPU把计算拆分成多个子任务,并行计算,提高CPU利用率大大减少运算时间.有点像,MapReduce思路感觉大致一样. jdk7中已经提供了最简洁的接口,让你不需要太多时间关心并行时线程的通信,死锁问题,线程同步,下面是它提供的接口: 简单示例: package tank.forjoin.demo; import java.util.concurrent.RecursiveTask;

【Tiny前端框架】最新开源,免费原创最新!

特点: 组件丰富 兼容性好 修改容易 扩展方便 性能能好 内存回收强 界面好看 免费分享 更换皮肤 支持窗口小组件 流式布局 osc上不允许上传附件... 没办法,只有放到 csdn上了. 下载地址:http://download.csdn.net/detail/qq_23835005/8188279

转: 并行计算模型和框架

from:  http://www.ibm.com/developerworks/cn/analytics/library/ba-1507-mapreducefiveframes/index.html 并行计算模型和框架 目前开源社区有许多并行计算模型和框架可供选择,按照实现方式.运行机制.依附的产品生态圈等可以被划分为几个类型,每个类型各有优缺点,如果能够对各类型的并行计算框架都进行深入研究及适当的缺点修复,就可以为不同硬件环境下的海量数据分析需求提供不同的软件层面的解决方案. 并行计算框架

异步并行批处理框架设计的一些思考(转)

随着互联网信息技术日新月异的发展,一个海量数据爆炸的时代已经到来.如何有效地处理.分析这些海量的数据资源,成为各大技术厂商争在激烈的竞争中脱颖而出的一个利器.可以说,如果不能很好的快速处理分析这些海量的数据资源,将很快被市场无情地所淘汰.当然,处理分析这些海量数据目前可以借鉴的方案有很多:首先,在分布式计算方面有Hadoop里面的MapReduce并行计算框架,它主要针对的是离线的数据挖掘分析.此外还有针对实时在线流式数据处理方面的,同样也是分布式的计算框架Storm,也能很好的满足数据实时性分