JStorm之Topology调度

topology在服务端提交过程中,会经过一系列的验证和初始化:TP结构校验、创建本地目录并拷贝序列化文件jar包、生成znode用于存放TP和task等信息,最后一步才进行任务分配,如下图:

提交主函数位于ServiceHandler.java中

private void makeAssignment(String topologyName, String topologyId,
		TopologyInitialStatus status) throws FailedAssignTopologyException {
	//1、创建topology的分配事件
	TopologyAssignEvent assignEvent = new TopologyAssignEvent();
	assignEvent.setTopologyId(topologyId);
	assignEvent.setScratch(false);
	assignEvent.setTopologyName(topologyName);
	assignEvent.setOldStatus(Thrift
			.topologyInitialStatusToStormStatus(status));
  //2、丢入事件处理队列
	TopologyAssign.push(assignEvent);
  //3、等待时间返回
	boolean isSuccess = assignEvent.waitFinish();
	if (isSuccess == true) {
		LOG.info("Finish submit for " + topologyName);
	} else {
		throw new FailedAssignTopologyException(
				assignEvent.getErrorMsg());
	}
}

这其中最主要的是事件丢入队列后后续的处理过程,事件分配由TopologyAssign线程处理,这个线程的流程很清晰,监听事件队列,一旦有事件进入,马上取出,进行doTopologyAssignment,如下:

public void run() {
	LOG.info("TopologyAssign thread has been started");
	runFlag = true;

	while (runFlag) {
		TopologyAssignEvent event;
		try {
			event = queue.take();
		} catch (InterruptedException e1) {
			continue;
		}
		if (event == null) {
			continue;
		}

		boolean isSuccess = doTopologyAssignment(event);

		..............
}

任务分配的核心代码位于TopologyAssign.java中

public Assignment mkAssignment(TopologyAssignEvent event) throws Exception {
	String topologyId = event.getTopologyId();

	LOG.info("Determining assignment for " + topologyId);

	TopologyAssignContext context = prepareTopologyAssign(event);

	Set<ResourceWorkerSlot> assignments = null;

	if (!StormConfig.local_mode(nimbusData.getConf())) {

		IToplogyScheduler scheduler = schedulers
				.get(DEFAULT_SCHEDULER_NAME);
		//开始进行作业的调度
		assignments = scheduler.assignTasks(context);

	} else {
		assignments = mkLocalAssignment(context);
	}
	............
}

调用栈如下:

分配原理是首先获得所有可用的supervisor,判断supervisor可用的标准是是否有空闲的slot,也就是是否所有supervisor.slots.ports指定端口都被占用,然后计算出需要分配几个woker,因为一个woker对应一个端口,当然这些信息的采集都是来自Zookeeper,现在我们来分析分配的核心代码:

WorkerMaker.java

//注意参数,result是这个作业需要的槽位,传入前只知道需要槽位的数量,具体分配到哪台supervisor上还没指定

//supervisors指当前集群中所有可用的supervisor,即有空闲端口的

private void putWorkerToSupervisor(List<ResourceWorkerSlot> result,
		List<SupervisorInfo> supervisors) {
	int key = 0;
	//按所需槽位遍历,每次分配一个
	for (ResourceWorkerSlot worker : result) {
		//首先进行必要的判断和置位
		if (supervisors.size() == 0)
			return;
		if (worker.getNodeId() != null)
			continue;
		if (key >= supervisors.size())
			key = 0;
		//1、取出第一个supervisor
		SupervisorInfo supervisor = supervisors.get(key);
		worker.setHostname(supervisor.getHostName());
		worker.setNodeId(supervisor.getSupervisorId());
		worker.setPort(supervisor.getWorkerPorts().iterator().next());
		//槽位用完则从集合中删除,不再参与分配
		supervisor.getWorkerPorts().remove(worker.getPort());
		if (supervisor.getWorkerPorts().size() == 0)
			supervisors.remove(supervisor);
		//当一个supervisor分配完后便不再使用,除非supervisor不够用
		key++;
	}
}

从上面的代码中我们可以看到,目前槽位分配没考虑机器负载,槽位的分配并不一定平均,比如第一个supervisor有10个槽位,剩下的supervisor只有两个,那么还是要每个supervisor分配一个woker的。注意一个问题,在上面代码中supervisors这个集合是经过排序的,排序规则如下:

private void putAllWorkerToSupervisor(List<ResourceWorkerSlot> result,
		List<SupervisorInfo> supervisors) {
	...........
	supervisors = this.getCanUseSupervisors(supervisors);
	Collections.sort(supervisors, new Comparator<SupervisorInfo>() {

		@Override
		public int compare(SupervisorInfo o1, SupervisorInfo o2) {
			// TODO Auto-generated method stub
			return -NumberUtils.compare(o1.getWorkerPorts().size(), o2
					.getWorkerPorts().size());
		}

	});
	this.putWorkerToSupervisor(result, supervisors);
	.............
}

可以看到,当前排序规则是按slot多少的,我们后续版本中可能会考虑机器负载的一些因素吧。

时间: 2024-11-05 13:36:15

JStorm之Topology调度的相关文章

流式计算-Jstorm提交Topology过程(下)

紧接上篇流式计算-Jstorm提交Topology过程(上), 5.上篇任务已经ServiceHandler.submitTopologyWithOpts()方法,在该方法中,会实例化一个TopologyAssignEvent,相当于创建了一个topology级别的作业,然后将其保存到TopologyAssign的任务队列中,具体代码如下: TopologyAssignEvent assignEvent = new TopologyAssignEvent(); assignEvent.setTo

流式计算-Jstorm提交Topology过程

Topology是Jstorm对有向无环图的抽象,内部封装了数据来源spout和数据处理单元bolt,以及spout和bolt.bolt和bolt之间的关系,它可以被提交到Jstorm集群. 本文以Jstorm自带的SequenceTopology简单介绍一下Jstorm提交topology的过程,本文主要介绍提交过程,不涉及具体业务, 1. SequenceTopology核心方法com.alipay.dw.jstorm.example.sequence.SequenceTopology.Se

JStorm之Topology提交服务端

topology提交前会先判断集群中是否存在同名作业,如果存在在提交失败,如果没有则会增加集群提交次数SubmittedCount,每次提交成功,该变量都会加1,然后会为该作业分配一个id,生成规则如下: public static String TopologyNameToId(String topologyName, int counter) { return topologyName + "-" + counter + "-" + TimeUtils.curr

JStorm之Topology提交客户端

一个topology包含一或多个spout bolt,spout负责在数据源获得数据并发送给bolt,每个bolt负责做完处理后发给下一个bolt.通常topology的创建是由TopologyBuilder来创建的,该组件会记录包含哪些spout bolt,并做相应验证:各组件是否有id冲突,校验方法如下: private void validateUnusedId(String id) { if (_bolts.containsKey(id)) { throw new IllegalArgu

Storm 和JStorm

关于流处理框架,在先前的文章汇总已经介绍过Strom,今天学习的是来自阿里的的流处理框架JStorm.简单的概述Storm就是:JStorm 比Storm更稳定,更强大,更快,Storm上跑的程序,一行代码不变可以运行在JStorm上.直白的将JStorm是阿里巴巴的团队基于Storm的二次开发产物,相当于他们的Tengine是基于Ngix开发的一样.以下为阿里巴巴团队放弃直接使用Storm选择自行开发JStorm的原因: 阿里拥有自己的实时计算引擎 类似于hadoop 中的MR 开源storm

jstorm之于storm

关于流处理框架,在先前的文章汇总已经介绍过Strom,今天学习的是来自阿里的的流处理框架JStorm.简单的概述Storm就是:JStorm 比Storm更稳定,更强大,更快,Storm上跑的程序,一行代码不变可以运行在JStorm上.直白的将JStorm是阿里巴巴的团队基于Storm的二次开发产物,相当于他们的Tengine是基于Ngix开发的一样. 阿里拥有自己的实时计算引擎 类似于hadoop 中的MR 开源storm响应太慢 开源社区的速度完全跟不上Ali的需求 降低未来运维成本 提供更

[转]JStorm介绍

一.简介 Storm是开源的分布式容错实时计算系统,目前被托管在GitHub上,遵循 Eclipse Public License 1.0.最初由BackType开发,现在已被Twitter收入麾下.Storm最新版本是Storm 0.9,核心采用Clojure实现.Storm为分布式实时计算提供了一组通用原语,可被用于“流处理”之中,实时处理消息:Storm也可被用于“连续计算”(continuous computation),对数据流做连续处理,在计算时就将结果以流的形式输出给用户:它还可被

Storm调度

p.MsoNormal { margin: 0pt; margin-bottom: .0001pt; text-align: justify; font-family: Calibri; font-size: 10.5000pt } h1 { margin-top: 5.0000pt; margin-bottom: 5.0000pt; text-align: left; font-family: 宋体; font-weight: bold; font-size: 24.0000pt } span

jstorm的cgroup资源隔离机制

本文研究一下jstorm使用cgroup做资源隔离的情况,github有文档: https://github.com/alibaba/jstorm/wiki/%E8%B5%84%E6%BA%90%E7%A1%AC%E9%9A%94%E7%A6%BB 这个文档告诉你怎么开启cgroup,但对于不太了解cgroup和jstorm细节的同学可能更有兴趣看一下到底是怎么隔离的. 废话少说,你不是告诉我cgroup做资源隔离吗?你回答我两个问题: 1.什么是cgroup 2.jstorm怎么用cgroup