- void Master::_accept(
- ????const FrameworkID& frameworkId,
- ????const SlaveID& slaveId,
- ????const Resources& offeredResources,
- ????const scheduler::Call::Accept& accept,
- ????const Future<list<Future<bool>>>& _authorizations)
- {
- ??Framework* framework = getFramework(frameworkId);
- ……
-
?
- ??Slave* slave = slaves.registered.get(slaveId);
-
?
- ??if (slave == NULL || !slave->connected) {
- ????foreach (const Offer::Operation& operation, accept.operations()) {
- ??????if (operation.type() != Offer::Operation::LAUNCH) {
- ????????continue;
- ??????}
-
?
- ??????foreach (const TaskInfo& task, operation.launch().task_infos()) {
- ????????const TaskStatus::Reason reason =
- ????????????slave == NULL ? TaskStatus::REASON_SLAVE_REMOVED
- ??????????????????????????: TaskStatus::REASON_SLAVE_DISCONNECTED;
- ????????const StatusUpdate& update = protobuf::createStatusUpdate(
- ????????????framework->id(),
- ????????????task.slave_id(),
- ????????????task.task_id(),
- ????????????TASK_LOST,
- ????????????TaskStatus::SOURCE_MASTER,
- ????????????None(),
- ????????????slave == NULL ? "Slave removed" : "Slave disconnected",
- ????????????reason);
-
?
- ????????metrics->tasks_lost++;
-
?
- ????????metrics->incrementTasksStates(
- ????????????TASK_LOST,
- ????????????TaskStatus::SOURCE_MASTER,
- ????????????reason);
-
?
- ????????forward(update, UPID(), framework);
- ??????}
- ????}
-
?
- ????// Tell the allocator about the recovered resources.
- ????allocator->recoverResources(
- ????????frameworkId,
- ????????slaveId,
- ????????offeredResources,
- ????????None());
-
?
- ????return;
- ??}
-
?
- ??// Some offer operations update the offered resources. We keep
- ??// updated offered resources here. When a task is successfully
- ??// launched, we remove its resource from offered resources.
- ??Resources _offeredResources = offeredResources;
-
?
- ??// The order of `authorizations` must match the order of the operations in
- ??// `accept.operations()`, as they are iterated through simultaneously.
- ??CHECK_READY(_authorizations);
- ??list<Future<bool>> authorizations = _authorizations.get();
-
?
- ??foreach (const Offer::Operation& operation, accept.operations()) {
- ????switch (operation.type()) {
- ??????// The RESERVE operation allows a principal to reserve resources.
- ??????case Offer::Operation::RESERVE: {
- ????????Future<bool> authorization = authorizations.front();
- ????????authorizations.pop_front();
-
?
- ????????CHECK(!authorization.isDiscarded());
-
?
- ????????if (authorization.isFailed()) {
- ??????????// TODO(greggomann): We may want to retry this failed authorization
- ??????????// request rather than dropping it immediately.
- ??????????drop(framework,
- ???????????????operation,
- ???????????????"Authorization of principal ‘" + framework->info.principal() +
- ???????????????"‘ to reserve resources failed: " +
- ???????????????authorization.failure());
-
?
- ??????????continue;
- ????????} else
if (!authorization.get()) {
- ??????????drop(framework,
- ???????????????operation,
- ???????????????"Not authorized to reserve resources as ‘" +
- ?????????????????framework->info.principal() + "‘");
-
?
- ??????????continue;
- ????????}
-
?
- ????????Option<string> principal = framework->info.has_principal()
- ??????????? framework->info.principal()
- ??????????: Option<string>::none();
-
?
- ????????// Make sure this reserve operation is valid.
- ????????Option<Error> error = validation::operation::validate(
- ????????????operation.reserve(), principal);
-
?
- ????????if (error.isSome()) {
- ??????????drop(framework, operation, error.get().message);
- ??????????continue;
- ????????}
-
?
- ????????// Test the given operation on the included resources.
- ????????Try<Resources> resources = _offeredResources.apply(operation);
- ????????if (resources.isError()) {
- ??????????drop(framework, operation, resources.error());
- ??????????continue;
- ????????}
-
?
- ????????_offeredResources = resources.get();
-
?
- ????????LOG(INFO) << "Applying RESERVE operation for resources "
- ??????????????????<< operation.reserve().resources() << " from framework "
- ??????????????????<< *framework << " to slave " << *slave;
-
?
- ????????apply(framework, slave, operation);
- ????????break;
- ??????}
-
?
- ??????// The UNRESERVE operation allows a principal to unreserve resources.
- ??????case Offer::Operation::UNRESERVE: {
- ????????Future<bool> authorization = authorizations.front();
- ????????authorizations.pop_front();
-
?
- ????????CHECK(!authorization.isDiscarded());
-
?
- ????????if (authorization.isFailed()) {
- ??????????// TODO(greggomann): We may want to retry this failed authorization
- ??????????// request rather than dropping it immediately.
- ??????????drop(framework,
- ???????????????operation,
- ???????????????"Authorization of principal ‘" + framework->info.principal() +
- ???????????????"‘ to unreserve resources failed: " +
- ???????????????authorization.failure());
-
?
- ??????????continue;
- ????????} else
if (!authorization.get()) {
- ??????????drop(framework,
- ???????????????operation,
- ???????????????"Not authorized to unreserve resources as ‘" +
- ?????????????????framework->info.principal() + "‘");
-
?
- ??????????continue;
- ????????}
-
?
- ????????// Make sure this unreserve operation is valid.
- ????????Option<Error> error = validation::operation::validate(
- ????????????operation.unreserve());
-
?
- ????????if (error.isSome()) {
- ??????????drop(framework, operation, error.get().message);
- ??????????continue;
- ????????}
-
?
- ????????// Test the given operation on the included resources.
- ????????Try<Resources> resources = _offeredResources.apply(operation);
- ????????if (resources.isError()) {
- ??????????drop(framework, operation, resources.error());
- ??????????continue;
- ????????}
-
?
- ????????_offeredResources = resources.get();
-
?
- ????????LOG(INFO) << "Applying UNRESERVE operation for resources "
- ??????????????????<< operation.unreserve().resources() << " from framework "
- ??????????????????<< *framework << " to slave " << *slave;
-
?
- ????????apply(framework, slave, operation);
- ????????break;
- ??????}
-
?
- ??????case Offer::Operation::CREATE: {
- ????????Future<bool> authorization = authorizations.front();
- ????????authorizations.pop_front();
-
?
- ????????CHECK(!authorization.isDiscarded());
-
?
- ????????if (authorization.isFailed()) {
- ??????????// TODO(greggomann): We may want to retry this failed authorization
- ??????????// request rather than dropping it immediately.
- ??????????drop(framework,
- ???????????????operation,
- ???????????????"Authorization of principal ‘" + framework->info.principal() +
- ???????????????"‘ to create persistent volumes failed: " +
- ???????????????authorization.failure());
-
?
- ??????????continue;
- ????????} else
if (!authorization.get()) {
- ??????????drop(framework,
- ???????????????operation,
- ???????????????"Not authorized to create persistent volumes as ‘" +
- ?????????????????framework->info.principal() + "‘");
-
?
- ??????????continue;
- ????????}
-
?
- ????????// Make sure this create operation is valid.
- ????????Option<Error> error = validation::operation::validate(
- ????????????operation.create(), slave->checkpointedResources);
-
?
- ????????if (error.isSome()) {
- ??????????drop(framework, operation, error.get().message);
- ??????????continue;
- ????????}
-
?
- ????????Try<Resources> resources = _offeredResources.apply(operation);
- ????????if (resources.isError()) {
- ??????????drop(framework, operation, resources.error());
- ??????????continue;
- ????????}
-
?
- ????????_offeredResources = resources.get();
-
?
- ????????LOG(INFO) << "Applying CREATE operation for volumes "
- ??????????????????<< operation.create().volumes() << " from framework "
- ??????????????????<< *framework << " to slave " << *slave;
-
?
- ????????apply(framework, slave, operation);
- ????????break;
- ??????}
-
?
- ??????case Offer::Operation::DESTROY: {
- ????????Future<bool> authorization = authorizations.front();
- ????????authorizations.pop_front();
-
?
- ????????CHECK(!authorization.isDiscarded());
-
?
- ????????if (authorization.isFailed()) {
- ??????????// TODO(greggomann): We may want to retry this failed authorization
- ??????????// request rather than dropping it immediately.
- ??????????drop(framework,
- ???????????????operation,
- ???????????????"Authorization of principal ‘" + framework->info.principal() +
- ???????????????"‘ to destroy persistent volumes failed: " +
- ???????????????authorization.failure());
-
?
- ??????????continue;
- ????????} else
if (!authorization.get()) {
- ??????????drop(framework,
- ???????????????operation,
- ???????????????"Not authorized to destroy persistent volumes as ‘" +
- ?????????????????framework->info.principal() + "‘");
-
?
- ??????????continue;
- ????????}
-
?
- ????????// Make sure this destroy operation is valid.
- ????????Option<Error> error = validation::operation::validate(
- ????????????operation.destroy(), slave->checkpointedResources);
-
?
- ????????if (error.isSome()) {
- ??????????drop(framework, operation, error.get().message);
- ??????????continue;
- ????????}
-
?
- ????????Try<Resources> resources = _offeredResources.apply(operation);
- ????????if (resources.isError()) {
- ??????????drop(framework, operation, resources.error());
- ??????????continue;
- ????????}
-
?
- ????????_offeredResources = resources.get();
-
?
- ????????LOG(INFO) << "Applying DESTROY operation for volumes "
- ??????????????????<< operation.create().volumes() << " from framework "
- ??????????????????<< *framework << " to slave " << *slave;
-
?
- ????????apply(framework, slave, operation);
- ????????break;
- ??????}
-
?
- ??????case Offer::Operation::LAUNCH: {
- ????????foreach (const TaskInfo& task, operation.launch().task_infos()) {
- ??????????Future<bool> authorization = authorizations.front();
- ??????????authorizations.pop_front();
-
?
- ??????????// NOTE: The task will not be in ‘pendingTasks‘ if
- ??????????// ‘killTask()‘ for the task was called before we are here.
- ??????????// No need to launch the task if it‘s no longer pending.
- ??????????// However, we still need to check the authorization result
- ??????????// and do the validation so that we can send status update
- ??????????// in case the task has duplicated ID.
- ??????????bool pending = framework->pendingTasks.contains(task.task_id());
-
?
- ??????????// Remove from pending tasks.
- ??????????framework->pendingTasks.erase(task.task_id());
-
?
- ??????????CHECK(!authorization.isDiscarded());
-
?
- ??????????if (authorization.isFailed() || !authorization.get()) {
- ????????????string user = framework->info.user(); // Default user.
- ????????????if (task.has_command() && task.command().has_user()) {
- ??????????????user = task.command().user();
- ????????????} else
if (task.has_executor() &&
- ???????????????????????task.executor().command().has_user()) {
- ??????????????user = task.executor().command().user();
- ????????????}
-
?
- ????????????const StatusUpdate& update = protobuf::createStatusUpdate(
- ????????????????framework->id(),
- ????????????????task.slave_id(),
- ????????????????task.task_id(),
- ????????????????TASK_ERROR,
- ????????????????TaskStatus::SOURCE_MASTER,
- ????????????????None(),
- ????????????????authorization.isFailed() ?
- ????????????????????"Authorization failure: " + authorization.failure() :
- ????????????????????"Not authorized to launch as user ‘" + user + "‘",
- ????????????????TaskStatus::REASON_TASK_UNAUTHORIZED);
-
?
- ????????????metrics->tasks_error++;
-
?
- ????????????metrics->incrementTasksStates(
- ????????????????TASK_ERROR,
- ????????????????TaskStatus::SOURCE_MASTER,
- ????????????????TaskStatus::REASON_TASK_UNAUTHORIZED);
-
?
- ????????????forward(update, UPID(), framework);
-
?
- ????????????continue;
- ??????????}
-
?
- ??????????// Validate the task.
-
?
- ??????????// Make a copy of the original task so that we can
- ??????????// fill the missing `framework_id` in ExecutorInfo
- ??????????// if needed. This field was added to the API later
- ??????????// and thus was made optional.
- ??????????TaskInfo task_(task);
- ??????????if (task.has_executor() && !task.executor().has_framework_id()) {
- ????????????task_.mutable_executor()
- ????????????????->mutable_framework_id()->CopyFrom(framework->id());
- ??????????}
-
?
- ??????????const Option<Error>& validationError = validation::task::validate(
- ??????????????task_,
- ??????????????framework,
- ??????????????slave,
- ??????????????_offeredResources);
-
?
- ??????????if (validationError.isSome()) {
- ????????????const StatusUpdate& update = protobuf::createStatusUpdate(
- ????????????????framework->id(),
- ????????????????task_.slave_id(),
- ????????????????task_.task_id(),
- ????????????????TASK_ERROR,
- ????????????????TaskStatus::SOURCE_MASTER,
- ????????????????None(),
- ????????????????validationError.get().message,
- ????????????????TaskStatus::REASON_TASK_INVALID);
-
?
- ????????????metrics->tasks_error++;
-
?
- ????????????metrics->incrementTasksStates(
- ????????????????TASK_ERROR,
- ????????????????TaskStatus::SOURCE_MASTER,
- ????????????????TaskStatus::REASON_TASK_INVALID);
-
?
- ????????????forward(update, UPID(), framework);
-
?
- ????????????continue;
- ??????????}
-
?
- ??????????// Add task.
- ??????????if (pending) {
- ????????????_offeredResources -= addTask(task_, framework, slave);
-
?
- ????????????// TODO(bmahler): Consider updating this log message to
- ????????????// indicate when the executor is also being launched.
- ????????????LOG(INFO) << "Launching task " << task_.task_id()
- ??????????????????????<< " of framework " << *framework
- ??????????????????????<< " with resources " << task_.resources()
- ??????????????????????<< " on slave " << *slave;
-
?
- ????????????RunTaskMessage message;
- ????????????message.mutable_framework()->MergeFrom(framework->info);
-
?
- ????????????// TODO(anand): We set ‘pid‘ to UPID() for http frameworks
- ????????????// as ‘pid‘ was made optional in 0.24.0. In 0.25.0, we
- ????????????// no longer have to set pid here for http frameworks.
- ????????????message.set_pid(framework->pid.getOrElse(UPID()));
- ????????????message.mutable_task()->MergeFrom(task_);
-
?
- ????????????if (HookManager::hooksAvailable()) {
- ??????????????// Set labels retrieved from label-decorator hooks.
- ??????????????message.mutable_task()->mutable_labels()->CopyFrom(
- ??????????????????HookManager::masterLaunchTaskLabelDecorator(
- ??????????????????????task_,
- ??????????????????????framework->info,
- ??????????????????????slave->info));
- ????????????}
-
?
- ????????????send(slave->pid, message);
- ??????????}
- ????????}
- ????????break;
- ??????}
-
?
- ??????default:
- ????????LOG(ERROR) << "Unsupported offer operation " << operation.type();
- ????????break;
- ????}
- ??}
-
?
- ??if (!_offeredResources.empty()) {
- ????// Tell the allocator about the unused (e.g., refused) resources.
- ????allocator->recoverResources(
- ????????frameworkId,
- ????????slaveId,
- ????????_offeredResources,
- ????????accept.filters());
- ??}
- }
|