This post continues the previous one, "Stream Computing - The JStorm Topology Submission Process (Part 1)".
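5. The previous post stopped at ServiceHandler.submitTopologyWithOpts(). This method creates a TopologyAssignEvent, which amounts to a topology-level job, and pushes it onto TopologyAssign's job queue. The relevant code from that method is shown below.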
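The hand-off in steps 5 and 6 is a producer/consumer queue. The Thrift handler only enqueues an event, and a background thread does the slower assignment work. Below is a minimal sketch of that pattern. It assumes a java.util.concurrent.LinkedBlockingQueue. AssignEvent and TopologyAssignSketch are hypothetical simplifications, not JStorm classes, and the real TopologyAssign does much more for each event.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for TopologyAssignEvent, keeping only two fields.
class AssignEvent {
    final String topologyId;
    final String topologyName;

    AssignEvent(String topologyId, String topologyName) {
        this.topologyId = topologyId;
        this.topologyName = topologyName;
    }
}

// Hypothetical simplification of TopologyAssign: a job queue plus a consumer loop.
class TopologyAssignSketch implements Runnable {
    private final BlockingQueue<AssignEvent> queue = new LinkedBlockingQueue<>();
    // Topology IDs in the order they were handled; safe to read from other threads.
    final List<String> handled = new CopyOnWriteArrayList<>();

    // Producer side: roughly the role of TopologyAssign.push(assignEvent).
    void push(AssignEvent event) {
        queue.add(event);
    }

    // Consumer side: block until an event arrives, then "assign" it.
    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                AssignEvent event = queue.take(); // blocks while the queue is empty
                handled.add(event.topologyId);    // real code would call mkAssignment(event)
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // restore the flag and stop the loop
        }
    }
}
```

A caller would start the consumer with new Thread(sketch).start() and push events from any thread; interrupting the thread ends the loop. Because a single thread drains a FIFO queue, events are handled in submission order.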
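    // Describe the newly submitted topology as an assignment job
    TopologyAssignEvent assignEvent = new TopologyAssignEvent();
    assignEvent.setTopologyId(topologyId);
    assignEvent.setScratch(false);
    assignEvent.setTopologyName(topologyname);
    assignEvent.setOldStatus(Thrift
            .topologyInitialStatusToStormStatus(options.get_initial_status()));
    // Hand the job to TopologyAssign's queue
    TopologyAssign.push(assignEvent);

6. TopologyAssign is JStorm's task assigner. It creates and assigns tasks based on the configuration and on the relationships between the spouts and bolts in the topology. It does not perform the creation and assignment itself, however. It calls JStorm's scheduler to do that work. JStorm also lets users plug in their own scheduler to suit their business needs. I will analyze the JStorm scheduler in a separate article, so I won't cover it here. Back to TopologyAssign: the class is a background thread that implements the Runnable interface and starts along with Nimbus. Its main jobs are assigning topology work, backing it up, and balancing it, and again it does all of this through the JStorm scheduler. Its run method takes jobs from its own queue in a blocking manner and then assigns them. The core assignment logic, mkAssignment(), is shown below.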
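mkAssignment() hands the actual placement to the scheduler through scheduler.assignTasks(context). This post does not analyze JStorm's default scheduler, so the following is only a hypothetical round-robin placement. It ignores resources, component relationships, and the old assignment. RoundRobinSchedulerSketch and the "host:port" slot strings are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical round-robin placement; NOT JStorm's default scheduler.
class RoundRobinSchedulerSketch {
    /**
     * Spreads task IDs over worker slots ("host:port") in round-robin order.
     * Returns slot -> task IDs; slots keep their input order.
     */
    static Map<String, List<Integer>> assignTasks(List<Integer> taskIds, List<String> slots) {
        if (slots.isEmpty()) {
            throw new IllegalArgumentException("no free worker slots");
        }
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String slot : slots) {
            result.put(slot, new ArrayList<>());
        }
        for (int i = 0; i < taskIds.size(); i++) {
            String slot = slots.get(i % slots.size()); // next slot in turn
            result.get(slot).add(taskIds.get(i));
        }
        return result;
    }
}
```

For example, tasks 1 to 5 spread over node1:6800 and node2:6800 give {node1:6800=[1, 3, 5], node2:6800=[2, 4]}. A real scheduler would also weigh free resources and try to keep the previous placement stable during rebalancing.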
    public Assignment mkAssignment(TopologyAssignEvent event) throws Exception {
        String topologyId = event.getTopologyId();
        TopologyAssignContext context = prepareTopologyAssign(event);

        IToplogyScheduler scheduler = schedulers.get(DEFAULT_SCHEDULER_NAME);
        // Let the JStorm scheduler compute the task assignment;
        // ResourceWorkerSlot abstracts a worker together with its tasks
        Set<ResourceWorkerSlot> assignments = scheduler.assignTasks(context);

        Map<String, String> nodeHost = getTopologyNodeHost(
                context.getCluster(), context.getOldAssignment(), assignments);
        Map<Integer, Integer> startTimes = getTaskStartTimes(context, nimbusData,
                topologyId, context.getOldAssignment(), assignments);

        // Location of the jar submitted to the cluster; workers download the code from here
        String codeDir = StormConfig.masterStormdistRoot(nimbusData.getConf(), topologyId);
        Assignment assignment = new Assignment(codeDir, assignments, nodeHost, startTimes);

        StormClusterState stormClusterState = nimbusData.getStormClusterState();
        // Write the assignment to ZK so that the supervisors are notified
        stormClusterState.set_assignment(topologyId, assignment);

        // Update the tasks' heartbeat start times
        NimbusUtils.updateTaskHbStartTime(nimbusData, assignment, topologyId);

        // Update the metadata in ZK
        if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE
                || context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_MONITOR) {
            NimbusUtils.updateMetricsInfo(nimbusData, topologyId, assignment);
        } else {
            metricsMonitor(event);
        }
        return assignment;
    }

7. Nimbus has now computed the assignment and written it to ZK, so each supervisor next has to claim its own tasks. SyncSupervisorEvent, another background thread, holds the logic a supervisor uses to fetch its tasks. It repeatedly reads all assignments in ZK (under JSTORM_ROOT/assignments) and saves the ones that belong to it to local disk. It then downloads the topology code to the local machine through NimbusClient and hands off to the worker-launching thread, which executes the tasks. The core logic is shown below.
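At its core, "claiming" work is a filter. From every topology's assignment, the supervisor keeps only the workers whose supervisor ID matches its own, keyed by port. The sketch below uses simplified stand-ins, WorkerSlot and LocalWork, for JStorm's ResourceWorkerSlot and LocalAssignment. Both classes are hypothetical, and the sketch is not the actual getLocalAssign implementation.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for ResourceWorkerSlot: one worker of one topology.
class WorkerSlot {
    final String supervisorId;
    final int port;
    final Set<Integer> tasks;

    WorkerSlot(String supervisorId, int port, Set<Integer> tasks) {
        this.supervisorId = supervisorId;
        this.port = port;
        this.tasks = tasks;
    }
}

// Hypothetical stand-in for LocalAssignment: what one local port should run.
class LocalWork {
    final String topologyId;
    final Set<Integer> tasks;

    LocalWork(String topologyId, Set<Integer> tasks) {
        this.topologyId = topologyId;
        this.tasks = tasks;
    }
}

class LocalAssignSketch {
    /** assignments: topologyId -> workers of that topology, as read from ZK. */
    static Map<Integer, LocalWork> getLocalAssign(String supervisorId,
            Map<String, List<WorkerSlot>> assignments) {
        Map<Integer, LocalWork> local = new HashMap<>();
        for (Map.Entry<String, List<WorkerSlot>> entry : assignments.entrySet()) {
            for (WorkerSlot worker : entry.getValue()) {
                if (!worker.supervisorId.equals(supervisorId)) {
                    continue; // belongs to another supervisor
                }
                LocalWork previous = local.put(worker.port,
                        new LocalWork(entry.getKey(), worker.tasks));
                if (previous != null) {
                    // One port hosts one worker; a clash means the assignment is inconsistent
                    throw new IllegalStateException("port " + worker.port + " assigned twice");
                }
            }
        }
        return local;
    }
}
```

The sketch keys the result by port because a worker process is bound to a single port on the supervisor, so the port naturally identifies "which local worker runs what".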
    public void run() {
        RunnableCallback syncCallback = new EventManagerZkPusher(this, syncSupEventManager);

        // On first start, actively read all assignments under JSTORM_ROOT/assignments in ZK;
        // afterwards, new assignments arrive through ZK watches via this callback
        Map<String, Assignment> assignments = Cluster.get_all_assignment(
                stormClusterState, syncCallback);

        // Topologies whose code has already been downloaded to this machine
        List<String> downloadedTopologyIds = StormConfig
                .get_supervisor_toplogy_list(conf);

        // Pick this supervisor's own work out of all assignments
        Map<Integer, LocalAssignment> localAssignment = getLocalAssign(
                stormClusterState, supervisorId, assignments);

        // Save this supervisor's work to local disk
        localState.put(Common.LS_LOCAL_ASSIGNMENTS, localAssignment);

        // Download locations of the topology code
        Map<String, String> topologyCodes = getTopologyCodeLocations(
                assignments, supervisorId);

        // Download the code to the local machine through NimbusClient
        downloadTopology(topologyCodes, downloadedTopologyIds);

        // Remove topologies that are no longer needed
        removeUselessTopology(topologyCodes, downloadedTopologyIds);

        // Queue syncProcesses, which is responsible for launching new workers to run the tasks
        processEventManager.add(syncProcesses);
    }

8. Once SyncSupervisorEvent has picked out its own jobs and saved them locally, SyncProcessEvent launches workers to execute them. SyncProcessEvent does two things: it starts new workers and kills workers that are no longer needed. Here we only look at starting new workers. The logic is shown below.
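Together, SyncProcessEvent's two jobs amount to reconciling the desired state with the actual state. The minimal sketch below assumes that a worker should be kept exactly when it already runs the topology assigned to its port. Real supervisors typically also consider worker heartbeats. Under that assumption, matching workers are kept and the other running workers are killed. Every assigned port that is not kept gets a new worker with a fresh UUID. WorkerReconcileSketch and its methods are hypothetical. newWorkerIds imitates what the select_keys_pred(keepPorts, localAssignments) call is used for, namely selecting the assigned ports that are not in keepPorts.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;

// Hypothetical reconciliation of assigned work (port -> topologyId)
// against running workers (port -> topologyId they currently run).
class WorkerReconcileSketch {
    /** Ports whose running worker already runs the topology assigned to that port. */
    static Set<Integer> keepPorts(Map<Integer, String> assigned, Map<Integer, String> running) {
        Set<Integer> keep = new TreeSet<>();
        for (Map.Entry<Integer, String> e : running.entrySet()) {
            if (e.getValue().equals(assigned.get(e.getKey()))) {
                keep.add(e.getKey());
            }
        }
        return keep;
    }

    /** Running workers that are not kept are the ones to kill. */
    static Set<Integer> portsToKill(Map<Integer, String> running, Set<Integer> keep) {
        Set<Integer> kill = new TreeSet<>(running.keySet());
        kill.removeAll(keep);
        return kill;
    }

    /** Assigned ports that are not kept get a new worker with a fresh random ID. */
    static Map<Integer, String> newWorkerIds(Map<Integer, String> assigned, Set<Integer> keep) {
        Map<Integer, String> ids = new TreeMap<>();
        for (Integer port : assigned.keySet()) {
            if (!keep.contains(port)) {
                ids.put(port, UUID.randomUUID().toString());
            }
        }
        return ids;
    }
}
```

Take assigned = {6800: topo-1, 6801: topo-2} and running = {6800: topo-1, 6802: topo-old}. Port 6800 is kept, the worker on 6802 is killed, and 6801 gets a new worker ID.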
    private void startNewWorkers(Set<Integer> keepPorts,
            Map<Integer, LocalAssignment> localAssignments) throws Exception {
        // Select the work assigned in this round (ports not in keepPorts)
        Map<Integer, LocalAssignment> newWorkers = JStormUtils
                .select_keys_pred(keepPorts, localAssignments);

        // Generate an ID for each new worker
        Map<Integer, String> newWorkerIds = new HashMap<Integer, String>();
        for (Entry<Integer, LocalAssignment> entry : newWorkers.entrySet()) {
            Integer port = entry.getKey();
            LocalAssignment assignment = entry.getValue();
            String workerId = UUID.randomUUID().toString();
            newWorkerIds.put(port, workerId);

            // Persist this worker's ID locally
            StormConfig.worker_pids_root(conf, workerId);

            // Launch a new JVM to run the work
            launchWorker(conf, sharedContext, assignment.getTopologyId(),
                    supervisorId, port, workerId, assignment);
        }
    }

That is the whole path JStorm follows to submit a topology. These two posts only trace the main line and leave out many code details, which I will fill in over time. A detailed analysis of the JStorm scheduler will also follow.