Storm2

package storm.scheduler;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.log4j.Logger;

import backtype.storm.scheduler.Cluster;
import backtype.storm.scheduler.EvenScheduler;
import backtype.storm.scheduler.ExecutorDetails;
import backtype.storm.scheduler.IScheduler;
import backtype.storm.scheduler.SupervisorDetails;
import backtype.storm.scheduler.Topologies;
import backtype.storm.scheduler.TopologyDetails;
import backtype.storm.scheduler.WorkerSlot;

/**
 * Storm scheduler driven by the "hot edges" of each topology: executor pairs (and then
 * slot pairs) that exchange traffic are placed so that inter-slot and then inter-node
 * traffic is minimized, using the statistics held by {@code TrafficManager} and
 * {@code DataManager}.
 * <p>
 * A new assignment is computed at most once per reschedule timeout and is applied only when
 * it offloads a currently overloaded node or lowers the inter-node traffic by the configured
 * percentage. Storm's {@link EvenScheduler} is run after every scheduling round.
 *
 * @author wxweven
 */
public class OnlineScheduler implements IScheduler {
	/** Reschedule timeout (seconds) used when a topology does not configure one. */
	private static final int DEFAULT_RESCHEDULE_TIMEOUT = 180;
	/** Required inter-node traffic improvement (percent) used when a topology does not configure one. */
	private static final int DEFAULT_TRAFFIC_IMPROVEMENT = 0;

	private static final Logger logger = Logger.getLogger(OnlineScheduler.class);
	private final AssignmentTracker assignmentTracker = new AssignmentTracker();
	/** Time (ms since epoch) at which the last computed assignment was applied; 0 = never. */
	private long lastRescheduling;

	/**
	 * {@link IScheduler} entry point for one scheduling round.
	 * <p>
	 * When at least one topology is submitted and either no assignment has been applied yet
	 * or the reschedule timeout has elapsed since the last one, the hot-edge algorithm is run
	 * by {@link #doSchedule}. Afterwards the {@link EvenScheduler} is always run on the same
	 * cluster, and the result is passed to the assignment tracker.
	 *
	 * @param topologies
	 *            the submitted topologies
	 * @param cluster
	 *            the cluster to assign executors into
	 */
	@Override
	public void schedule(Topologies topologies, Cluster cluster) {
		logger.info("HotEdge Scheduler");
		if (!topologies.getTopologies().isEmpty()) {
			// If topologies configure different timeouts, the last one iterated wins. A missing
			// or malformed setting falls back to the default instead of throwing from here.
			int rescheduleTimeout = DEFAULT_RESCHEDULE_TIMEOUT;
			for (TopologyDetails topology : topologies.getTopologies()) {
				rescheduleTimeout = readIntConf(topology.getConf(), Utils.RESCHEDULE_TIMEOUT,
						DEFAULT_RESCHEDULE_TIMEOUT);
			}
			long elapsedSeconds = (System.currentTimeMillis() - lastRescheduling) / 1000;
			if (lastRescheduling == 0 || elapsedSeconds >= rescheduleTimeout) {
				// Run the hot-edge scheduling algorithm.
				doSchedule(topologies, cluster);
			}
		}

		new EvenScheduler().schedule(topologies, cluster);
		assignmentTracker.checkAssignment(topologies, cluster);
	}

	/**
	 * Reads an integer setting from a topology configuration.
	 *
	 * @param conf
	 *            the topology configuration (may be {@code null})
	 * @param key
	 *            the setting name
	 * @param defaultValue
	 *            value returned when the setting is absent or not an integer
	 * @return the configured value, or {@code defaultValue}
	 */
	private static int readIntConf(Map<?, ?> conf, Object key, int defaultValue) {
		Object value = (conf == null) ? null : conf.get(key);
		if (value == null) {
			logger.debug("Setting " + key + " is not configured, using default " + defaultValue);
			return defaultValue;
		}
		if (value instanceof Number) {
			return ((Number) value).intValue();
		}
		try {
			return Integer.parseInt(value.toString().trim());
		} catch (NumberFormatException e) {
			logger.warn("Setting " + key + " has a non-integer value '" + value
					+ "', using default " + defaultValue);
			return defaultValue;
		}
	}

	/**
	 * Runs the hot-edge scheduling algorithm and, if worthwhile, applies its result.
	 * <p>
	 * The topology list stored by {@code DataManager} is first synchronized with the
	 * topologies Storm reports (entries Storm no longer runs are removed). Then the best
	 * assignment is computed. It is applied when it lowers the load of a currently
	 * overloaded node, or when its inter-node traffic is at most
	 * {@code current * (1 - trafficImprovement / 100)}.
	 * <p>
	 * Any error is logged and swallowed, so {@link #schedule} still runs the
	 * {@link EvenScheduler} afterwards.
	 *
	 * @param topologies
	 *            the submitted topologies
	 * @param cluster
	 *            the cluster
	 */
	private void doSchedule(Topologies topologies, Cluster cluster) {
		try {
			// Fetch the submitted topologies recorded in the database.
			List<String> dbTopologies = DataManager.getInstance().getTopologies();
			List<String> topologiesToBeRemoved = new ArrayList<String>(dbTopologies);
			int trafficImprovement = DEFAULT_TRAFFIC_IMPROVEMENT;
			for (TopologyDetails topology : topologies.getTopologies()) {
				topologiesToBeRemoved.remove(topology.getId());
				for (Object key : topology.getConf().keySet()) {
					logger.debug("- " + key + ": " + topology.getConf().get(key));
				}
				// As for the timeout, the last topology iterated wins.
				trafficImprovement = readIntConf(topology.getConf(), Utils.TRAFFIC_IMPROVEMENT,
						DEFAULT_TRAFFIC_IMPROVEMENT);
			}

			// Keep only the DB topologies Storm still runs, and purge the others from the DB.
			dbTopologies.removeAll(topologiesToBeRemoved);
			if (!topologiesToBeRemoved.isEmpty()) {
				DataManager.getInstance().removeTopologies(topologiesToBeRemoved);
				logger.info("Topologies succesfully removed from DB");
			}

			// Compute the scheduling strategy.
			TrafficManager.getInstance().clear();
			computeBestScheduling(dbTopologies, topologies, cluster);
			Map<Node, List<Slot>> bestAssignment = TrafficManager.getInstance().getAssignments();
			int bestInterNodeTraffic = TrafficManager.getInstance().computeInterNodeTraffic();
			int currentInterNodeTraffic = DataManager.getInstance().getCurrentInterNodeTraffic();
			List<Node> overloadedNodeList = DataManager.getInstance().getOverloadedNodes();

			logger.info("These nodes are currently overloaded: "
					+ Utils.collectionToString(overloadedNodeList));
			logger.info("Currently, the inter-node traffic is " + currentInterNodeTraffic
					+ " tuple/s");

			if (bestAssignment == null) {
				logger.info("No assignment has been simulated");
				return;
			}

			logger.info("The best assignment can lead to an inter-node traffic of "
					+ bestInterNodeTraffic + " tuple/s");
			boolean reschedulingDueToOverloading = offloadsOverloadedNodes(overloadedNodeList,
					bestAssignment);
			if (reschedulingDueToOverloading) {
				logger.info("A rescheduling is required to offload currently overloaded nodes");
			}

			// The new assignment must cut the current inter-node traffic by at least
			// trafficImprovement percent to be worth applying.
			int trafficThreshold = (int) (currentInterNodeTraffic * (1 - (float) trafficImprovement / 100));
			logger.info("Minimum traffic threshold is " + trafficThreshold + " tuple/s");
			boolean reschedulingDueToInterNodeTraffic = trafficThreshold >= bestInterNodeTraffic;
			if (reschedulingDueToInterNodeTraffic) {
				logger.info("A rescheduling is required to lower inter-node traffic");
			}

			if (reschedulingDueToInterNodeTraffic || reschedulingDueToOverloading) {
				logger.info("Let‘s apply the best assignment!!");
				lastRescheduling = System.currentTimeMillis();
				applyAssignment(bestAssignment, topologies, cluster);
				DataManager.getInstance().removeTopologies(dbTopologies);
			}
		} catch (Throwable t) {
			logger.error("Error occurred during scheduling", t);
		}
	}

	/**
	 * Tells whether {@code bestAssignment} lowers the load of at least one currently
	 * overloaded node. Nodes are matched with {@code equals}, and every overloaded node is
	 * logged.
	 *
	 * @param overloadedNodeList
	 *            the currently overloaded nodes, carrying their current load
	 * @param bestAssignment
	 *            the candidate assignment; its keys carry the simulated load
	 * @return {@code true} if some overloaded node would carry a lower load
	 */
	private boolean offloadsOverloadedNodes(List<Node> overloadedNodeList,
			Map<Node, List<Slot>> bestAssignment) {
		if (overloadedNodeList.isEmpty()) {
			return false;
		}
		logger.info("Check how the new assignment can offload some of the currently overloaded nodes");
		boolean offloads = false;
		for (Node node : overloadedNodeList) {
			Node nodeAfterTheAssignment = null;
			for (Node candidate : bestAssignment.keySet()) {
				if (candidate.equals(node)) {
					nodeAfterTheAssignment = candidate;
					break;
				}
			}
			if (nodeAfterTheAssignment == null) {
				logger.warn("Node " + node.getName() + " currenlty has a load of "
						+ node.getLoad()
						+ " Hz/s, but it doesn‘t appear in the new assignment");
				continue;
			}
			logger.info("Node " + node.getName() + " currenlty has a load of "
					+ node.getLoad() + " Hz/s, and the assignment can lead to "
					+ nodeAfterTheAssignment.getLoad());
			if (node.getLoad() > nodeAfterTheAssignment.getLoad()) {
				offloads = true;
			}
		}
		return offloads;
	}

	/**
	 * Frees every used worker slot of the cluster, then assigns executors following
	 * {@code bestAssignment}.
	 * <p>
	 * Each node is mapped to the first supervisor on its host, and its planned slots are
	 * mapped, in order, to that supervisor's available worker slots. Some slots are left
	 * unassigned here, with a warning:
	 * <ul>
	 * <li>all slots of a node whose host has no supervisor;</li>
	 * <li>the slots of a node beyond its supervisor's available worker slots.</li>
	 * </ul>
	 * {@link #schedule} runs the {@link EvenScheduler} afterwards.
	 */
	private void applyAssignment(Map<Node, List<Slot>> bestAssignment, Topologies topologies,
			Cluster cluster) {
		// Free every used worker slot so the whole assignment can be rebuilt.
		for (SupervisorDetails supervisor : cluster.getSupervisors().values()) {
			List<Integer> usedPorts = cluster.getUsedPorts(supervisor);
			for (int usedPort : usedPorts) {
				cluster.freeSlot(new WorkerSlot(supervisor.getId(), usedPort));
			}
		}

		for (Map.Entry<Node, List<Slot>> entry : bestAssignment.entrySet()) {
			Node node = entry.getKey();
			List<SupervisorDetails> supervisors = cluster.getSupervisorsByHost(node.getName());
			if (supervisors == null || supervisors.isEmpty()) {
				logger.warn("No supervisor found for node " + node.getName()
						+ ", its slots are not assigned");
				continue;
			}
			SupervisorDetails supervisor = supervisors.get(0);
			List<WorkerSlot> availableSlots = cluster.getAvailableSlots(supervisor);
			int slotIndex = 0;
			for (Slot slot : entry.getValue()) {
				if (slotIndex >= availableSlots.size()) {
					logger.warn("Node " + node.getName() + " has only " + availableSlots.size()
							+ " available worker slots, the remaining slots are not assigned");
					break;
				}
				logger.info("Assigning executors of slot " + slot);
				String topology = slot.getTopology().getTopologyID();

				// Match the scheduler's executors to Storm's ExecutorDetails.
				Collection<ExecutorDetails> allTopologyExecutors = topologies.getById(topology)
						.getExecutors();
				List<ExecutorDetails> executorList = new ArrayList<ExecutorDetails>();
				for (Executor executor : slot.getExecutors()) {
					for (ExecutorDetails executorDetails : allTopologyExecutors) {
						if (executor.match(executorDetails)) {
							executorList.add(executorDetails);
							break;
						}
					}
				}

				WorkerSlot workerSlot = availableSlots.get(slotIndex);
				slotIndex++;
				cluster.assign(workerSlot, topology, executorList);
				logger.info("We assigned executors:"
						+ Utils.collectionToString(executorList) + " to slot: ["
						+ workerSlot.getNodeId() + ", "
						+ workerSlot.getPort() + "]");
			}
		}
	}

	/**
	 * Computes the best assignment and stores it in {@code TrafficManager}. The caller
	 * reads it back through {@code TrafficManager.getAssignments()}.
	 * <ol>
	 * <li>First phase: for every topology in {@code dbTopologies} that has inter-executor
	 * traffic stats, its executor pairs are placed into the topology's slots to minimize
	 * inter-slot traffic. Empty slots then receive an executor moved from a slot holding
	 * more than one.</li>
	 * <li>Second phase: the slot pairs from the inter-slot traffic stats are placed onto
	 * nodes to minimize inter-node traffic. Each topology is then spread, when possible,
	 * until it uses {@code Topology.getNumberOfNodesToUse(nodeCount)} nodes.</li>
	 * </ol>
	 *
	 * @param dbTopologies
	 *            IDs of the topologies to schedule
	 * @param stormTopologies
	 *            the topologies reported by Storm
	 * @param cluster
	 *            the cluster
	 * @throws Exception
	 *             if some executor or slot cannot be placed
	 */
	private void computeBestScheduling(List<String> dbTopologies, Topologies stormTopologies,
			Cluster cluster) throws Exception {

		logger.info("-- First phase --");
		List<Topology> topologyList = new ArrayList<Topology>();
		for (String topologyID : dbTopologies) {
			logger.info("Topology ID: " + topologyID);
			TopologyDetails topologyDetails = stormTopologies.getById(topologyID);
			Topology topology = new Topology(topologyDetails);
			topology.setTotalLoad(DataManager.getInstance().getTotalLoad(topologyID));
			topologyList.add(topology);

			List<ExecutorPair> interExecutorTrafficList = TrafficManager.getInstance()
					.getInterExecutorTrafficList(topologyID);
			logger.info("Inter-executor traffic stats: "
					+ Utils.collectionToString(interExecutorTrafficList));
			if (interExecutorTrafficList.isEmpty()) {
				logger.info("Traffic stats are not complete yet, skip this topology");
			} else {
				for (ExecutorPair executorPair : interExecutorTrafficList) {
					assignExecutorPair(topology, topologyID, executorPair);
				}
				logger.info("Current assignment: " + Utils.collectionToString(topology.getSlots()));
				fillEmptySlots(topology, topologyID);
				logger.info("Next assignment: " + Utils.collectionToString(topology.getSlots()));

				TrafficManager.getInstance().compileInterSlotTraffic();
			}

			logger.info("Assignments of executors for topology " + topologyID + " completed");
		}
		logger.info("First phase completed!");
		List<SlotPair> interSlotTrafficList = TrafficManager.getInstance()
				.getInterSlotTrafficList();
		logger.info("Inter-slot traffic stats: " + Utils.collectionToString(interSlotTrafficList));

		logger.info("-- Second phase --");
		NodeManager nodeManager = new NodeManager(topologyList, cluster);
		if (nodeManager.getNodeCount() == 0) {
			logger.info("No nodes have been configured yet, cannot determine any scheduling");
		} else {
			for (SlotPair slotPair : interSlotTrafficList) {
				assignSlotPair(nodeManager, slotPair);
			}

			if (TrafficManager.getInstance().getAssignments() != null) {
				logger.info("Intermediate assignment: "
						+ Utils.collectionToString(TrafficManager.getInstance().getAssignments()
								.keySet()));
			}

			// Ensure the slots of each topology are spread over the proper number of nodes;
			// otherwise the chances of parallelization/pipelining are not rightly exploited.
			logger.info("Check whether all the topologies are using the desired number of nodes");
			for (Topology topology : topologyList) {
				spreadTopologyOverNodes(topology, nodeManager);
			}
		}

		logger.info("Second phase completed!");
		if (TrafficManager.getInstance().getAssignments() != null) {
			logger.info("Final assignment: "
					+ Utils.collectionToString(TrafficManager.getInstance().getAssignments()
							.keySet()));
		}
	}

	/**
	 * First phase, single step: places the two executors of {@code executorPair} into slots
	 * of {@code topology}.
	 * <ul>
	 * <li>Neither executor placed yet: both go to the least loaded slot able to get them
	 * both. Otherwise each goes to the least loaded slot able to get it.</li>
	 * <li>At least one already placed: both are removed from the slots containing them.
	 * Every combination of those slots, plus the least loaded slot, is then tried, and the
	 * one with the lowest inter-slot traffic is kept.</li>
	 * </ul>
	 *
	 * @throws RuntimeException
	 *             if no slot can get an executor of a not-yet-placed pair
	 * @throws Exception
	 *             if no valid combination exists for an already placed pair
	 */
	private void assignExecutorPair(Topology topology, String topologyID,
			ExecutorPair executorPair) throws Exception {
		logger.debug("Executor pair: " + executorPair);
		List<Slot> slotList = topology.getContainingSlotList(executorPair.getSource(),
				executorPair.getDestination());
		logger.debug("Slots that already contain either executors: "
				+ Utils.collectionToString(slotList));
		if (slotList.isEmpty()) {
			logger.debug("Both executors have not been assigned yet, try to add them to the least loaded slot");
			Slot leastLoadedSlot = topology.getLeastLoadedSlot(executorPair.getSource(),
					executorPair.getDestination());
			if (leastLoadedSlot != null) {
				logger.debug("Least loaded slot able to get both the executors: "
						+ leastLoadedSlot);
				leastLoadedSlot.assign(executorPair.getSource());
				leastLoadedSlot.assign(executorPair.getDestination());
				logger.info("Executors " + executorPair.getSource() + " and "
						+ executorPair.getDestination() + " assigned to slot " + leastLoadedSlot);
			} else {
				logger.debug("No slot exists that can get both the executors, assign them to distinct slots");
				leastLoadedSlot = topology.getLeastLoadedSlot(executorPair.getSource());
				if (leastLoadedSlot == null) {
					throw new RuntimeException("Cannot find a slot able to get executor "
							+ executorPair.getSource() + " for topology " + topology);
				}
				logger.debug("Least loaded slot for source executor: " + leastLoadedSlot);
				leastLoadedSlot.assign(executorPair.getSource());
				logger.info("Executor " + executorPair.getSource() + " assigned to slot "
						+ leastLoadedSlot);
				// The source's slot may still have room for the destination.
				if (leastLoadedSlot.canAccept(executorPair.getDestination())) {
					logger.debug("After having added executor " + executorPair.getSource()
							+ ", the slot " + leastLoadedSlot + " can also get the executor "
							+ executorPair.getDestination());
					leastLoadedSlot.assign(executorPair.getDestination());
				} else {
					logger.debug("After having added executor " + executorPair.getSource()
							+ ", the slot " + leastLoadedSlot + " cannot get the executor "
							+ executorPair.getDestination());
					leastLoadedSlot = topology.getLeastLoadedSlot(executorPair.getDestination());
					if (leastLoadedSlot == null) {
						throw new RuntimeException("Cannot find a slot able to get executor "
								+ executorPair.getDestination() + " for topology " + topology);
					}
					logger.debug("Least loaded slot for destination executor: " + leastLoadedSlot);
					leastLoadedSlot.assign(executorPair.getDestination());
				}
				logger.info("Executor " + executorPair.getDestination() + " assigned to slot "
						+ leastLoadedSlot);
			}
		} else {
			logger.debug("Some executor has been already assigned, compute the best assignment using the slot(s) found before and the least loaded one");
			Slot leastLoadedSlot = topology.getLeastLoadedSlot(executorPair);
			logger.debug("Least loaded slot: " + leastLoadedSlot);
			if (leastLoadedSlot != null && !slotList.contains(leastLoadedSlot)) {
				slotList.add(leastLoadedSlot);
			}
			logger.debug("Slots to use: " + Utils.collectionToString(slotList));

			logger.debug("Remove source and destination from the slots they are currently assigned to");
			for (Slot slot : slotList) {
				if (slot.contains(executorPair.getSource())) {
					slot.remove(executorPair.getSource());
				}
				if (slot.contains(executorPair.getDestination())) {
					slot.remove(executorPair.getDestination());
				}
			}
			logger.debug("Slots to use after such removals: " + Utils.collectionToString(slotList));

			// Try every (source slot, destination slot) combination; each trial is undone
			// before the next one.
			logger.debug("Check every possible combination");
			Slot bestSlotForSource = null;
			Slot bestSlotForDestination = null;
			int minInterSlotTraffic = -1;
			for (Slot slotForSource : slotList) {
				for (Slot slotForDestination : slotList) {
					logger.debug("Assigning executor " + executorPair.getSource() + " to slot "
							+ slotForSource + " and executor " + executorPair.getDestination()
							+ " to slot " + slotForDestination + "...");
					boolean assignmentOk = true;
					if (slotForSource.canAccept(executorPair.getSource())) {
						slotForSource.assign(executorPair.getSource());
					} else {
						logger.debug("Slot " + slotForSource
								+ " is imbalanced, cannot be used to add more executors");
						assignmentOk = false;
					}

					if (slotForDestination.canAccept(executorPair.getDestination())) {
						slotForDestination.assign(executorPair.getDestination());
					} else {
						logger.debug("Slot " + slotForDestination
								+ " is imbalanced, cannot be used to add more executors");
						assignmentOk = false;
					}

					if (assignmentOk) {
						int interSlotTraffic = TrafficManager.getInstance()
								.computeInterSlotTraffic(topologyID);
						logger.debug("...the inter-slot traffic is " + interSlotTraffic
								+ " tuple/s");
						if (minInterSlotTraffic == -1 || interSlotTraffic < minInterSlotTraffic) {
							bestSlotForSource = slotForSource;
							bestSlotForDestination = slotForDestination;
							minInterSlotTraffic = interSlotTraffic;
						}
					}
					if (slotForSource.contains(executorPair.getSource())) {
						slotForSource.remove(executorPair.getSource());
					}
					if (slotForDestination.contains(executorPair.getDestination())) {
						slotForDestination.remove(executorPair.getDestination());
					}
				}
			}
			if (bestSlotForSource == null || bestSlotForDestination == null) {
				throw new Exception("Cannot find a possible assignment of executors "
						+ executorPair.getSource() + " and " + executorPair.getDestination()
						+ " to slots " + Utils.collectionToString(slotList));
			}
			logger.debug("The best assignment is executor " + executorPair.getSource()
					+ " to slot " + bestSlotForSource + " and executor "
					+ executorPair.getDestination() + " to slot " + bestSlotForDestination
					+ ", with inter-slot traffic " + minInterSlotTraffic + " tuple/s");
			bestSlotForSource.assign(executorPair.getSource());
			bestSlotForDestination.assign(executorPair.getDestination());
			logger.info("Executor " + executorPair.getSource() + " assigned to slot "
					+ bestSlotForSource);
			logger.info("Executor " + executorPair.getDestination() + " assigned to slot "
					+ bestSlotForDestination);
		}

		logger.debug("Assignment of executors " + executorPair + " completed");
	}

	/**
	 * First phase, final step: gives each empty slot of {@code topology} one executor moved
	 * from a used slot that holds more than one executor. Among all such moves, the one
	 * with the lowest inter-slot traffic is chosen; each trial move is undone before the
	 * next one.
	 *
	 * @throws Exception
	 *             propagated from the traffic computation, if any
	 */
	private void fillEmptySlots(Topology topology, String topologyID) throws Exception {
		logger.info("Check for empty slots");
		List<Slot> emptySlotList = topology.getEmptySlots();
		if (emptySlotList.isEmpty()) {
			logger.info("No empty slots, the assignment is succesfully completed");
			return;
		}
		logger.info("Empty slots: " + Utils.collectionToString(emptySlotList));
		List<Slot> usedSlotList = topology.getUsedSlots();
		for (Slot emptySlot : emptySlotList) {
			logger.debug("Find an executor to assign to slot " + emptySlot);
			Executor bestExecutor = null;
			Slot bestSlot = null;
			int bestInterSlotTraffic = -1;
			for (Slot usedSlot : usedSlotList) {
				// A slot with a single executor must keep it, or it would become empty.
				if (usedSlot.getExecutors().size() <= 1) {
					continue;
				}
				logger.debug("Check the executors of slot " + usedSlot);
				List<Executor> executorList = new ArrayList<Executor>(usedSlot.getExecutors());
				for (Executor executor : executorList) {
					usedSlot.remove(executor);
					emptySlot.assign(executor);
					int interSlotTraffic = TrafficManager.getInstance()
							.computeInterSlotTraffic(topologyID);
					logger.debug("Moving executor " + executor + ", the inter-slot traffic is "
							+ interSlotTraffic + " tuple/s");
					if (bestInterSlotTraffic == -1 || interSlotTraffic < bestInterSlotTraffic) {
						bestExecutor = executor;
						bestSlot = usedSlot;
						bestInterSlotTraffic = interSlotTraffic;
					}
					emptySlot.remove(executor);
					usedSlot.assign(executor);
				}
			}
			if (bestSlot != null) {
				logger.debug("The best assignment is moving executor " + bestExecutor
						+ " from slot " + bestSlot + " to slot " + emptySlot
						+ " with an inter-slot traffic of " + bestInterSlotTraffic + " tuple/s");
				bestSlot.remove(bestExecutor);
				emptySlot.assign(bestExecutor);
				logger.info("Executor " + bestExecutor + " moved from slot " + bestSlot
						+ " to slot " + emptySlot);
			} else {
				logger.warn("Cannot find an executor to move to slot " + emptySlot);
			}
		}
	}

	/**
	 * Second phase, single step: places the two slots of {@code slotPair} on nodes.
	 * <ul>
	 * <li>Neither slot on a node yet: both go to the least loaded node able to get them
	 * both. Otherwise each goes to the least loaded node able to sustain it.</li>
	 * <li>At least one already placed: both are removed from the nodes containing them.
	 * Every combination of those nodes, plus the least loaded node, is then tried, and the
	 * one with the lowest inter-node traffic is kept.</li>
	 * </ul>
	 *
	 * @throws RuntimeException
	 *             if no node can sustain a slot of a not-yet-placed pair
	 * @throws Exception
	 *             if no valid combination exists for an already placed pair
	 */
	private void assignSlotPair(NodeManager nodeManager, SlotPair slotPair) throws Exception {
		logger.info("Slot pair: " + slotPair);
		List<Node> nodeList = TrafficManager.getInstance().getContainingNodeList(
				slotPair.getFirst(), slotPair.getSecond());
		if (nodeList.isEmpty()) {
			logger.debug("Both slots have not been assigned yet, try to add them to the least loaded node");
			Node leastLoadedNode = nodeManager.getLeastLoadedNode(slotPair.getFirst(),
					slotPair.getSecond());
			if (leastLoadedNode != null) {
				logger.debug("Least loaded node able to get both the slots: " + leastLoadedNode);
				leastLoadedNode.assign(slotPair.getFirst());
				leastLoadedNode.assign(slotPair.getSecond());
				String topologyId = slotPair.getFirst().getTopology().getTopologyID();
				logger.info("Slots " + slotPair.getFirst() + " and " + slotPair.getSecond()
						+ " assigned to node " + leastLoadedNode + " (Slots of topology "
						+ topologyId + " in this node: "
						+ leastLoadedNode.getTopologySlotCount(topologyId));
			} else {
				logger.debug("No node exists that can get both the slots, assign them to distinct nodes");
				assignToLeastLoadedNode(nodeManager, slotPair.getFirst());
				assignToLeastLoadedNode(nodeManager, slotPair.getSecond());
			}
		} else {
			logger.debug("Either slot has been already assigned to nodes "
					+ Utils.collectionToString(nodeList));
			Node leastLoadedNode = nodeManager.getLeastLoadedNode(slotPair);
			logger.debug("The least loaded node able to sustain the traffic of the slot with the lowest traffic is "
					+ leastLoadedNode);
			if (leastLoadedNode != null && !nodeList.contains(leastLoadedNode)) {
				nodeList.add(leastLoadedNode);
			}
			logger.debug("Nodes to use: " + Utils.collectionToString(nodeList));
			logger.debug("Remove slots " + slotPair.getFirst() + " and " + slotPair.getSecond()
					+ " from these nodes");
			for (Node node : nodeList) {
				if (node.contains(slotPair.getFirst())) {
					node.remove(slotPair.getFirst());
				}
				if (node.contains(slotPair.getSecond())) {
					node.remove(slotPair.getSecond());
				}
			}
			logger.debug("Nodes after such removals: " + Utils.collectionToString(nodeList));

			// Try every (node for first, node for second) combination; each trial is undone
			// before the next one.
			logger.debug("Check every possible combination");
			Node bestNodeForFirst = null;
			Node bestNodeForSecond = null;
			int minInterNodeTraffic = -1;
			for (Node nodeForFirst : nodeList) {
				for (Node nodeForSecond : nodeList) {
					logger.debug("Assigning slot " + slotPair.getFirst() + " to node "
							+ nodeForFirst + " and slot " + slotPair.getSecond() + " to node "
							+ nodeForSecond + "...");
					boolean assignmentOK = true;

					if (nodeForFirst.canAssign(slotPair.getFirst())) {
						nodeForFirst.assign(slotPair.getFirst());
					} else {
						logger.debug("Cannot assign slot " + slotPair.getFirst() + " to noe "
								+ nodeForFirst);
						assignmentOK = false;
					}

					if (nodeForSecond.canAssign(slotPair.getSecond())) {
						nodeForSecond.assign(slotPair.getSecond());
					} else {
						logger.debug("Cannot assign slot " + slotPair.getSecond() + " to node "
								+ nodeForSecond);
						assignmentOK = false;
					}

					if (assignmentOK) {
						int tmpInterNodeTraffic = TrafficManager.getInstance()
								.computeInterNodeTraffic();
						logger.debug("...the inter-node traffic is " + tmpInterNodeTraffic
								+ " tuple/s");
						if (minInterNodeTraffic == -1 || tmpInterNodeTraffic < minInterNodeTraffic) {
							bestNodeForFirst = nodeForFirst;
							bestNodeForSecond = nodeForSecond;
							minInterNodeTraffic = tmpInterNodeTraffic;
						}
					}

					if (nodeForFirst.contains(slotPair.getFirst())) {
						nodeForFirst.remove(slotPair.getFirst());
					}
					if (nodeForSecond.contains(slotPair.getSecond())) {
						nodeForSecond.remove(slotPair.getSecond());
					}
				}
			}

			// Mirror the first phase: fail with a descriptive error instead of an NPE below.
			if (bestNodeForFirst == null || bestNodeForSecond == null) {
				throw new Exception("Cannot find a possible assignment of slots "
						+ slotPair.getFirst() + " and " + slotPair.getSecond() + " to nodes "
						+ Utils.collectionToString(nodeList));
			}
			logger.debug("The best assignment is slot " + slotPair.getFirst() + " to node "
					+ bestNodeForFirst + " and slot " + slotPair.getSecond() + " to node "
					+ bestNodeForSecond + ", with inter-node traffic " + minInterNodeTraffic
					+ " tuple/s");
			bestNodeForFirst.assign(slotPair.getFirst());
			bestNodeForSecond.assign(slotPair.getSecond());
			logSlotAssignment(slotPair.getFirst(), bestNodeForFirst);
			logSlotAssignment(slotPair.getSecond(), bestNodeForSecond);
		}

		logger.info("Assignment of slots " + slotPair + " completed");
	}

	/**
	 * Assigns {@code slot} to the least loaded node able to sustain its load.
	 *
	 * @throws RuntimeException
	 *             if no such node exists
	 */
	private void assignToLeastLoadedNode(NodeManager nodeManager, Slot slot) throws Exception {
		Node leastLoadedNode = nodeManager.getLeastLoadedNode(slot);
		if (leastLoadedNode == null) {
			throw new RuntimeException("Cannot find a node able to sustain the load of the slot "
					+ slot);
		}
		logger.debug("Assign slot " + slot + " to node " + leastLoadedNode);
		leastLoadedNode.assign(slot);
		logger.debug("Least loaded node after such assignment: " + leastLoadedNode);
		logSlotAssignment(slot, leastLoadedNode);
	}

	/** Logs that {@code slot} was assigned to {@code node}, with that topology's slot count there. */
	private void logSlotAssignment(Slot slot, Node node) {
		String topologyId = slot.getTopology().getTopologyID();
		logger.info("Slot " + slot + " assigned to node " + node + " (Slots of topology "
				+ topologyId + " in this node: " + node.getTopologySlotCount(topologyId));
	}

	/**
	 * Second phase, final step: spreads {@code topology} over more nodes.
	 * <p>
	 * While the topology uses fewer nodes than
	 * {@code topology.getNumberOfNodesToUse(nodeCount)}, one of its slots is moved from a
	 * node hosting several of its slots to a node it does not use yet. The move with the
	 * lowest inter-node traffic is chosen. Stops as soon as no such move exists.
	 *
	 * @throws Exception
	 *             propagated from the traffic computation, if any
	 */
	private void spreadTopologyOverNodes(Topology topology, NodeManager nodeManager)
			throws Exception {
		int numberOfNodesToUse = topology.getNumberOfNodesToUse(nodeManager.getNodeCount());
		List<Node> usedNodeList = null;
		while (TrafficManager.getInstance().getNodeList(topology) != null
				&& (usedNodeList = new ArrayList<Node>(TrafficManager.getInstance()
						.getNodeList(topology))).size() < numberOfNodesToUse) {
			logger.info("Topology " + topology + " is using " + usedNodeList.size()
					+ " nodes, while it should use " + numberOfNodesToUse);
			Node bestUsedNode = null;
			Node bestUnusedNode = null;
			Slot bestSlot = null;
			int bestTraffic = -1;
			for (Node usedNode : usedNodeList) {
				List<Slot> nodeSlotList = new ArrayList<Slot>(usedNode.getSlotList());
				int topologySlotCount = 0;
				for (Slot slot : nodeSlotList) {
					if (slot.getTopology() == topology) {
						topologySlotCount++;
					}
				}
				// Only a node hosting several slots of this topology can give one away
				// without the topology leaving that node.
				if (topologySlotCount <= 1) {
					continue;
				}
				for (Slot slot : nodeSlotList) {
					// Moving another topology's slot would not change this topology's node
					// count and would needlessly disturb the other topology's placement.
					if (slot.getTopology() != topology) {
						continue;
					}
					Node unusedNode = nodeManager.getUnusedNode(usedNodeList, slot);
					if (unusedNode == null) {
						continue;
					}
					usedNode.remove(slot);
					unusedNode.assign(slot);
					int traffic = TrafficManager.getInstance().computeInterNodeTraffic();
					logger.info("Moving slot " + slot + " from node " + usedNode + " to node "
							+ unusedNode + ", the traffic becomes " + traffic + " tuple/s");
					if (bestUsedNode == null || traffic < bestTraffic) {
						bestUsedNode = usedNode;
						bestUnusedNode = unusedNode;
						bestSlot = slot;
						bestTraffic = traffic;
					}
					// Undo the trial move.
					unusedNode.remove(slot);
					usedNode.assign(slot);
				}
			}

			if (bestUnusedNode == null) {
				logger.info("Cannot find a way to make topology " + topology
						+ " use the desired number of nodes");
				break;
			}
			logger.info("The best is moving slot " + bestSlot + " from node " + bestUsedNode
					+ " to node " + bestUnusedNode + ", with a traffic of " + bestTraffic
					+ " tuple/s");
			bestUsedNode.remove(bestSlot);
			bestUnusedNode.assign(bestSlot);
		}
	}

}
时间: 2024-10-13 16:04:08

Storm2的相关文章

安装配置storm 第二步 配置Storm集群(二)

1.编辑hosts文件 vi /etc/hosts 三台主机同时修改 192.168.2.20 storm01 192.168.2.21 storm02 192.168.2.22 storm03 2.主节点上(storm)配置storm (1)解压 apache-storm-1.0.3 (2)设置环境变量 : vi ~/.bash_profile STORM_HOME=/usr/local/apps/apache-storm-1.0.3 export STORM_HOME PATH=$ST…

IT十八掌 徐培成 大数据 课堂笔记 storm第二天

storm ------------stream计算.实时性. master/slave nimbus //master supervisor //work node //zk spout //龙头,源源不断产生数据 nextTuple() bolt //接头. tuple //List<> storm.yaml-------------- zkserver: local.dir= slot.ports //槽位 - 6700 //worker process - 6701 executor //执…

Storm的配置安装

1.准备环境 1.1配置列表 配置项 版本信息 OS Red Hat   Enterprise Linux Server release 5.5 (Tikanga) IP 192.168.1.191/~.192/~.193 JDK java   version "1.7.0_25" zookeeper zookeeper-3.3.6 python Python 2.7.3   (default, Jun 29 2015, 16:45:20) storm apache-storm-0.9

流式大数据计算实践(2)----Hadoop集群和Zookeeper

一.前言 1.上一文搭建好了Hadoop单机模式,这一文继续搭建Hadoop集群 二.搭建Hadoop集群 1.根据上文的流程得到两台单机模式的机器,并保证两台单机模式正常启动,记得第二台机器core-site.xml内的fs.defaultFS参数值要改成本机的来启动,启动完毕后再改回来 2.清空数据,首先把运行单机模式后生成的数据全部清理掉 rm -rf /work/hadoop/nn/current rm -rf /work/hadoop/dn/current hdfs namenode