这一篇呢,主要介绍其实现机理。 当然,秉承偶的一向的观点,让新手也能看得懂。
首先看工作的接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
public interface Work extends Serializable { /** * 返回工作类型,每个工作都有一个工作类型,包工头及工人只能处理同样类型的工作 * * @return */ String getType(); String getId(); /** /** /** /** /** /** /** /** |
是不是很简单,它的实现也是同样简单:
1 2 3 4 5 6 7 8 |
public class WorkDefault implements Work { private String id; private String type; private Warehouse inputWarehouse; private boolean needSerialize = false; private Work nextStepWork = null; private WorkStatus workStatus = WorkStatus.WAITING; } |
基本上就是上述属性的set,get方法。
工人的接口如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
/** * 工人,用于干具体的工作 * Created by luoguo on 14-1-8. */ public interface Worker extends ParallelObject { /** * 执行工作 * * @return */ Warehouse work(Work work) throws RemoteException; /** /** } |
是不是也很简单?
下面看看工头:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
/** * 包工头 * 包工头用于带着一组工人并完成对应的任务 * Created by luoguo on 14-1-8. */ public interface Foreman extends ParallelObject { /** * 返回执行哪种类型的工作任务 * * @return */ String getType() throws RemoteException; /** /** /** } |
下面看看职业介绍所,呵呵,这个就复杂些了(方法多了)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 |
/** * 职介所 * 职介所是分布式处理的核心场所,所有工作相关的元素都要通过职介所进行关联 * Created by luoguo on 14-1-8. */ public interface JobCenter { String WORK_QUEUE = "WorkQueue"; String FOREMAN = "Foreman"; String WORKER = "Worker"; RmiServer getRmiServer(); void setRmiServer(RmiServer rmiServer); /** /** /** /** /** /** /** /** /** /** /** /** /** /** /** |
讲过了四个重要接口,现在说说实现思路: 工人、包工头都是无状态的。这有个好处是不管来多少工作,都可以进行处理;缺点是没有办法进行后续干预。
综合来说,我还是觉得无状态的比有状态的更好。
因为一开始我的实现是有状态的,甚至可以让包工头取消一个工作,包工头再让工人取消工作,但是这样会带来异常复杂的分布式状态维护。但是改成用无状态的模式,就方便多了。
在职业介绍所,有两个重要的方法,一个是doWork,即立即执行一个工作,如果找不到合适的工头和工人,就会抛出异常;另外一个是autoMatch触发职业介绍所进行一次自动匹配。之所以没有做在职业介绍所内部开启线程是为了给外部提供更方便的控制。
所以,其总体设计思想就是开个职业介绍所,工人或工头、工作就可以注册进来,由职业介绍所来撮合成合适的虚拟小组来达成任务。是不是很容易理解?
下面就是所有的接口与实现类了:
155218_thit_1245989.jpg (16.92 KB, 下载次数: 0)
2015-5-27 22:14 上传
对于做并行开发的人员来说:
职业介绍所,工人,工头都不用开发,框架自带的已经足够用了。开发人员只要开发工人和工作分解合并器即可。
工人继承AbstractWorker之后,只有一个方法实现即可。工作分解一个方法,工作合并一个方法,其它的都交给Tiny并行计算框架吧。
总结:
此框架对并行计算的参与者进行了分解,分解为职业介绍所、工头、工人及工作,职业介绍所是中心,职业介绍所停止运行,将无法进行并行计算。工头、工人是可以在任意计算机的,工作是可以在任意节点添加的,由于框架提供了工作序列化功能,因此只要设置工作需要序列化是true,此工作将一直存在,直到被完成,利用此特性可以方便的实现简单的MQ。同时为方便业务实现,工人有抽象类,建议直接继承抽象类实现要关业务方法即可。
注意:目前在执行过程中工人注销或停止服务,会导致整个工作执行失败,下次继续进行执行,后续拟改成考虑用其它工人继续工作。
今天比较忙,写得比较匆忙,如果有不清楚的可以在下面提问。