The role of the Flink resource manager is shown in the figure:
FlinkResourceManager
/**
 * <h1>Worker allocation steps</h1>
 *
 * <ol>
 *     <li>The resource manager decides to request more workers. This can happen in order
 *         to fill the initial pool, or as a result of the JobManager requesting more workers.</li>
 *
 *     <li>The resource master calls {@link #requestNewWorkers(int)}, which triggers requests
 *         for more containers. After that, the {@link #getNumWorkerRequestsPending()}
 *         should reflect the pending requests.</li>
 *
 *     <li>The concrete framework may acquire containers and then trigger to start TaskManagers
 *         in those containers. That should be reflected in {@link #getNumWorkersPendingRegistration()}.</li>
 *
 *     <li>At some point, the TaskManager processes will have started and send a registration
 *         message to the JobManager. The JobManager will perform
 *         a lookup with the ResourceManager to check if it really started this TaskManager.
 *         The method {@link #workerStarted(ResourceID)} will be called
 *         to inform about a registered worker.</li>
 * </ol>
 */
public abstract class FlinkResourceManager<WorkerType extends ResourceIDRetrievable> extends FlinkUntypedActor {

    /** The service to find the right leader JobManager (to support high availability) */
    private final LeaderRetrievalService leaderRetriever; // used to discover the leader JobManager and to be notified when the leader changes

    /** Map which contains the workers from which we know that they have been successfully started
     * in a container. This notification is sent by the JM when a TM tries to register at it. */
    private final Map<ResourceID, WorkerType> startedWorkers; // workers that started successfully; the JM sends this notification when a TM registers with it

    /** The JobManager that the framework master manages resources for */
    private ActorRef jobManager;

    /** Our JobManager's leader session */
    private UUID leaderSessionID;

    /** The size of the worker pool that the resource master strives to maintain */
    private int designatedPoolSize; // size of the resource pool
The Javadoc above describes the worker allocation process quite clearly.
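To make those four steps concrete, here is a small, self-contained sketch (not Flink code; all names are hypothetical) of the bookkeeping a concrete resource manager has to maintain: pending container requests, containers whose TaskManagers have not yet registered, and started workers.

import java.util.HashSet;
import java.util.Set;

// Toy model (not Flink code) of the worker-allocation bookkeeping described above.
public class WorkerPoolModel {

    private final int designatedPoolSize;    // target pool size
    private int pendingRequests;             // step 2: what getNumWorkerRequestsPending() reports
    private int pendingRegistration;         // step 3: what getNumWorkersPendingRegistration() reports
    private final Set<String> startedWorkers = new HashSet<>(); // step 4: workers confirmed via workerStarted()

    WorkerPoolModel(int designatedPoolSize) {
        this.designatedPoolSize = designatedPoolSize;
    }

    // steps 1 + 2: decide how many workers are missing and "request" them
    void checkPool() {
        int allAvailableAndPending = startedWorkers.size() + pendingRequests + pendingRegistration;
        int missing = designatedPoolSize - allAvailableAndPending;
        if (missing > 0) {
            pendingRequests += missing;      // stands in for requestNewWorkers(missing)
        }
    }

    // step 3: the framework acquired a container and is starting a TaskManager in it
    void containerAllocated() {
        pendingRequests--;
        pendingRegistration++;
    }

    // step 4: the TaskManager registered at the JobManager and the lookup succeeded
    void workerStarted(String resourceId) {
        pendingRegistration--;
        startedWorkers.add(resourceId);
    }
}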
As an actor, the ResourceManager mainly handles messages:
@Override
protected void handleMessage(Object message) {
    try {
        // --- messages about worker allocation and pool sizes

        if (message instanceof CheckAndAllocateContainers) {
            checkWorkersPool();
        }
        else if (message instanceof SetWorkerPoolSize) {
            SetWorkerPoolSize msg = (SetWorkerPoolSize) message;
            adjustDesignatedNumberOfWorkers(msg.numberOfWorkers());
        }
        else if (message instanceof RemoveResource) {
            RemoveResource msg = (RemoveResource) message;
            removeRegisteredResource(msg.resourceId());
        }

        // --- lookup of registered resources

        else if (message instanceof NotifyResourceStarted) {
            NotifyResourceStarted msg = (NotifyResourceStarted) message;
            handleResourceStarted(sender(), msg.getResourceID());
        }

        // --- messages about JobManager leader status and registration

        else if (message instanceof NewLeaderAvailable) {
            NewLeaderAvailable msg = (NewLeaderAvailable) message;
            newJobManagerLeaderAvailable(msg.leaderAddress(), msg.leaderSessionId());
        }
        else if (message instanceof TriggerRegistrationAtJobManager) {
            TriggerRegistrationAtJobManager msg = (TriggerRegistrationAtJobManager) message;
            triggerConnectingToJobManager(msg.jobManagerAddress());
        }
        else if (message instanceof RegisterResourceManagerSuccessful) {
            RegisterResourceManagerSuccessful msg = (RegisterResourceManagerSuccessful) message;
            jobManagerLeaderConnected(msg.jobManager(), msg.currentlyRegisteredTaskManagers());
        }
Of these, the key one is
checkWorkersPool
/**
 * This method causes the resource framework master to <b>synchronously</b> re-examine
 * the set of available and pending workers containers, and allocate containers
 * if needed.
 *
 * This method does not automatically release workers, because it is not visible to
 * this resource master which workers can be released. Instead, the JobManager must
 * explicitly release individual workers.
 */
private void checkWorkersPool() {
    int numWorkersPending = getNumWorkerRequestsPending();
    int numWorkersPendingRegistration = getNumWorkersPendingRegistration();

    // see how many workers we want, and whether we have enough
    int allAvailableAndPending = startedWorkers.size() +
        numWorkersPending + numWorkersPendingRegistration;

    int missing = designatedPoolSize - allAvailableAndPending;

    if (missing > 0) {
        requestNewWorkers(missing); // if the existing workers are not enough, request new ones
    }
}
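To make the arithmetic concrete: with designatedPoolSize = 10, four started workers, three pending container requests and one worker pending registration, missing = 10 - (4 + 3 + 1) = 2, so requestNewWorkers(2) is called.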
After the JobManager receives a TaskManager's registration message, it notifies the ResourceManager, which ends up in handleResourceStarted:
/**
 * Tells the ResourceManager that a TaskManager had been started in a container with the given
 * resource id.
 *
 * @param jobManager The sender (JobManager) of the message
 * @param resourceID The resource id of the started TaskManager
 */
private void handleResourceStarted(ActorRef jobManager, ResourceID resourceID) {
    if (resourceID != null) {
        // check if resourceID is already registered (TaskManager may send duplicate register messages)
        WorkerType oldWorker = startedWorkers.get(resourceID);
        if (oldWorker != null) { // check whether this worker is already known
            LOG.debug("Notification that TaskManager {} had been started was sent before.", resourceID);
        }
        else {
            WorkerType newWorker = workerStarted(resourceID); // obtain the worker
            if (newWorker != null) {
                startedWorkers.put(resourceID, newWorker); // register the new worker
                LOG.info("TaskManager {} has started.", resourceID);
            }
            else {
                LOG.info("TaskManager {} has not been started by this resource manager.", resourceID);
            }
        }
    }

    // Acknowledge the resource registration
    jobManager.tell(decorateMessage(Acknowledge.get()), self()); // tell the JobManager that the registration has been acknowledged
}
The job resource allocation process:
In submitJob, an ExecutionGraph is built, which eventually calls
executionGraph.scheduleForExecution(scheduler)
Next, in ExecutionGraph:
public void scheduleForExecution(SlotProvider slotProvider) throws JobException {

    // simply take the vertices without inputs.
    for (ExecutionJobVertex ejv : this.tasks.values()) {
        if (ejv.getJobVertex().isInputVertex()) {
            ejv.scheduleAll(slotProvider, allowQueuedScheduling);
        }
    }
Then, in ExecutionJobVertex:
public void scheduleAll(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {

    ExecutionVertex[] vertices = this.taskVertices;

    // kick off the tasks
    for (ExecutionVertex ev : vertices) {
        ev.scheduleForExecution(slotProvider, queued);
    }
}
Then, in ExecutionVertex:
public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {
    return this.currentExecution.scheduleForExecution(slotProvider, queued);
}
Finally, in Execution:
public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) throws NoResourceAvailableException {

    final SlotSharingGroup sharingGroup = vertex.getJobVertex().getSlotSharingGroup();
    final CoLocationConstraint locationConstraint = vertex.getLocationConstraint();

    if (transitionState(CREATED, SCHEDULED)) {

        ScheduledUnit toSchedule = locationConstraint == null ?
            new ScheduledUnit(this, sharingGroup) :
            new ScheduledUnit(this, sharingGroup, locationConstraint);

        // IMPORTANT: To prevent leaks of cluster resources, we need to make sure that slots are returned
        //     in all cases where the deployment failed. we use many try {} finally {} clauses to assure that
        final Future<SimpleSlot> slotAllocationFuture = slotProvider.allocateSlot(toSchedule, queued); // request the slot asynchronously

        // IMPORTANT: We have to use the synchronous handle operation (direct executor) here so
        // that we directly deploy the tasks if the slot allocation future is completed. This is
        // necessary for immediate deployment.
        final Future<Void> deploymentFuture = slotAllocationFuture.handle(new BiFunction<SimpleSlot, Throwable, Void>() {
            @Override
            public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
                if (simpleSlot != null) {
                    try {
                        deployToSlot(simpleSlot); // if a slot was allocated, deploy to it
                    }
                    catch (Throwable t) {
                        try {
                            simpleSlot.releaseSlot();
                        }
                        finally {
                            markFailed(t);
                        }
                    }
                }
                else {
                    markFailed(throwable);
                }
                return null;
            }
        });

        return true;
    }
This calls slotProvider.allocateSlot, and the slotProvider here is the Scheduler:
@Override
public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued)
        throws NoResourceAvailableException {

    final Object ret = scheduleTask(task, allowQueued);
    if (ret instanceof SimpleSlot) {
        return FlinkCompletableFuture.completed((SimpleSlot) ret); // a SimpleSlot means the allocation already succeeded, so the future is completed
    }
    else if (ret instanceof Future) {
        return (Future) ret; // a Future means there were not enough resources; the request is still pending asynchronously
    }
    else {
        throw new RuntimeException();
    }
}
scheduleTask
/**
 * Returns either a {@link SimpleSlot}, or a {@link Future}.
 */
private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException {

    final ExecutionVertex vertex = task.getTaskToExecute().getVertex();

    final Iterable<TaskManagerLocation> preferredLocations = vertex.getPreferredLocations();
    final boolean forceExternalLocation = vertex.isScheduleLocalOnly() &&
        preferredLocations != null && preferredLocations.iterator().hasNext();

    synchronized (globalLock) { // global lock

        SlotSharingGroup sharingUnit = task.getSlotSharingGroup();

        if (sharingUnit != null) { // the task uses slot sharing

            // 1) === If the task has a slot sharing group, schedule with shared slots ===

            final SlotSharingGroupAssignment assignment = sharingUnit.getTaskAssignment();
            final CoLocationConstraint constraint = task.getLocationConstraint();

            // get a slot from the group, if the group has one for us (and can fulfill the constraint)
            final SimpleSlot slotFromGroup;
            if (constraint == null) {
                slotFromGroup = assignment.getSlotForTask(vertex); // try to find a suitable slot among the existing shared slots
            }
            else {
                slotFromGroup = assignment.getSlotForTask(vertex, constraint);
            }

            SimpleSlot newSlot = null;
            SimpleSlot toUse = null;

            // the following needs to make sure any allocated slot is released in case of an error
            try {
                // check whether the slot from the group is already what we want.
                // any slot that is local, or where the assignment was unconstrained is good!
                if (slotFromGroup != null && slotFromGroup.getLocality() != Locality.NON_LOCAL) { // a suitable slot was found
                    updateLocalityCounters(slotFromGroup, vertex);
                    return slotFromGroup; // a suitable slot exists, return it
                }

                // the group did not have a local slot for us. see if we can get one (or a better one)
                newSlot = getNewSlotForSharingGroup(vertex, preferredLocations, assignment, constraint, forceExternalLocation); // try to allocate a new slot

                if (slotFromGroup == null || !slotFromGroup.isAlive() || newSlot.getLocality() == Locality.LOCAL) {
                    // if there is no slot from the group, or the new slot is local,
                    // then we use the new slot
                    if (slotFromGroup != null) {
                        slotFromGroup.releaseSlot();
                    }
                    toUse = newSlot;
                }
                else {
                    // both are available and usable. neither is local. in that case, we may
                    // as well use the slot from the sharing group, to minimize the number of
                    // instances that the job occupies
                    newSlot.releaseSlot();
                    toUse = slotFromGroup;
                }

                // if this is the first slot for the co-location constraint, we lock
                // the location, because we are going to use that slot
                if (constraint != null && !constraint.isAssigned()) {
                    constraint.lockLocation();
                }

                updateLocalityCounters(toUse, vertex);
            }
            // (the catch blocks that release the allocated slots on error are omitted in this excerpt)

            return toUse; // return the allocated slot
        }
        else { // without slot sharing the case is simpler

            // 2) === schedule without hints and sharing ===

            SimpleSlot slot = getFreeSlotForTask(vertex, preferredLocations, forceExternalLocation); // allocate a slot directly
            if (slot != null) {
                updateLocalityCounters(slot, vertex);
                return slot; // return the slot if allocation succeeded
            }
            else {
                // no resource available now, so queue the request
                if (queueIfNoResource) { // queueing is allowed
                    CompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
                    this.taskQueue.add(new QueuedTask(task, future)); // queue the task and return the future, representing the asynchronous request
                    return future;
                }
            }
        }
    }
}
Let's look directly at the non-shared-slot case,
which calls getFreeSlotForTask:
/**
 * Gets a suitable instance to schedule the vertex execution to.
 * <p>
 * NOTE: This method is not thread-safe, it needs to be synchronized by the caller.
 *
 * @param vertex The task to run.
 * @return The instance to run the vertex on, or {@code null}, if no instance is available.
 */
protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex,
                                        Iterable<TaskManagerLocation> requestedLocations,
                                        boolean localOnly) {
    // we need potentially to loop multiple times, because there may be false positives
    // in the set-with-available-instances
    while (true) {
        Pair<Instance, Locality> instanceLocalityPair = findInstance(requestedLocations, localOnly); // find an instance that has a slot available and satisfies the locality constraint

        if (instanceLocalityPair == null) {
            return null; // no suitable instance, allocation fails
        }

        Instance instanceToUse = instanceLocalityPair.getLeft();
        Locality locality = instanceLocalityPair.getRight();

        try {
            SimpleSlot slot = instanceToUse.allocateSimpleSlot(vertex.getJobId()); // allocate a slot from the instance

            // if the instance has further available slots, re-add it to the set of available resources.
            if (instanceToUse.hasResourcesAvailable()) { // if the instance still has resources, put it back into instancesWithAvailableResources for future allocations
                this.instancesWithAvailableResources.put(instanceToUse.getTaskManagerID(), instanceToUse);
            }

            if (slot != null) {
                slot.setLocality(locality);
                return slot; // return the slot on success
            }
        }
        catch (InstanceDiedException e) {
            // the instance died it has not yet been propagated to this scheduler
            // remove the instance from the set of available instances
            removeInstance(instanceToUse);
        }

        // if we failed to get a slot, fall through the loop
    }
}
findInstance
/**
 * Tries to find a requested instance. If no such instance is available it will return a non-
 * local instance. If no such instance exists (all slots occupied), then return null.
 *
 * <p><b>NOTE:</b> This method is not thread-safe, it needs to be synchronized by the caller.</p>
 *
 * @param requestedLocations The list of preferred instances. May be null or empty, which indicates that
 *                           no locality preference exists.
 * @param localOnly Flag to indicate whether only one of the exact local instances can be chosen.
 */
private Pair<Instance, Locality> findInstance(Iterable<TaskManagerLocation> requestedLocations, boolean localOnly) {

    // drain the queue of newly available instances
    while (this.newlyAvailableInstances.size() > 0) { // move newly available instances into instancesWithAvailableResources
        Instance queuedInstance = this.newlyAvailableInstances.poll();
        if (queuedInstance != null) {
            this.instancesWithAvailableResources.put(queuedInstance.getTaskManagerID(), queuedInstance);
        }
    }

    // if nothing is available at all, return null
    if (this.instancesWithAvailableResources.isEmpty()) { // no instance has available resources, so fail immediately
        return null;
    }

    Iterator<TaskManagerLocation> locations = requestedLocations == null ? null : requestedLocations.iterator();

    if (locations != null && locations.hasNext()) { // look for an instance following the locality preferences
        // we have a locality preference

        while (locations.hasNext()) {
            TaskManagerLocation location = locations.next();
            if (location != null) {
                Instance instance = instancesWithAvailableResources.remove(location.getResourceID());
                if (instance != null) {
                    return new ImmutablePair<Instance, Locality>(instance, Locality.LOCAL);
                }
            }
        }

        // no local instance available
        if (localOnly) {
            return null;
        }
        else {
            // take the first instance from the instances with resources
            Iterator<Instance> instances = instancesWithAvailableResources.values().iterator();
            Instance instanceToUse = instances.next();
            instances.remove();

            return new ImmutablePair<>(instanceToUse, Locality.NON_LOCAL);
        }
    }
    else {
        // no location preference, so use some instance
        Iterator<Instance> instances = instancesWithAvailableResources.values().iterator();
        Instance instanceToUse = instances.next();
        instances.remove();

        return new ImmutablePair<>(instanceToUse, Locality.UNCONSTRAINED);
    }
}
So, to continue: where does newlyAvailableInstances come from?
@Override
public void newInstanceAvailable(Instance instance) {

    // synchronize globally for instance changes
    synchronized (this.globalLock) {

        try {
            // make sure we get notifications about slots becoming available
            instance.setSlotAvailabilityListener(this); // register the Scheduler as the Instance's SlotAvailabilityListener

            // store the instance in the by-host-lookup
            String instanceHostName = instance.getTaskManagerLocation().getHostname();
            Set<Instance> instanceSet = allInstancesByHost.get(instanceHostName);
            if (instanceSet == null) {
                instanceSet = new HashSet<Instance>();
                allInstancesByHost.put(instanceHostName, instanceSet);
            }
            instanceSet.add(instance);

            // add it to the available resources and let potential waiters know
            this.instancesWithAvailableResources.put(instance.getTaskManagerID(), instance); // put it into instancesWithAvailableResources

            // add all slots as available
            for (int i = 0; i < instance.getNumberOfAvailableSlots(); i++) {
                newSlotAvailable(instance);
            }
        }
    }
}
@Override
public void newSlotAvailable(final Instance instance) {

    // WARNING: The asynchrony here is necessary, because we cannot guarantee the order
    // of lock acquisition (global scheduler, instance) and otherwise lead to potential deadlocks:
    //
    // -> The scheduler needs to grab them (1) global scheduler lock
    //                                     (2) slot/instance lock
    // -> The slot releasing grabs (1) slot/instance (for releasing) and
    //                             (2) scheduler (to check whether to take a new task item
    //
    // that leads with a high probability to deadlocks, when scheduling fast

    this.newlyAvailableInstances.add(instance); // add to newlyAvailableInstances

    Futures.future(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            handleNewSlot(); // asynchronously process the queued tasks: when a new slot becomes available, run the tasks waiting in the queue
            return null;
        }
    }, executionContext);
}
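To make the WARNING comment concrete, here is a minimal, self-contained sketch (not Flink code; the lock names are hypothetical) of the lock-ordering problem that the asynchronous hand-off avoids:

// Illustration only: two code paths acquiring the same two locks in opposite order.
public class LockOrderingDemo {

    private final Object schedulerLock = new Object(); // stands in for the scheduler's globalLock
    private final Object instanceLock  = new Object(); // stands in for the per-instance/slot lock

    // Path 1: scheduling a task -> scheduler lock first, then instance lock
    void scheduleTask() {
        synchronized (schedulerLock) {
            synchronized (instanceLock) {
                // allocate a slot from the instance
            }
        }
    }

    // Path 2: releasing a slot synchronously -> instance lock first, then scheduler lock
    void releaseSlotSynchronously() {
        synchronized (instanceLock) {
            synchronized (schedulerLock) {
                // check whether a queued task can take the freed slot
            }
        }
    }

    // If the two paths run concurrently, each thread can hold its first lock while waiting
    // for the other: a classic lock-ordering deadlock. newSlotAvailable sidesteps this by
    // only enqueueing the instance and deferring the scheduler work to an asynchronous future.
}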
Next, newInstanceAvailable is called from the InstanceManager:
private void notifyNewInstance(Instance instance) {
    synchronized (this.instanceListeners) {
        for (InstanceListener listener : this.instanceListeners) {
            try {
                listener.newInstanceAvailable(instance);
            }
            catch (Throwable t) {
                LOG.error("Notification of new instance availability failed.", t);
            }
        }
    }
}
notifyNewInstance is called from registerTaskManager,
and registerTaskManager is invoked in the JobManager when a TaskManager registers:
case msg @ RegisterTaskManager(
      resourceId, connectionInfo, hardwareInformation, numberOfSlots) =>

  val taskManager = sender()

  currentResourceManager match {
    case Some(rm) => // if a ResourceManager is present
      val future = (rm ? decorateMessage(new NotifyResourceStarted(msg.resourceId)))(timeout) // notify the ResourceManager that this resource has started successfully
  }

  // ResourceManager is told about the resource, now let's try to register TaskManager
  if (instanceManager.isRegistered(resourceId)) { // already registered
    val instanceID = instanceManager.getRegisteredInstance(resourceId).getId

    taskManager ! decorateMessage(
      AlreadyRegistered(
        instanceID,
        libraryCacheManager.getBlobServerPort))
  }
  else { // a new resource
    try {
      val actorGateway = new AkkaActorGateway(taskManager, leaderSessionID.orNull)
      val taskManagerGateway = new ActorTaskManagerGateway(actorGateway)

      val instanceID = instanceManager.registerTaskManager( // register this TaskManager with the InstanceManager
        taskManagerGateway,
        connectionInfo,
        hardwareInformation,
        numberOfSlots)

      taskManagerMap.put(taskManager, instanceID) // record this TaskManager in the JobManager

      taskManager ! decorateMessage(
        AcknowledgeRegistration(instanceID, libraryCacheManager.getBlobServerPort)) // tell the TaskManager that registration is complete

      // to be notified when the taskManager is no longer reachable
      context.watch(taskManager)
    }
  }
This version does not yet implement the architecture shown in the figure:
the TaskManager still registers with the JobManager, and the JobManager then notifies the ResourceManager.
At this point the ResourceManager only plays a bookkeeping role;
it has not been separated out from the JobManager.
The architecture is still the one shown below,