Zookeeper源码阅读(十八) 选举之快速选举算法FastLeaderElection

目录

  • 前言
  • FastLeaderEleaction基本结构
  • 选举方法分析
  • 思考
  • 参考

前言

在过去的两节里已经分析了选举过程中的一些实体类和网络IO相关的机制与源码,这一节将会对zookeeper选举的核心类FastLeaderElection进行分析。

FastLeaderEleaction基本结构

可以看到FastLeaderElection的基本结构还是比较清晰的,主要从新的成员变量类和内部类来分析下FastLeaderElection的基本结构。

Notification

/**
 * Notifications are messages that let other peers know that
 * a given peer has changed its vote, either because it has
 * joined leader election or because it learned of another
 * peer with higher zxid or same zxid and higher server id
 */

static public class Notification {

从Notification的注释我们能看到,它的目的就是通知其他peer修改了选票。从Notification的成员变量可以看,Notification基本和Vote类一致。但是在Notification类里有一个version用来标记当前Notification的version,可能是为了用来做不同版本zk之间通信来做一些逻辑处理,这部分目前没看到有什么实际的使用。

ToSend

ToSend主体和Vote类也一致,但是ToSend类多了一个sid,用来判断发给哪个server,为了要包装这样一个类,我的想法是方便在FastLeaderElection处理业务逻辑的便利。

Messenger

从代码结构中可以看到,Messenger主要分为WorkerReceiver和WorkerSender两个子类。

WorkerReceiver
/**
 * Receives messages from instance of QuorumCnxManager on
 * method run(), and processes such messages.
 */

从注释可以看到,WorkerReceiver的目的就是为了从peer接收消息并进行处理的。Workerreceiver继承了ZookeeperThread,所以也是一个单独的处理任务的线程。它的run方法代码比较长,从参考2里取了一张流程图来表示,并对关键部分解析一下。

可以看到,receiver在收到消息后,会去判断是否是observer发来的消息,如果是observer直接给它同步就可以了,如果是非observer的peer,就去看自己的状态是否是LOOKING,如果自己是LOOING,且对方的选举周期比自己小,那么就给对方同步自己的提议;如果自己不是LOOKING,但对方是LOOKING,那么就把之前的投票结果发给对方。

/*
 * If it is from an observer, respond right away.
 * Note that the following predicate assumes that
 * if a server is not a follower, then it must be
 * an observer. If we ever have any other type of
 * learner in the future, we'll have to change the
 * way we check for observers.
 */
if(!self.getVotingView().containsKey(response.sid)){//votingview是有投票资格的peer列表,没在列表里代表是observer
    Vote current = self.getCurrentVote();
    ToSend notmsg = new ToSend(ToSend.mType.notification,
            current.getId(),
            current.getZxid(),
            logicalclock.get(),
            self.getPeerState(),
            response.sid,
            current.getPeerEpoch());

    sendqueue.offer(notmsg);
}

可以看到,如果是observer的消息,那么直接生成一条notification类型的信息发送给对应的peer就可以。

/*
 * If this server is looking, then send proposed leader
 */

if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){//如果自己是LOOKING状态
    recvqueue.offer(n);//把消息放入recvqueue

    /*
     * Send a notification back if the peer that sent this
     * message is also looking and its logical clock is
     * lagging behind.
     */
    if((ackstate == QuorumPeer.ServerState.LOOKING)//发送方也是LOOKING
            && (n.electionEpoch < logicalclock.get())){
        Vote v = getVote();
        ToSend notmsg = new ToSend(ToSend.mType.notification,
                v.getId(),
                v.getZxid(),
                logicalclock.get(),
                self.getPeerState(),
                response.sid,
                v.getPeerEpoch());
        sendqueue.offer(notmsg);//发送自己的Vote给对方
    }
} else {
    /*
     * If this server is not looking, but the one that sent the ack
     * is looking, then send back what it believes to be the leader.
     */
    Vote current = self.getCurrentVote();//如果自己不是LOOKING,那么就生成自己认为的当前的Vote情况
    if(ackstate == QuorumPeer.ServerState.LOOKING){
        if(LOG.isDebugEnabled()){
            LOG.debug("Sending new notification. My id =  " +
                    self.getId() + " recipient=" +
                    response.sid + " zxid=0x" +
                    Long.toHexString(current.getZxid()) +
                    " leader=" + current.getId());
        }

        ToSend notmsg;
        if(n.version > 0x0) {//这里根据version生成不同的消息,但是version具体的作用还是不太清除
            notmsg = new ToSend(
                    ToSend.mType.notification,
                    current.getId(),
                    current.getZxid(),
                    current.getElectionEpoch(),
                    self.getPeerState(),
                    response.sid,
                    current.getPeerEpoch());

        } else {
            Vote bcVote = self.getBCVote();
            notmsg = new ToSend(
                    ToSend.mType.notification,
                    bcVote.getId(),
                    bcVote.getZxid(),
                    bcVote.getElectionEpoch(),
                    self.getPeerState(),
                    response.sid,
                    bcVote.getPeerEpoch());
        }
        sendqueue.offer(notmsg);//把要发送的消息放入sendqueue
    }
}

这里逻辑蛮清晰的,但是有一点要强调一下,FastLeaderElection中也有收发队列,上一篇讲的网络IO里也有收发队列,他们是怎么配合工作的呢。看下WorkerReceiver的run方法的开头就可以看到

response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);

这样就很清晰了,FastLeaderElection的WorkerReceiver里的网络IO的receiver从IO的队列中取出,然后放到FastLeaderElection的接收队列中。这就是一个两层队列的关系,IO中的队列专门用来处理底层byte的处理及一些基础逻辑,然后设计到算法的逻辑在FastLeaderElection的中处理,并在FastLeaderElection的队列中生产消费。简单点说就是FastLeaderElection的队列是以来网络IO的队列的。

WorkerSender

WorkerSender的逻辑就比较简单了。

public void run() {
    while (!stop) {
        try {
            ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);//从发送队列中取出
            if(m == null) continue;

            process(m);//放到网络io的放松队列中
        } catch (InterruptedException e) {
            break;
        }
    }
    LOG.info("WorkerSender is down");
}

选举方法分析

FastLeaderElection的主要选举逻辑在lookForLeader方法里,先通过分析lookForLeader来看下选举的主要流程。这里面有许多还没有分析的方法,可以先看大致的逻辑,然后针对具体的方法进行分析。

/**
 * Starts a new round of leader election. Whenever our QuorumPeer
 * changes its state to LOOKING, this method is invoked, and it
 * sends notifications to all other peers.
 */
public Vote lookForLeader() throws InterruptedException {
    try {
        self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(//注册JMX监控
                self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        self.jmxLeaderElectionBean = null;
    }
    if (self.start_fle == 0) {
       self.start_fle = Time.currentElapsedTime();//初始化选举时间
    }
    try {
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();//recvset是本轮选举收到的选票集合,按sid分

        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();//FOLLOWING和LEADING的peer发来的选票

        int notTimeout = finalizeWait;

        synchronized(this){
            logicalclock.incrementAndGet();//增加选举轮次
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());//初始化选票,投自己
        }

        LOG.info("New election. My id =  " + self.getId() +
                ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        sendNotifications();//给每个peer发送自己的提议

        /*
         * Loop in which we exchange notifications until we find a leader
         */

        while ((self.getPeerState() == ServerState.LOOKING) &&
                (!stop)){//交互的选举过程开始
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */
            Notification n = recvqueue.poll(notTimeout,
                    TimeUnit.MILLISECONDS);//从接收队列中取出通知

            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            if(n == null){//这时选举还没有结束,而接收队列一个通知都没有,就代表需要去连接peer主动获取他们的vote信息
                if(manager.haveDelivered()){//如果已经发送过消息,即所有sid对应的发送队列都空了
                    sendNotifications();//重发一遍
                } else {
                    manager.connectAll();//如果发送队列里还有消息,代表可能连接断开了,那就重连一次
                }

                /*
                 * Exponential backoff
                 */
                int tmpTimeOut = notTimeout*2;
                notTimeout = (tmpTimeOut < maxNotificationInterval?
                        tmpTimeOut : maxNotificationInterval);//这里从接收队列取出通知的等待时间是会加长的,
                LOG.info("Notification time out: " + notTimeout);
            }
            else if(self.getVotingView().containsKey(n.sid)) {//是其他peer发来的消息
                /*
                 * Only proceed if the vote comes from a replica in the
                 * voting view.
                 */
                switch (n.state) {
                case LOOKING://是looking状态的
                    // If notification > current, replace and send messages out
                    if (n.electionEpoch > logicalclock.get()) {//对方的epoch比自己高
                        logicalclock.set(n.electionEpoch);//更新自己的logicalLock
                        recvset.clear();//清除之前收到的选票(这些选票一定是跟自己更新前的logicalClock一个epoch,不然之前就被清掉了)
                        if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {//对方的选票比自己的本身的初始vote要“好”
                            updateProposal(n.leader, n.zxid, n.peerEpoch);//换成自己的选票
                        } else {
                            updateProposal(getInitId(),
                                    getInitLastLoggedZxid(),
                                    getPeerEpoch());//不然把自己现在的选票发给对方
                        }
                        sendNotifications();//给大家更新一份自己的选票
                    } else if (n.electionEpoch < logicalclock.get()) {//对方epoch比自己低就不用管,等对方收到peer的通知就知道了,对方自己回去走上面的逻辑去更新
                        if(LOG.isDebugEnabled()){
                            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                    + Long.toHexString(n.electionEpoch)
                                    + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                        }
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {//如果epoch相同,那么就比较自己当前的vote和对方的vote信息,对方比自己“好”
                        updateProposal(n.leader, n.zxid, n.peerEpoch);//更新提议
                        sendNotifications();//发送通知
                    }

                    if(LOG.isDebugEnabled()){
                        LOG.debug("Adding vote: from=" + n.sid +
                                ", proposed leader=" + n.leader +
                                ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }

                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));//更新接受的选票集合

                    if (termPredicate(recvset,
                            new Vote(proposedLeader, proposedZxid,
                                    logicalclock.get(), proposedEpoch))) {//如果自己投的票的sid被选为leader

                        // Verify if there is any change in the proposed leader
                        while((n = recvqueue.poll(finalizeWait,
                                TimeUnit.MILLISECONDS)) != null){//接收队列还有消息
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)){//接收队列新的消息比自己投的还要好(注意,这时候已经认为自己投的是leader了)
                                recvqueue.put(n);//再把消息放进接收队列,为啥这样做?我的想法是因为因为有网络的延迟,所以可能出现一种情况就是比如集群里有一台机器的选票没有发过来,但是它的选票是最优的,在其他的完成选举后,它的选票发来了,但是这时候当前server的状态还没有改掉,于是就把这个选票再放回去,下次取出来的时候就在switch的其他逻辑里处理了
                                break;
                            }
                        }

                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        if (n == null) {//当前接收队列里没有其他逻辑了
                            self.setPeerState((proposedLeader == self.getId()) ?
                                    ServerState.LEADING: learningState());//设置自己的状态

                            Vote endVote = new Vote(proposedLeader,
                                                    proposedZxid,
                                                    logicalclock.get(),
                                                    proposedEpoch);
                            leaveInstance(endVote);//清空接收队列
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    LOG.debug("Notification from observer: " + n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    if(n.electionEpoch == logicalclock.get()){//选举周期是同一个
                        recvset.put(n.sid, new Vote(n.leader,
                                                      n.zxid,
                                                      n.electionEpoch,
                                                      n.peerEpoch));//

                      if(ooePredicate(recvset, outofelection, n)) {//判断选举是否成功,即leader是否选出即leader是否认为自己是leader(leader可能失效)
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());//设置状态

                            Vote endVote = new Vote(n.leader,
                                    n.zxid,
                                    n.electionEpoch,
                                    n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    /*
                     * Before joining an established ensemble, verify
                     * a majority is following the same leader.
                     */
                    outofelection.put(n.sid, new Vote(n.version,
                                                        n.leader,
                                                        n.zxid,
                                                        n.electionEpoch,
                                                        n.peerEpoch,
                                                        n.state));//更新follower或leader的选票集合

                    if(ooePredicate(outofelection, outofelection, n)) {
                        synchronized(this){
                            logicalclock.set(n.electionEpoch);//更新epoch
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());
                        }
                        Vote endVote = new Vote(n.leader,
                                                n.zxid,
                                                n.electionEpoch,
                                                n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
                            n.state, n.sid);
                    break;
                }
            } else {
                LOG.warn("Ignoring notification from non-cluster member " + n.sid);
            }
        }
        return null;
    } finally {
        try {
            if(self.jmxLeaderElectionBean != null){
                MBeanRegistry.getInstance().unregister(
                        self.jmxLeaderElectionBean);//卸载jmx
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;//帮助GC
        LOG.debug("Number of connection processing threads: {}",
                manager.getConnectionThreadCount());
    }
}

大致的逻辑是这样,在网上看资料的时候看到了两张图讲的蛮好的,贴在这里,可以按照这个逻辑再看一下。

其实无论是从流程图还是代码都可以看到,如果进行了一些更新之后发现没有达到ooePredicate的要求,也就是说支持某一个sid的选票没有过半或者选举出来的leader不合格(epoch不对或者状态不对等),那么server自己的状态不会修改,这样在下一次循环里又会重新连接其他server或者重新接受选票进行选举。

选举的逻辑在代码分析里已经讲的比较详细了,再把里面具体的方法过一下。

updateProposal
synchronized void updateProposal(long leader, long zxid, long epoch){
    if(LOG.isDebugEnabled()){
        LOG.debug("Updating proposal: " + leader + " (newleader), 0x"
                + Long.toHexString(zxid) + " (newzxid), " + proposedLeader
                + " (oldleader), 0x" + Long.toHexString(proposedZxid) + " (oldzxid)");
    }
    proposedLeader = leader;
    proposedZxid = zxid;
    proposedEpoch = epoch;
}

很简单,把自己propose的leader信息更新。

getInitId(), getInitLastLoggedZxid(), getPeerEpoch()
private long getInitId(){
    if(self.getLearnerType() == LearnerType.PARTICIPANT)
        return self.getId();
    else return Long.MIN_VALUE;
}

返回自己的sid。

private long getInitLastLoggedZxid(){
    if(self.getLearnerType() == LearnerType.PARTICIPANT)
        return self.getLastLoggedZxid();
    else return Long.MIN_VALUE;
}

返回自己最大的zxid。

private long getPeerEpoch(){
    if(self.getLearnerType() == LearnerType.PARTICIPANT)
       try {
          return self.getCurrentEpoch();
       } catch(IOException e) {
          RuntimeException re = new RuntimeException(e.getMessage());
          re.setStackTrace(e.getStackTrace());
          throw re;
       }
    else return Long.MIN_VALUE;
}

逻辑同样很简单。获取到自己的epoch。

totalOrderPredicate

正如在lookForLeader中看的,这个方法是用来比较选票的优劣的。

/**
 * Check if a pair (server id, zxid) succeeds our
 * current vote.
 *
 * @param id    Server identifier
 * @param zxid  Last zxid observed by the issuer of this vote
 */
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
    LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
            Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
    if(self.getQuorumVerifier().getWeight(newId) == 0){
        return false;
    }

    /*
     * We return true if one of the following three cases hold:
     * 1- New epoch is higher
     * 2- New epoch is the same as current epoch, but new zxid is higher
     * 3- New epoch is the same as current epoch, new zxid is the same
     *  as current zxid, but server id is higher.
     */

    return ((newEpoch > curEpoch) ||
            ((newEpoch == curEpoch) &&
            ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}

如注释所言,三种情况,1. epoch高;2. epoch一样zxid大;3. epoch和zxid都一样,sid大。

sendNotification
/**
 * Send notifications to all peers upon a change in our vote
 */
private void sendNotifications() {
    for (QuorumServer server : self.getVotingView().values()) {//遍历peer
        long sid = server.id;

        ToSend notmsg = new ToSend(ToSend.mType.notification,
                proposedLeader,
                proposedZxid,
                logicalclock.get(),
                QuorumPeer.ServerState.LOOKING,
                sid,
                proposedEpoch);
        if(LOG.isDebugEnabled()){
            LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x"  +
                  Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get())  +
                  " (n.round), " + sid + " (recipient), " + self.getId() +
                  " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
        }
        sendqueue.offer(notmsg);//给每个peer发送自己的vote信息
    }
}
termPredicate
/**
 * Termination predicate. Given a set of votes, determines if
 * have sufficient to declare the end of the election round.
 *
 *  @param votes    Set of votes
 *  @param l        Identifier of the vote received last
 *  @param zxid     zxid of the the vote received last
 */
protected boolean termPredicate(
        HashMap<Long, Vote> votes,
        Vote vote) {

    HashSet<Long> set = new HashSet<Long>();

    /*
     * First make the views consistent. Sometimes peers will have
     * different zxids for a server depending on timing.
     */
    for (Map.Entry<Long,Vote> entry : votes.entrySet()) {//看所有选票里投的sid和vote一致的
        if (vote.equals(entry.getValue())){
            set.add(entry.getKey());
        }
    }

    return self.getQuorumVerifier().containsQuorum(set);//用校验器校验
}

termPredicate是用来判断vote是否是选出的leader选票的。

ooePredicate,checkLeader
/**
 * In the case there is a leader elected, and a quorum supporting
 * this leader, we have to check if the leader has voted and acked
 * that it is leading. We need this check to avoid that peers keep
 * electing over and over a peer that has crashed and it is no
 * longer leading.
 *
 * @param votes set of votes
 * @param   leader  leader id
 * @param   electionEpoch   epoch id
 */
protected boolean checkLeader(
        HashMap<Long, Vote> votes,
        long leader,
        long electionEpoch){

    boolean predicate = true;

    /*
     * If everyone else thinks I'm the leader, I must be the leader.
     * The other two checks are just for the case in which I'm not the
     * leader. If I'm not the leader and I haven't received a message
     * from leader stating that it is leading, then predicate is false.
     */

    if(leader != self.getId()){//自己不是leader
        if(votes.get(leader) == null) predicate = false;//如果leader投过票
        else if(votes.get(leader).getState() != ServerState.LEADING) predicate = false;//或者leader的状态不是leading,那么认为这个投票是无效的(如注释,只要我自己不是leader,且我没有收到leader给我发它是leader,那么就不行)
    } else if(logicalclock.get() != electionEpoch) {//我自己是leader,但是epoch不对,说明我曾经挂过
        predicate = false;
    } 

    return predicate;
}
/**
 * This predicate checks that a leader has been elected. It doesn't
 * make a lot of sense without context (check lookForLeader) and it
 * has been separated for testing purposes.
 *
 * @param recv  map of received votes
 * @param ooe   map containing out of election votes (LEADING or FOLLOWING)
 * @param n     Notification
 * @return
 */
protected boolean ooePredicate(HashMap<Long,Vote> recv,
                                HashMap<Long,Vote> ooe,
                                Notification n) {

    return (termPredicate(recv, new Vote(n.version,
                                         n.leader,
                                         n.zxid,
                                         n.electionEpoch,
                                         n.peerEpoch,
                                         n.state))
            && checkLeader(ooe, n.leader, n.electionEpoch));//过半的校验加上leader的检查

}

到这里差不多选举的算法代码部分逻辑就清晰了,主要的部分和在zab思考那一节里讲的一致,但是实现工程代码还是多考虑到了很多网络丢失或者别的情况带来的一些异常,逻辑还是比较复杂的。

思考

  1. 在fastleaderelection中,有一个currentVote变量保存的是从第一轮到现在为止自己决定的最终的投票,一般是用来做通知,而在动态地投票过程中,临时生成的投票都是用getVote new出来的,并不会使用currentVote。
  2. 有一段代码逻辑值得仔细去思考

while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){//接收队列还有消息
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){//接收队列新的消息比自己投的还要好(注意,这时候已经认为自己投的是leader了)
recvqueue.put(n);//再把消息放进接收队列,为啥这样做?我的想法是因为因为有网络的延迟,所以可能出现一种情况就是比如集群里有一台机器的选票没有发过来,但是它的选票是最优的,在其他的完成选举后,它的选票发来了,但是这时候当前server的状态还没有改掉,于是就把这个选票再放回去,下次取出来的时候就在switch的其他逻辑里处理了

这种异常情况具体描述就是ABCDE五台server都进行选举,它们的epoch和zxid相互网络一切正常,A在lookForLeader的looking状态处理时发现选举了D,然后这个时候E发送的消息来到了queue里,这时A去检查queue里发现这个投票居然比选出来的leader还要好,但是leader已经选出来了不能改了,于是就放回去,下一轮循坏在处理,因为下一次来的时候自己已经是following的状态了,在switch的following处理逻辑里,下一次这个选票其实啥逻辑都不会走,会变成一张"废票"。这种延迟的策略还是比较机智的。

  1. 选举的检查

其实主要就是过半检查和leader的有效性检查。

上面说的推迟一轮之后,那张选票为啥会变成废票呢,就是因为过不了选举的检查策略。

参考

  1. 从paxos到zookeeper
  2. https://shift-alt-ctrl.iteye.com/blog/1846562
  3. https://www.jianshu.com/p/3b295d7eccf2
  4. https://www.cnblogs.com/leesf456/p/6107600.html
  5. https://www.cnblogs.com/leesf456/p/6508185.html

原文地址:https://www.cnblogs.com/gongcomeon/p/11073608.html

时间: 2024-11-08 20:40:24

Zookeeper源码阅读(十八) 选举之快速选举算法FastLeaderElection的相关文章

Zookeeper源码阅读(十四) 单机Server

前言 前面两篇主要说了下client-server的session相关的内容,到这里client的内容以及client-server的连接的内容也就基本告一段落了,剩下的部分就是server部分内部的结构,zk的选举以及server部分的工作机制等了. 这一篇主要说下单机server的启动过程,里面会涉及到一些server内部的工作机制和机构. Server架构 可以看到Zookeeper的server端主要分为几个大模块,ZKDatabase是zk server内部的内存数据库,内部维护了节点

Zookeeper源码阅读(十二) Seesion(1)

前言 前面三篇主要从client的角度说了下client和server建立连接的过程,这一篇和后面一篇开始看下Zookeeper中非常重要的一个概念:Session,session是zookeeper client和server建立和维护连接的单位(我这个描述感觉有点奇怪 ?? ). Session状态 Zookeeper的所有操作基本都是基于session的,如之前提到的wathcer的机制,客户端请求的顺序执行和临时节点的生命周期. 从我们使用API的角度,session的连接和保持就是客户

Zookeeper源码阅读(五) ACL基础

前言 之前看代码的时候也同步看了看一些关于zk源码的博客,有一两篇讲到了ZK里ACL的基础的结构,我自己这边也看了看相关的代码,在这里分享一下! ACL和ID ACL和ID都是有Jute生成的实体类,分别代表了ZK里ACL和不同ACL模式下的具体实体. ACL: public class ACL implements Record { private int perms; private org.apache.zookeeper.data.Id id; 可以看到,ACL包含了两个域,分别代表了权

做一个合格的程序猿之浅析Spring AOP源码(十八) Spring AOP开发大作战源码解析

其实上一篇文章价值很小,也有重复造轮子的嫌疑,网上AOP的实例很多,不胜枚举,其实我要说的并不是这个,我想要说的就是上一节中spring的配置文件: 我们这边并没有用到我们上几节分析的哪几个AOP的主要实现类:ProxyFactoryBean.java , ProxyFactory.java ,AspectJProxyFactory.java ,在我们这个配置文件中,根本没有显示的去配置这些类,那么spring到底是怎么做到的呢? 大家可以这么想,spring到底是怎么去杀害目标对象的呢?真正的

SparkStreaming(源码阅读十二)

要完整去学习spark源码是一件非常不容易的事情,但是咱可以积少成多嘛~那么,Spark Streaming是怎么搞的呢? 本质上,SparkStreaming接收实时输入数据流并将它们按批次划分,然后交给Spark引擎处理生成按照批次划分的结果流: SparkStreaming提供了表示连续数据流的.高度抽象的被称为离散流的Dstream,可以使用kafka.Flume和Kiness这些数据源的输入数据流创建Dstream,也可以在其他Dstream上使用map.reduce.join.win

Spark之SQL解析(源码阅读十)

如何能更好的运用与监控sparkSQL?或许我们改更深层次的了解它深层次的原理是什么.之前总结的已经写了传统数据库与Spark的sql解析之间的差别.那么我们下来直切主题~ 如今的Spark已经支持多种多样的数据源的查询与加载,兼容了Hive,可用JDBC的方式或者ODBC来连接Spark SQL.下图为官网给出的架构.那么sparkSql呢可以重用Hive本身提供的元数据仓库(MetaStore).HiveQL.以及用户自定义函数(UDF)及序列化和反序列化的工具(SerDes). 下来我们来

jQuery 源码分析(十八) ready事件详解

ready事件是当DOM文档树加载完成后执行一个函数(不包含图片,css等),因此它的触发要早于load事件.用法: $(document).ready(fun) ;fun是一个函数,这样当DOM树加载完毕后就会执行该匿名函数了 ready有一个简写,可以直接传入$(fun)即可,这是因为在jQuey内部也定义了一个$(document)的jQuery对象,和我们在上面的写法是一样的 ready事件和window的onload区别: ready事件 ;等dom树载完毕后就可以执行 onload事

Zookeeper源码阅读(九) ZK Client-Server(2)

前言 前面一篇博客主要从大致流程的角度说了下client和server建立连接的流程,这篇和下一篇博客会详细的把上一篇不是很细致的地方展开和补充. 初始化阶段 初始化阶段主要就是把Zookeeper类中比较重要的功能类实例化,前面对这个过程说的已经比较详细了.这里主要补充几点: ClientCnxn初始化 cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, w

zookeeper 源码阅读---框架

1. 启动类 QuorumPeerMain.java 集群方式:调用runFromConfig(QuorumPeerConfig config) 创建一个QuorumPeer 对象,并初始化设置其相关属性,如ZKDatabase,ServerCnxnFactory成员等. QuorumPeer继承了Thread(ublic class QuorumPeer extends Thread implements QuorumStats.Provider),所以 调用 对象的 start() 和 jo