Copycat - command

client.submit(new PutCommand("foo", "Hello world!"));

ServerContext

connection.handler(CommandRequest.class, request -> state.command(request));

State.command

Starting from ReserveState, the command is forwarded to the leader; only the leader can process commands.

@Override
  public CompletableFuture<CommandResponse> command(CommandRequest request) {
    context.checkThread();
    logRequest(request);

    if (context.getLeader() == null) {
      return CompletableFuture.completedFuture(logResponse(CommandResponse.builder()
        .withStatus(Response.Status.ERROR)
        .withError(CopycatError.Type.NO_LEADER_ERROR)
        .build()));
    } else {
      return this.<CommandRequest, CommandResponse>forward(request)
        .exceptionally(error -> CommandResponse.builder()
          .withStatus(Response.Status.ERROR)
          .withError(CopycatError.Type.NO_LEADER_ERROR)
          .build())
        .thenApply(this::logResponse);
    }
  }

LeaderState.command

public CompletableFuture<CommandResponse> command(final CommandRequest request) {
    context.checkThread();
    logRequest(request);

    // Get the client's server session. If the session doesn't exist, return an unknown session error.
    ServerSessionContext session = context.getStateMachine().executor().context().sessions().getSession(request.session());
    if (session == null) { // if the session does not exist, the command cannot be handled
      return CompletableFuture.completedFuture(logResponse(CommandResponse.builder()
        .withStatus(Response.Status.ERROR)
        .withError(CopycatError.Type.UNKNOWN_SESSION_ERROR)
        .build()));
    }

    ComposableFuture<CommandResponse> future = new ComposableFuture<>();
    sequenceCommand(request, session, future);
    return future;
  }

sequenceCommand

/**
   * Sequences the given command to the log.
   */
  private void sequenceCommand(CommandRequest request, ServerSessionContext session, CompletableFuture<CommandResponse> future) {
    // If the command is LINEARIZABLE and the session's current sequence number is less than one prior to the request
    // sequence number, queue this request for handling later. We want to handle command requests in the order in which
    // they were sent by the client. Note that it's possible for the session sequence number to be greater than the request
    // sequence number. In that case, it's likely that the command was submitted more than once to the
    // cluster, and the command will be deduplicated once applied to the state machine.
    if (request.sequence() > session.nextRequestSequence()) { // requests in a session must run in sequence order, so a sequence number larger than expected means this request has to wait for the earlier ones
      // If the request sequence number is more than 1k requests above the last sequenced request, reject the request.
      // The client should resubmit a request that fails with a COMMAND_ERROR.
      if (request.sequence() - session.getRequestSequence() > MAX_REQUEST_QUEUE_SIZE) { // the request is too far ahead of the current sequence, by more than 100
        future.complete(CommandResponse.builder()
          .withStatus(Response.Status.ERROR)
          .withError(CopycatError.Type.COMMAND_ERROR) // reject the command
          .build());
      }
      // Register the request in the request queue if it's not too far ahead of the current sequence number.
      else {
        session.registerRequest(request.sequence(), () -> applyCommand(request, session, future)); // put it in the queue to wait
      }
    } else {
      applyCommand(request, session, future); // apply the command
    }
  }

If the command's request sequence is larger than expected, the request is registered with the session:

session.registerRequest

ServerSessionContext

  ServerSessionContext registerRequest(long sequence, Runnable runnable) {
    commands.put(sequence, runnable);
    return this;
  }

As shown, the sequence id and its corresponding function are registered in commands; here the function is applyCommand.

The question is when the functions in commands get triggered:

ServerSessionContext setRequestSequence(long request) {
    if (request > this.requestSequence) {
      this.requestSequence = request;

      // When the request sequence number is incremented, get the next queued request callback and call it.
      // This will allow the command request to be evaluated in sequence.
      Runnable command = this.commands.remove(nextRequestSequence());
      if (command != null) {
        command.run();
      }
    }
    return this;
  }

This happens in setRequestSequence.

When the sequence is set, it looks in commands for a next waiting request and, if there is one, runs it right away.
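The queue-and-release behaviour described above can be sketched in isolation. This is a minimal sketch, not Copycat's code: SequencedSession and submit are hypothetical names, and the callback stands in for applyCommand, which in the real code is what calls setRequestSequence at its end.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for ServerSessionContext, reduced to request sequencing.
final class SequencedSession {
  private final Map<Long, Runnable> commands = new HashMap<>();
  private long requestSequence; // last sequenced request, starts at 0

  long nextRequestSequence() {
    return requestSequence + 1;
  }

  // Runs the callback now if it is next in line (or a duplicate), otherwise parks it.
  void submit(long sequence, Runnable callback) {
    Runnable apply = () -> {
      callback.run();
      setRequestSequence(sequence); // mirrors the last line of applyCommand
    };
    if (sequence > nextRequestSequence()) {
      commands.put(sequence, apply);
    } else {
      apply.run();
    }
  }

  // Advancing the sequence releases the next parked callback, if any.
  void setRequestSequence(long sequence) {
    if (sequence > requestSequence) {
      requestSequence = sequence;
      Runnable next = commands.remove(nextRequestSequence());
      if (next != null) {
        next.run();
      }
    }
  }

  public static void main(String[] args) {
    SequencedSession session = new SequencedSession();
    List<Long> applied = new ArrayList<>();
    session.submit(3, () -> applied.add(3L)); // parked
    session.submit(2, () -> applied.add(2L)); // parked
    session.submit(1, () -> applied.add(1L)); // runs 1, then releases 2, then 3
    System.out.println(applied); // [1, 2, 3]
  }
}
```

Each released callback advances the sequence itself, so one in-order arrival drains the whole chain of parked requests.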

applyCommand

/**
   * Applies the given command to the log.
   */
  private void applyCommand(CommandRequest request, ServerSessionContext session, CompletableFuture<CommandResponse> future) {
    final Command command = request.command();

    final long term = context.getTerm();
    final long timestamp = System.currentTimeMillis();
    final long index;

    // Create a CommandEntry and append it to the log.
    try (CommandEntry entry = context.getLog().create(CommandEntry.class)) {
      entry.setTerm(term)
        .setSession(request.session())
        .setTimestamp(timestamp)
        .setSequence(request.sequence())
        .setCommand(command);
      index = context.getLog().append(entry); // write the CommandEntry to the log
      LOGGER.debug("{} - Appended {}", context.getCluster().member().address(), entry);
    }

    // Replicate the command to followers.
    appendCommand(index, future);

    // Set the last processed request for the session. This will cause sequential command callbacks to be executed.
    session.setRequestSequence(request.sequence()); // update the session's sequence; this also checks session.commands for a next queued request
  }

appendCommand

/**
   * Sends append requests for a command to followers.
   */
  private void appendCommand(long index, CompletableFuture<CommandResponse> future) {
    appender.appendEntries(index).whenComplete((commitIndex, commitError) -> { // replicate entries up to this index
      context.checkThread();
      if (isOpen()) {
        if (commitError == null) {
          applyCommand(index, future); // on success, apply the command
        } else {
          future.complete(logResponse(CommandResponse.builder()
            .withStatus(Response.Status.ERROR)
            .withError(CopycatError.Type.INTERNAL_ERROR)
            .build()));
        }
      }
    });
  }

applyCommand again (could the function name not have been different?)

  /**
   * Applies a command to the state machine.
   */
  private void applyCommand(long index, CompletableFuture<CommandResponse> future) {
    context.getStateMachine().<ServerStateMachine.Result>apply(index).whenComplete((result, error) -> {
      if (isOpen()) {
        completeOperation(result, CommandResponse.builder(), error, future);
      }
    });
  }

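apply: on receiving a command, the leader first writes it to the log, then replicates it to the followers, and finally has to execute the command, for example changing a value in the state machine such as a=1.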
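The three steps (append, replicate, apply) can be sketched as a small future chain. This is a simplified single-process model under stated assumptions: CommandPipeline and its methods are hypothetical, replication is faked with an already-completed future, and commands are plain "key=value" strings.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the leader's command path; not Copycat's classes.
final class CommandPipeline {
  private final List<String> log = new ArrayList<>();         // the log
  private final Map<String, Integer> state = new HashMap<>(); // the state machine

  // Step 1: append the command to the local log and return its 1-based index.
  long append(String command) {
    log.add(command);
    return log.size();
  }

  // Step 2: replicate up to the index. A real leader would wait for a majority of followers here.
  CompletableFuture<Long> replicate(long index) {
    return CompletableFuture.completedFuture(index);
  }

  // Step 3: apply the committed entry to the state machine, e.g. "a=1" sets a to 1.
  Integer apply(long index) {
    String[] parts = log.get((int) index - 1).split("=");
    state.put(parts[0], Integer.parseInt(parts[1]));
    return state.get(parts[0]);
  }

  CompletableFuture<Integer> submit(String command) {
    long index = append(command);
    return replicate(index).thenApply(this::apply);
  }

  public static void main(String[] args) {
    System.out.println(new CommandPipeline().submit("a=1").join()); // 1
  }
}
```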

ServerContext.getStateMachine() returns

private ServerStateMachine stateMachine;
 
This calls ServerStateMachine.apply(index),
which calls apply(entry),
which calls apply((CommandEntry) entry):
private CompletableFuture<Result> apply(CommandEntry entry) {
    final CompletableFuture<Result> future = new CompletableFuture<>();
    final ThreadContext context = ThreadContext.currentContextOrThrow(); // keep a reference to the current thread's context

    // First check to ensure that the session exists.
    ServerSessionContext session = executor.context().sessions().getSession(entry.getSession());

    // If the session is null, return an UnknownSessionException. Commands applied to the state machine must
    // have a session. We ensure that session register/unregister entries are not compacted from the log
    // until all associated commands have been cleaned.
    if (session == null) { // the session does not exist
      log.release(entry.getIndex());
      return Futures.exceptionalFuture(new UnknownSessionException("unknown session: " + entry.getSession()));
    }
    // If the session is not in an active state, return an UnknownSessionException. Sessions are retained in the
    // session registry until all prior commands have been released by the state machine, but new commands can
    // only be applied for sessions in an active state.
    else if (!session.state().active()) { // the session is not in an active state
      log.release(entry.getIndex());
      return Futures.exceptionalFuture(new UnknownSessionException("inactive session: " + entry.getSession()));
    }
    // If the command's sequence number is less than the next session sequence number then that indicates that
    // we've received a command that was previously applied to the state machine. Ensure linearizability by
    // returning the cached response instead of applying it to the user defined state machine.
    else if (entry.getSequence() > 0 && entry.getSequence() < session.nextCommandSequence()) { // an entry that has already been applied
      // Ensure the response check is executed in the state machine thread in order to ensure the
      // command was applied, otherwise there will be a race condition and concurrent modification issues.
      long sequence = entry.getSequence();

      // Switch to the state machine thread and get the existing response.
      executor.executor().execute(() -> sequenceCommand(sequence, session, future, context)); // return the previously applied result directly
      return future;
    }
    // If we've made it this far, the command must have been applied in the proper order as sequenced by the
    // session. This should be the case for most commands applied to the state machine.
    else {
      // Allow the executor to execute any scheduled events.
      long index = entry.getIndex();
      long sequence = entry.getSequence();

      // Calculate the updated timestamp for the command.
      long timestamp = executor.timestamp(entry.getTimestamp());

      // Execute the command in the state machine thread. Once complete, the CompletableFuture callback will be completed
      // in the state machine thread. Register the result in that thread and then complete the future in the caller's thread.
      ServerCommit commit = commits.acquire(entry, session, timestamp); // backed by a ServerCommitPool: to avoid repeatedly allocating ServerCommit objects, one is taken from the pool and returned after use
      executor.executor().execute(() -> executeCommand(index, sequence, timestamp, commit, session, future, context));

      // Update the last applied index prior to the command sequence number. This is necessary to ensure queries sequenced
      // at this index receive the index of the command.
      setLastApplied(index);

      // Update the session timestamp and command sequence number. This is done in the caller's thread since all
      // timestamp/index/sequence checks are done in this thread prior to executing operations on the state machine thread.
      session.setTimestamp(timestamp).setCommandSequence(sequence);
      return future;
    }
  }
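The duplicate check in apply(CommandEntry) amounts to caching one result per sequence number and replaying it on a retry. A minimal sketch of that idea, assuming a hypothetical DedupSession class and an in-memory map (the real code stores results through session.registerResult and reads them back on the state machine thread):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical session that deduplicates commands by sequence number.
final class DedupSession {
  private final Map<Long, Object> results = new HashMap<>();
  private long commandSequence; // last applied command, starts at 0

  long nextCommandSequence() {
    return commandSequence + 1;
  }

  Object apply(long sequence, Supplier<Object> operation) {
    // A sequence below the next expected one was applied before: replay the cached result.
    if (sequence > 0 && sequence < nextCommandSequence()) {
      return results.get(sequence);
    }
    // Otherwise run the operation once, cache its result and advance the sequence.
    Object result = operation.get();
    results.put(sequence, result);
    commandSequence = sequence;
    return result;
  }

  public static void main(String[] args) {
    int[] counter = {0};
    DedupSession session = new DedupSession();
    Object first = session.apply(1, () -> ++counter[0]);
    Object retry = session.apply(1, () -> ++counter[0]); // duplicate, not executed again
    System.out.println(first + " " + retry + " " + counter[0]); // 1 1 1
  }
}
```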
executeCommand
ServerCommit commit = commits.acquire(entry, session, timestamp);
executor.executor().execute(() -> executeCommand(index, sequence, timestamp, commit, session, future, context));

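Note that two threads are involved here.

One is context, which is a

ThreadContext threadContext

used to respond to server requests.

The other is the stateContext inside the executor, which is used to change the state of the stateMachine.

So executeCommand is run on the executor, but the ThreadContext is passed in so the result can be handed back.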

/**
   * Executes a state machine command.
   */
  private void executeCommand(long index, long sequence, long timestamp, ServerCommit commit, ServerSessionContext session, CompletableFuture<Result> future, ThreadContext context) {

    // Trigger scheduled callbacks in the state machine.
    executor.tick(index, timestamp);

    // Update the state machine context with the commit index and local server context. The synchronous flag
    // indicates whether the server expects linearizable completion of published events. Events will be published
    // based on the configured consistency level for the context.
    executor.init(commit.index(), commit.time(), ServerStateMachineContext.Type.COMMAND);

    // Store the event index to return in the command response.
    long eventIndex = session.getEventIndex();

    try {
      // Execute the state machine operation and get the result.
      Object output = executor.executeOperation(commit);

      // Once the operation has been applied to the state machine, commit events published by the command.
      // The state machine context will build a composite future for events published to all sessions.
      executor.commit();

      // Store the result for linearizability and complete the command.
      Result result = new Result(index, eventIndex, output);
      session.registerResult(sequence, result); // cache the execution result
      context.executor().execute(() -> future.complete(result)); // complete the future on the caller's context, signalling that the command has finished
    } catch (Exception e) {
      // If an exception occurs during execution of the command, store the exception.
      Result result = new Result(index, eventIndex, e);
      session.registerResult(sequence, result);
      context.executor().execute(() -> future.complete(result));
    }
  }
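The hand-off between the two threads can be sketched with two single-thread executors. This is an illustrative sketch only: TwoThreadHandoff and the thread names "state" and "server" are assumptions, and plain ExecutorService instances stand in for Copycat's ThreadContext.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo: work runs on the state machine thread, completion runs on the server thread.
final class TwoThreadHandoff {
  static CompletableFuture<String> execute(ExecutorService stateMachine, ExecutorService server) {
    CompletableFuture<String> future = new CompletableFuture<>();
    stateMachine.execute(() -> {
      // The state change would happen here, on the state machine thread.
      String worker = Thread.currentThread().getName();
      // Hand the result back so the future completes on the server thread.
      server.execute(() -> future.complete(worker + " -> " + Thread.currentThread().getName()));
    });
    return future;
  }

  public static void main(String[] args) {
    ExecutorService stateMachine = Executors.newSingleThreadExecutor(r -> new Thread(r, "state"));
    ExecutorService server = Executors.newSingleThreadExecutor(r -> new Thread(r, "server"));
    try {
      System.out.println(execute(stateMachine, server).join()); // state -> server
    } finally {
      stateMachine.shutdown();
      server.shutdown();
    }
  }
}
```

Completing the future on the server thread means callbacks registered by the server never run concurrently with the state machine.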
ServerStateMachineExecutor.tick
Based on the timestamp, triggers the tasks in scheduledTasks that have become due.
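A rough sketch of timestamp-driven task triggering follows. The real tick(index, timestamp) also receives the log index; this simplified version uses only the timestamp, and LogicalScheduler with its Task class is hypothetical rather than Copycat's implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical scheduler driven by logical time instead of the wall clock.
final class LogicalScheduler {
  private static final class Task {
    final long time;
    final Runnable callback;

    Task(long time, Runnable callback) {
      this.time = time;
      this.callback = callback;
    }
  }

  // Tasks ordered by due time, earliest first.
  private final PriorityQueue<Task> tasks =
      new PriorityQueue<>(Comparator.comparingLong((Task t) -> t.time));

  void schedule(long time, Runnable callback) {
    tasks.add(new Task(time, callback));
  }

  // Runs every task whose due time is at or before the given timestamp.
  void tick(long timestamp) {
    while (!tasks.isEmpty() && tasks.peek().time <= timestamp) {
      tasks.poll().callback.run();
    }
  }

  public static void main(String[] args) {
    LogicalScheduler scheduler = new LogicalScheduler();
    List<String> fired = new ArrayList<>();
    scheduler.schedule(10, () -> fired.add("a"));
    scheduler.schedule(20, () -> fired.add("b"));
    scheduler.tick(5);  // nothing is due yet
    scheduler.tick(15); // "a" is due, "b" is not
    System.out.println(fired); // [a]
  }
}
```

Driving the scheduler from entry timestamps rather than the local clock lets every server fire the same tasks at the same point in the log.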
 
ServerStateMachineExecutor.init
Updates the state machine's context:
void init(long index, Instant instant, ServerStateMachineContext.Type type) {
    context.update(index, instant, type);
  }

  //ServerStateMachineContext
  void update(long index, Instant instant, Type type) {
    this.index = index;
    this.type = type;
    clock.set(instant);
  }
 
ServerStateMachineExecutor.executeOperation
<T extends Operation<U>, U> U executeOperation(Commit commit) {

    // Get the function registered for the operation. If no function is registered, attempt to
    // use a global function if available.
    Function function = operations.get(commit.type()); // look up the function registered for this type in operations

    if (function == null) {
      // If no operation function was found for the class, try to find an operation function
      // registered with a parent class.
      for (Map.Entry<Class, Function> entry : operations.entrySet()) {
        if (entry.getKey().isAssignableFrom(commit.type())) { // the registered type is a supertype of commit.type
          function = entry.getValue();
          break;
        }
      }

      // If a parent operation function was found, store the function for future reference.
      if (function != null) {
        operations.put(commit.type(), function);
      }
    }

    if (function == null) {
      throw new IllegalStateException("unknown state machine operation: " + commit.type());
    } else {
      // Execute the operation. If the operation return value is a Future, await the result,
      // otherwise immediately complete the execution future.
      try {
        return (U) function.apply(commit); // actually invoke the function
      } catch (Exception e) {
        throw new ApplicationException(e, "An application error occurred");
      }
    }
  }
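The lookup strategy in executeOperation (exact type first, then a registered supertype, then caching the match) can be sketched on its own. OperationRegistry, register and execute are hypothetical names for illustration; the sketch dispatches on the operation object's class rather than on a Commit.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical registry that dispatches operations by class, with a supertype fallback.
final class OperationRegistry {
  private final Map<Class<?>, Function<Object, Object>> operations = new HashMap<>();

  <T> void register(Class<T> type, Function<T, Object> function) {
    operations.put(type, op -> function.apply(type.cast(op)));
  }

  Object execute(Object operation) {
    Class<?> type = operation.getClass();
    Function<Object, Object> function = operations.get(type);
    if (function == null) {
      // No exact match: look for a handler registered with a supertype.
      for (Map.Entry<Class<?>, Function<Object, Object>> entry : operations.entrySet()) {
        if (entry.getKey().isAssignableFrom(type)) {
          function = entry.getValue();
          break;
        }
      }
      // Cache the match so the next lookup for this type is direct.
      if (function != null) {
        operations.put(type, function);
      }
    }
    if (function == null) {
      throw new IllegalStateException("unknown state machine operation: " + type);
    }
    return function.apply(operation);
  }

  public static void main(String[] args) {
    OperationRegistry registry = new OperationRegistry();
    registry.register(Number.class, n -> n.intValue() * 2);
    System.out.println(registry.execute(21)); // 42, found through the Number handler
  }
}
```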
 
 
How do RequestSequence and CommandSequence differ? Let's look at where each one is used.
 

RequestSequence

Set

ServerStateMachine

private CompletableFuture<Void> apply(KeepAliveEntry entry) {
  // …
  // Update the session keep alive index for log cleaning.
  session.setKeepAliveIndex(entry.getIndex()).setRequestSequence(commandSequence);
}

LeaderState

private void applyCommand(CommandRequest request, ServerSessionContext session, CompletableFuture<CommandResponse> future) {
  // ……
  // Set the last processed request for the session. This will cause sequential command callbacks to be executed.
  session.setRequestSequence(request.sequence());
}
 

Get

ServerSessionContext.setCommandSequence
// If the request sequence number is less than the applied sequence number, update the request
    // sequence number. This is necessary to ensure that if the local server is a follower that is
    // later elected leader, its sequences are consistent for commands.
    if (sequence > requestSequence) {
      // Only attempt to trigger command callbacks if any are registered.
      if (!this.commands.isEmpty()) {
        // For each request sequence number, a command callback completing the command submission may exist.
        for (long i = this.requestSequence + 1; i <= sequence; i++) {
          this.requestSequence = i;
          Runnable command = this.commands.remove(i);
          if (command != null) {
            command.run();
          }
        }
      } else {
        this.requestSequence = sequence;
      }
    }

LeaderState

/**
   * Sequences the given command to the log.
   */
  private void sequenceCommand(CommandRequest request, ServerSessionContext session, CompletableFuture<CommandResponse> future) {
    // If the command is LINEARIZABLE and the session's current sequence number is less than one prior to the request
    // sequence number, queue this request for handling later. We want to handle command requests in the order in which
    // they were sent by the client. Note that it's possible for the session sequence number to be greater than the request
    // sequence number. In that case, it's likely that the command was submitted more than once to the
    // cluster, and the command will be deduplicated once applied to the state machine.
    if (request.sequence() > session.nextRequestSequence()) {
      // If the request sequence number is more than 1k requests above the last sequenced request, reject the request.
      // The client should resubmit a request that fails with a COMMAND_ERROR.
      if (request.sequence() - session.getRequestSequence() > MAX_REQUEST_QUEUE_SIZE) {

CommandSequence

Set

ServerSessionContext.setCommandSequence

for (long i = commandSequence + 1; i <= sequence; i++) {
      commandSequence = i;
      List<Runnable> queries = this.sequenceQueries.remove(commandSequence);
      if (queries != null) {
        for (Runnable query : queries) {
          query.run();
        }
        queries.clear();
        queriesPool.add(queries);
      }
    }

ServerStateMachine

private CompletableFuture<Result> apply(CommandEntry entry)
  // Update the session timestamp and command sequence number. This is done in the caller's thread since all
  // timestamp/index/sequence checks are done in this thread prior to executing operations on the state machine thread.
  session.setTimestamp(timestamp).setCommandSequence(sequence);

Get

LeaderState

sequenceLinearizableQuery, sequenceBoundedLinearizableQuery
/**
   * Sequences a bounded linearizable query.
   */
  private void sequenceBoundedLinearizableQuery(QueryEntry entry, ServerSessionContext session, CompletableFuture<QueryResponse> future) {
    // If the query's sequence number is greater than the session's current sequence number, queue the request for
    // handling once the state machine is caught up.
    if (entry.getSequence() > session.getCommandSequence()) {
      session.registerSequenceQuery(entry.getSequence(), () -> applyQuery(entry, future));
    } else {
      applyQuery(entry, future);
    }
  }

PassiveState

private void sequenceQuery(QueryEntry entry, ServerSessionContext session, CompletableFuture<QueryResponse> future) {
    // If the query's sequence number is greater than the session's current sequence number, queue the request for
    // handling once the state machine is caught up.
    if (entry.getSequence() > session.getCommandSequence()) {
      session.registerSequenceQuery(entry.getSequence(), () -> indexQuery(entry, future));
    } else {
      indexQuery(entry, future);
    }
  }
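Judging from the snippets above, requestSequence orders incoming commands on the leader before they reach the log, while commandSequence tracks what has been applied to the state machine and holds back queries until it catches up. The query side can be sketched as follows; QuerySequencer is a hypothetical class, and the list pooling done by the real setCommandSequence is left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical session that parks queries until the command sequence catches up.
final class QuerySequencer {
  private final Map<Long, List<Runnable>> sequenceQueries = new HashMap<>();
  private long commandSequence; // last applied command, starts at 0

  // Runs the query now if the state machine is caught up, otherwise parks it.
  void query(long sequence, Runnable query) {
    if (sequence > commandSequence) {
      sequenceQueries.computeIfAbsent(sequence, s -> new ArrayList<>()).add(query);
    } else {
      query.run();
    }
  }

  // Steps through every sequence number up to the new one, releasing queries parked at each.
  void setCommandSequence(long sequence) {
    for (long i = commandSequence + 1; i <= sequence; i++) {
      commandSequence = i;
      List<Runnable> queries = sequenceQueries.remove(i);
      if (queries != null) {
        queries.forEach(Runnable::run);
      }
    }
  }

  public static void main(String[] args) {
    QuerySequencer session = new QuerySequencer();
    List<String> log = new ArrayList<>();
    session.query(2, () -> log.add("query@2")); // command 2 not applied yet, so the query is parked
    session.setCommandSequence(1);
    session.setCommandSequence(2);              // releases the parked query
    System.out.println(log); // [query@2]
  }
}
```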
Time: 2024-12-25 01:25:32
