client.submit(new PutCommand("foo", "Hello world!"));
ServerContext
// Route each incoming CommandRequest on this connection to state.command(request).
// NOTE(review): if `state` is a field replaced on role transitions (assumed, not shown here), this lambda
// reads it per request; do not replace it with the method reference `state::command`, which would bind
// the state object once at registration time.
connection.handler(CommandRequest.class, request -> state.command(request));
State.command
从ReserveState开始,收到的command会被forward给leader,因为只有leader可以处理command;如果当前没有已知的leader,则直接返回NO_LEADER_ERROR。
/**
 * Handles a client command in a non-leader state by forwarding it to the leader.
 *
 * <p>If no leader is known, the command is rejected immediately with NO_LEADER_ERROR. If forwarding
 * fails, the failure is also reported as NO_LEADER_ERROR. The response is logged in both cases.
 *
 * @param request the client's command request
 * @return a future completed with the (possibly error) command response
 */
@Override
public CompletableFuture<CommandResponse> command(CommandRequest request) {
  context.checkThread();
  logRequest(request);

  if (context.getLeader() != null) {
    // Only the leader can process commands; any forwarding failure becomes NO_LEADER_ERROR.
    return this.<CommandRequest, CommandResponse>forward(request)
        .exceptionally(error -> CommandResponse.builder()
            .withStatus(Response.Status.ERROR)
            .withError(CopycatError.Type.NO_LEADER_ERROR)
            .build())
        .thenApply(this::logResponse);
  }

  // No known leader: there is nowhere to forward the command.
  CommandResponse noLeader = CommandResponse.builder()
      .withStatus(Response.Status.ERROR)
      .withError(CopycatError.Type.NO_LEADER_ERROR)
      .build();
  return CompletableFuture.completedFuture(logResponse(noLeader));
}
LeaderState.command
public CompletableFuture<CommandResponse> command(final CommandRequest request) { context.checkThread(); logRequest(request); // Get the client‘s server session. If the session doesn‘t exist, return an unknown session error. ServerSessionContext session = context.getStateMachine().executor().context().sessions().getSession(request.session()); if (session == null) { //如果session不存在,无法处理该command return CompletableFuture.completedFuture(logResponse(CommandResponse.builder() .withStatus(Response.Status.ERROR) .withError(CopycatError.Type.UNKNOWN_SESSION_ERROR) .build())); } ComposableFuture<CommandResponse> future = new ComposableFuture<>(); sequenceCommand(request, session, future); return future; }
sequenceCommand
/** * Sequences the given command to the log. */ private void sequenceCommand(CommandRequest request, ServerSessionContext session, CompletableFuture<CommandResponse> future) { // If the command is LINEARIZABLE and the session‘s current sequence number is less then one prior to the request // sequence number, queue this request for handling later. We want to handle command requests in the order in which // they were sent by the client. Note that it‘s possible for the session sequence number to be greater than the request // sequence number. In that case, it‘s likely that the command was submitted more than once to the // cluster, and the command will be deduplicated once applied to the state machine. if (request.sequence() > session.nextRequestSequence()) { //session中的request需要按sequence执行,所以如果request的sequence num大于我们期望的,说明这个request需要等之前的request先执行 // If the request sequence number is more than 1k requests above the last sequenced request, reject the request. // The client should resubmit a request that fails with a COMMAND_ERROR. if (request.sequence() - session.getRequestSequence() > MAX_REQUEST_QUEUE_SIZE) { //如果request的sequence大的太多,和当前sequence比,大100以上 future.complete(CommandResponse.builder() .withStatus(Response.Status.ERROR) .withError(CopycatError.Type.COMMAND_ERROR) //拒绝该command .build()); } // Register the request in the request queue if it‘s not too far ahead of the current sequence number. else { session.registerRequest(request.sequence(), () -> applyCommand(request, session, future)); //放入queue等待 } } else { applyCommand(request, session, future); //apply该command } }
如果request的sequence比session期望的下一个sequence大,说明它前面的request还没有处理,需要先把它暂存起来,见下面的session.registerRequest。
session.registerRequest
ServerSessionContext
/**
 * Queues a callback under the given request sequence number.
 *
 * <p>The callback is removed from the queue and run when the session's request sequence advances to
 * it (see setRequestSequence).
 *
 * @param sequence the request sequence number the callback waits for
 * @param runnable the callback to run, e.g. applying the queued command
 * @return this session context
 */
ServerSessionContext registerRequest(long sequence, Runnable runnable) {
  this.commands.put(sequence, runnable);
  return this;
}
可以看到,会把sequence id和对应的回调(这里就是applyCommand)注册到commands里面。
问题是:commands里注册的这些回调会在什么时候被触发执行?
ServerSessionContext setRequestSequence(long request) { if (request > this.requestSequence) { this.requestSequence = request; // When the request sequence number is incremented, get the next queued request callback and call it. // This will allow the command request to be evaluated in sequence. Runnable command = this.commands.remove(nextRequestSequence()); if (command != null) { command.run(); } } return this; }
在setRequestSequence的时候,
每次set(且新的sequence更大)的时候,会去commands里面看一下是否有下一个request在等待,如果有就直接执行它。
applyCommand
/** * Applies the given command to the log. */ private void applyCommand(CommandRequest request, ServerSessionContext session, CompletableFuture<CommandResponse> future) { final Command command = request.command(); final long term = context.getTerm(); final long timestamp = System.currentTimeMillis(); final long index; // Create a CommandEntry and append it to the log. try (CommandEntry entry = context.getLog().create(CommandEntry.class)) { entry.setTerm(term) .setSession(request.session()) .setTimestamp(timestamp) .setSequence(request.sequence()) .setCommand(command); index = context.getLog().append(entry); //把CommandEntry写入log LOGGER.debug("{} - Appended {}", context.getCluster().member().address(), entry); } // Replicate the command to followers. appendCommand(index, future); // Set the last processed request for the session. This will cause sequential command callbacks to be executed. session.setRequestSequence(request.sequence()); //更新session的sequence,这里会试图去check session.commands是否有next request }
appendCommand
/** * Sends append requests for a command to followers. */ private void appendCommand(long index, CompletableFuture<CommandResponse> future) { appender.appendEntries(index).whenComplete((commitIndex, commitError) -> { //appendEntries到该index context.checkThread(); if (isOpen()) { if (commitError == null) { applyCommand(index, future); //如果成功,applyCommand } else { future.complete(logResponse(CommandResponse.builder() .withStatus(Response.Status.ERROR) .withError(CopycatError.Type.INTERNAL_ERROR) .build())); } } }); }
applyCommand(注意:这是另一个同名的重载方法,参数是log index,和上面的applyCommand很容易混淆,函数名不能换一个吗?)
/**
 * Applies a command to the state machine.
 *
 * <p>This overload takes the log index of the already-appended entry. When the state machine
 * finishes, the response future is completed via completeOperation, but only while this state is
 * still open.
 *
 * @param index the log index of the command entry
 * @param future the client's response future
 */
private void applyCommand(long index, CompletableFuture<CommandResponse> future) {
  context.getStateMachine().<ServerStateMachine.Result>apply(index).whenComplete((result, error) -> {
    if (!isOpen()) {
      return;
    }
    completeOperation(result, CommandResponse.builder(), error, future);
  });
}
apply:leader收到command后,首先要把它写到log里面,然后同步给follower,最终还需要真正执行command,比如修改状态机里面的值(如a=1)。
ServerContext.getStateMachine(),返回
private ServerStateMachine stateMachine;
这里调用ServerStateMachine.apply(index)
调用apply(entry)
调用apply((CommandEntry) entry)
/**
 * Applies a CommandEntry from the log to the state machine.
 *
 * <p>Session validation and sequencing checks run on the calling thread. The user operation itself is
 * executed on the state machine executor. The returned future is completed back on the caller's
 * ThreadContext.
 *
 * @param entry the command entry to apply
 * @return a future completed with the command's Result, or completed exceptionally with
 *     UnknownSessionException if the session is missing or inactive
 */
private CompletableFuture<Result> apply(CommandEntry entry) {
  final CompletableFuture<Result> future = new CompletableFuture<>();
  final ThreadContext context = ThreadContext.currentContextOrThrow(); // capture the caller's thread context so the result can be completed back on it

  // First check to ensure that the session exists.
  ServerSessionContext session = executor.context().sessions().getSession(entry.getSession());

  // If the session is null, return an UnknownSessionException. Commands applied to the state machine must
  // have a session. We ensure that session register/unregister entries are not compacted from the log
  // until all associated commands have been cleaned.
  if (session == null) { // session does not exist
    log.release(entry.getIndex());
    return Futures.exceptionalFuture(new UnknownSessionException("unknown session: " + entry.getSession()));
  }
  // If the session is not in an active state, return an UnknownSessionException. Sessions are retained in the
  // session registry until all prior commands have been released by the state machine, but new commands can
  // only be applied for sessions in an active state.
  else if (!session.state().active()) { // session is not active
    log.release(entry.getIndex());
    return Futures.exceptionalFuture(new UnknownSessionException("inactive session: " + entry.getSession()));
  }
  // If the command's sequence number is less than the next session sequence number then that indicates that
  // we've received a command that was previously applied to the state machine. Ensure linearizability by
  // returning the cached response instead of applying it to the user defined state machine.
  else if (entry.getSequence() > 0 && entry.getSequence() < session.nextCommandSequence()) { // entry was already applied
    // Ensure the response check is executed in the state machine thread in order to ensure the
    // command was applied, otherwise there will be a race condition and concurrent modification issues.
    long sequence = entry.getSequence();
    // Switch to the state machine thread and get the existing response.
    executor.executor().execute(() -> sequenceCommand(sequence, session, future, context)); // respond with the previously applied (cached) result
    return future;
  }
  // If we've made it this far, the command must have been applied in the proper order as sequenced by the
  // session. This should be the case for most commands applied to the state machine.
  else {
    // Allow the executor to execute any scheduled events.
    // NOTE(review): the comment above looks stale; the lines below only read the entry's index and sequence
    // (scheduled callbacks are triggered by executor.tick inside executeCommand).
    long index = entry.getIndex();
    long sequence = entry.getSequence();

    // Calculate the updated timestamp for the command.
    long timestamp = executor.timestamp(entry.getTimestamp());

    // Execute the command in the state machine thread. Once complete, the CompletableFuture callback will be completed
    // in the state machine thread. Register the result in that thread and then complete the future in the caller's thread.
    ServerCommit commit = commits.acquire(entry, session, timestamp); // take a ServerCommit from the commit pool (reused rather than allocated per command)
    executor.executor().execute(() -> executeCommand(index, sequence, timestamp, commit, session, future, context));

    // Update the last applied index prior to the command sequence number. This is necessary to ensure queries sequenced
    // at this index receive the index of the command.
    setLastApplied(index);

    // Update the session timestamp and command sequence number. This is done in the caller's thread since all
    // timestamp/index/sequence checks are done in this thread prior to executing operations on the state machine thread.
    session.setTimestamp(timestamp).setCommandSequence(sequence);
    return future;
  }
}
executeCommand
// Excerpt from apply(CommandEntry): the commit is acquired on the calling thread. executeCommand then runs
// on the state machine executor, with the caller's ThreadContext passed along so the result can be
// completed back on it.
ServerCommit commit = commits.acquire(entry, session, timestamp);
executor.executor().execute(() -> executeCommand(index, sequence, timestamp, commit, session, future, context));
注意这里有两个线程,
一个是context,是
ThreadContext threadContext
用来响应server请求的
还有一个是executor里面的stateContext,用来改变stateMachine的状态的
所以这里是用executor来执行executeCommand,但把ThreadContext传入
/**
 * Executes a state machine command.
 *
 * <p>This is scheduled on the state machine executor by apply(CommandEntry). The Result is cached on
 * the session under {@code sequence}, whether it holds the operation's output or the exception it
 * threw. {@code future} is then completed on the caller's thread via {@code context}.
 *
 * @param index the command's log index
 * @param sequence the command's session sequence number
 * @param timestamp the command's state machine timestamp
 * @param commit the commit handed to the user operation
 * @param session the client session
 * @param future completed with the Result
 * @param context the caller's thread context, used to complete {@code future}
 */
private void executeCommand(long index, long sequence, long timestamp, ServerCommit commit, ServerSessionContext session, CompletableFuture<Result> future, ThreadContext context) {
  // Trigger scheduled callbacks in the state machine.
  executor.tick(index, timestamp);

  // Update the state machine context with the commit index and local server context. The synchronous flag
  // indicates whether the server expects linearizable completion of published events. Events will be published
  // based on the configured consistency level for the context.
  executor.init(commit.index(), commit.time(), ServerStateMachineContext.Type.COMMAND);

  // Store the event index to return in the command response.
  long eventIndex = session.getEventIndex();

  try {
    // Execute the state machine operation and get the result.
    Object output = executor.executeOperation(commit);

    // Once the operation has been applied to the state machine, commit events published by the command.
    // The state machine context will build a composite future for events published to all sessions.
    executor.commit();

    // Store the result for linearizability and complete the command.
    Result result = new Result(index, eventIndex, output);
    session.registerResult(sequence, result); // cache the result for this sequence number
    context.executor().execute(() -> future.complete(result)); // complete the future on the caller's thread
  } catch (Exception e) {
    // If an exception occurs during execution of the command, store the exception.
    Result result = new Result(index, eventIndex, e);
    session.registerResult(sequence, result);
    context.executor().execute(() -> future.complete(result));
  }
}
ServerStateMachineExecutor.tick
根据时间,去触发scheduledTasks中已经到时间的task
ServerStateMachineExecutor.init
更新state machine的context
// ServerStateMachineExecutor: update the state machine context before executing an operation.
void init(long index, Instant instant, ServerStateMachineContext.Type type) {
  context.update(index, instant, type);
}

//ServerStateMachineContext
// Record the index and type of the operation being executed, and set the context's clock to its time.
void update(long index, Instant instant, Type type) {
  this.index = index;
  this.type = type;
  clock.set(instant);
}
ServerStateMachineExecutor.executeOperation
<T extends Operation<U>, U> U executeOperation(Commit commit) { // Get the function registered for the operation. If no function is registered, attempt to // use a global function if available. Function function = operations.get(commit.type()); //从operations找到type对应的function if (function == null) { // If no operation function was found for the class, try to find an operation function // registered with a parent class. for (Map.Entry<Class, Function> entry : operations.entrySet()) { if (entry.getKey().isAssignableFrom(commit.type())) { //如果注册的type是commit.type的父类 function = entry.getValue(); break; } } // If a parent operation function was found, store the function for future reference. if (function != null) { operations.put(commit.type(), function); } } if (function == null) { throw new IllegalStateException("unknown state machine operation: " + commit.type()); } else { // Execute the operation. If the operation return value is a Future, await the result, // otherwise immediately complete the execution future. try { return (U) function.apply(commit); //真正执行function } catch (Exception e) { throw new ApplicationException(e, "An application error occurred"); } } }
RequestSequence和CommandSequence有什么不同?看看它们分别在什么地方被使用。
RequestSequence
Set
ServerStateMachine
private CompletableFuture<Void> apply(KeepAliveEntry entry) {
  //…
  // (Excerpt: most of the body is elided; `session` and `commandSequence` are presumably defined in the elided part.)
  // Update the session keep alive index for log cleaning.
  // Also advance the session's request sequence to commandSequence, which may run queued request callbacks.
  session.setKeepAliveIndex(entry.getIndex()).setRequestSequence(commandSequence);
}
LeaderState
private void applyCommand(CommandRequest request, ServerSessionContext session, CompletableFuture<CommandResponse> future) {
  //……
  // (Excerpt: appending the entry to the log and replicating it are elided.)
  // Set the last processed request for the session. This will cause sequential command callbacks to be executed.
  session.setRequestSequence(request.sequence());
}
Get
ServerSessionContext.setCommandSequence
// If the request sequence number is less than the applied sequence number, update the request // sequence number. This is necessary to ensure that if the local server is a follower that is // later elected leader, its sequences are consistent for commands. if (sequence > requestSequence) { // Only attempt to trigger command callbacks if any are registered. if (!this.commands.isEmpty()) { // For each request sequence number, a command callback completing the command submission may exist. for (long i = this.requestSequence + 1; i <= sequence; i++) { this.requestSequence = i; Runnable command = this.commands.remove(i); if (command != null) { command.run(); } } } else { this.requestSequence = sequence; } }
LeaderState
/**
 * Sequences the given command to the log.
 */
private void sequenceCommand(CommandRequest request, ServerSessionContext session, CompletableFuture<CommandResponse> future) {
  // If the command is LINEARIZABLE and the session's current sequence number is less then one prior to the request
  // sequence number, queue this request for handling later. We want to handle command requests in the order in which
  // they were sent by the client. Note that it's possible for the session sequence number to be greater than the request
  // sequence number. In that case, it's likely that the command was submitted more than once to the
  // cluster, and the command will be deduplicated once applied to the state machine.
  if (request.sequence() > session.nextRequestSequence()) {
    // If the request sequence number is more than 1k requests above the last sequenced request, reject the request.
    // The client should resubmit a request that fails with a COMMAND_ERROR.
    // Excerpt quoted for where requestSequence is read: nextRequestSequence() above and getRequestSequence() below.
    if (request.sequence() - session.getRequestSequence() > MAX_REQUEST_QUEUE_SIZE) {
      // … (excerpt truncated here; the full sequenceCommand method is quoted earlier in these notes)
CommandSequence
Set
ServerSessionContext.setCommandSequence
// Excerpt from ServerSessionContext.setCommandSequence(long sequence). Advance commandSequence one step at a
// time. At each step, run any queries registered for that sequence number, then clear the list and return
// it to queriesPool for reuse.
for (long i = commandSequence + 1; i <= sequence; i++) {
  commandSequence = i;
  List<Runnable> queries = this.sequenceQueries.remove(commandSequence);
  if (queries != null) {
    for (Runnable query : queries) {
      query.run();
    }
    queries.clear();
    queriesPool.add(queries);
  }
}
ServerStateMachine
private CompletableFuture<Result> apply(CommandEntry entry)
// Update the session timestamp and command sequence number. This is done in the caller‘s thread since all// timestamp/index/sequence checks are done in this thread prior to executing operations on the state machine thread.session.setTimestamp(timestamp).setCommandSequence(sequence);
Get
LeaderState
sequenceLinearizableQuery, sequenceBoundedLinearizableQuery
/** * Sequences a bounded linearizable query. */ private void sequenceBoundedLinearizableQuery(QueryEntry entry, ServerSessionContext session, CompletableFuture<QueryResponse> future) { // If the query‘s sequence number is greater than the session‘s current sequence number, queue the request for // handling once the state machine is caught up. if (entry.getSequence() > session.getCommandSequence()) { session.registerSequenceQuery(entry.getSequence(), () -> applyQuery(entry, future)); } else { applyQuery(entry, future); } }
PassiveState
private void sequenceQuery(QueryEntry entry, ServerSessionContext session, CompletableFuture<QueryResponse> future) { // If the query‘s sequence number is greater than the session‘s current sequence number, queue the request for // handling once the state machine is caught up. if (entry.getSequence() > session.getCommandSequence()) { session.registerSequenceQuery(entry.getSequence(), () -> indexQuery(entry, future)); } else { indexQuery(entry, future); } }