ZK集群的Leader选举源码阅读

前言

ZooKeeper对Zab协议的实现有自己的主备模型,即Leader和learner(Observer + Follower),有如下几种情况需要进行领导者的选举工作

  • 情形1: 集群在启动的过程中,需要选举Leader
  • 情形2: 集群正常启动后,leader因故障挂掉了,需要选举Leader
  • 情形3: 集群中的Follower数量不足以通过半数检验,Leader会挂掉自己,选举新leader
  • 情景4: 集群正常运行,新增加1个Follower

本篇博文,从这四个方面进行源码的追踪阅读

程序入口

QuorumPeer.java相当于集群中的每一个节点server,在它的start()方法中,完成当前节点的启动工作,源码如下:

    // todo 进入了 QuorumPeer(意为仲裁人数)类中,可以把这个类理解成集群中的某一个点
    @Override
    public synchronized void start() {
        // todo 从磁盘中加载数据到内存中
        loadDataBase();

        // todo 启动上下文的这个工厂,他是个线程类, 接受客户端的请求
        cnxnFactory.start();

        // todo 开启leader的选举工作
        startLeaderElection();

        // todo 确定服务器的角色, 启动的就是当前类的run方法在900行
        super.start();
    }

第一个loadDataBase();目的是将数据从集群中恢复到内存中

第二个cnxnFactory.start();是当前的节点可以接受来自客户端(java代码,或者控制台)发送过来的连接请求

第三个startLeaderElection();开启leader的选举工作, 但是其实他是初始化了一系列的辅助类,用来辅助leader的选举,并非真正在选举

当前类,quorumPeer继承了ZKThread,它本身就是一个线程类,super.start();就是启动它的run方法,在他的Run方法中有一个while循环,一开始在程序启动的阶段,所有的节点的默认值都是Looking,于是会进入这个分支中,在这个分之中会进行真正的leader选举工作

小结

从程序的入口介绍中,可以看出本篇文章在会着重看下startLeaderElection();做了哪些工作? 以及在looking分支中如何选举leader

情形1:集群在启动的过程中,选举新Leader

进入startLeaderElection();方法,源码如下, 他主要做了两件事

  • 对本类QuorumPeer.java维护的变量(volatile private Vote currentVote;)初始化
  • createElectionAlgorithm()创建一个leader选举的方法

    其实到现在,就剩下一个算法没过期了,就是fastLeaderElection

   // TODO 开启投票选举Leader的工作
    synchronized public void startLeaderElection() {
        try {
            // todo 创建了一个封装了投票结果对象   包含myid 最大的zxid 第几轮Leader
            // todo 先投票给自己
            // todo 跟进它的构造函数
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        } catch(IOException e) {
            RuntimeException re = new RuntimeException(e.getMessage());
            re.setStackTrace(e.getStackTrace());
            throw re;
        }
        for (QuorumServer p : getView().values()) {
            if (p.id == myid) {
                myQuorumAddr = p.addr;
                break;
            }
        }
        if (myQuorumAddr == null) {
            throw new RuntimeException("My id " + myid + " not in the peer list");
        }
        if (electionType == 0) {
            try {
                udpSocket = new DatagramSocket(myQuorumAddr.getPort());
                responder = new ResponderThread();
                responder.start();
            } catch (SocketException e) {
                throw new RuntimeException(e);
            }
        }
        // todo  创建一个领导者选举的算法,这个算法还剩下一个唯一的实现 快速选举
        this.electionAlg = createElectionAlgorithm(electionType);
    }

继续跟进createElectionAlgorithm(electionType), 在这个方法中做了如下三件大事

  • 创建了QuorumCnxnManager
  • 创建Listenner
  • 创建FastLeaderElection
protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le=null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:
        // todo 创建CnxnManager 上下文的管理器
        qcm = createCnxnManager();
        QuorumCnxManager.Listener listener = qcm.listener;
        if(listener != null){

            // todo 在这里将listener 开启
            listener.start();
            // todo  实例化领导者选举的算法
            le = new FastLeaderElection(this, qcm);
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;

准备选举环境

QuorumManager

上图是QuorumCnxManager的类图,看一下,它有6个内部类, 其中的除了Message外其他都是可以单独运行的线程类

这个类有着举足轻重的作用,它是集群中全体节点共享辅助类, 那到底有什么作用呢? 我不卖关子直接说,因为leader的选举是通过投票决议出来的,既然要相互投票,那集群中的各个点就得两两之间建立连接,这个QuorumCnxManager就负责维护集群中的各个点的通信

它维护了两种队列,源码在下面,第一个队列被存入了ConcurrentHashMap中 key就是节点的myid(或者说是serverId),值可以理解成存储它往其他服务器发送投票的队列

第二个队列是收到的其他服务器发送过来的msg

// todo key=serverId(myid)   value = 保存着当前服务器向其他服务器发送消息的队列
final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;

// todo 接收到的所有数据都在这个队列中
public final ArrayBlockingQueue<Message> recvQueue;

如上图是手绘的QuorumCnxManager.java的体系图,最直观的可以看到它内部的三条线程类,那三条线程类的run()方法又分别做了什么呢?

SendWorker的run(), 可以看到它根据sid取出了当前节点对应的队列,然后将队列中的数据往外发送


    public void run() {
            threadCnt.incrementAndGet();
            try {
                ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
                if (bq == null || isSendQueueEmpty(bq)) {
                   ByteBuffer b = lastMessageSent.get(sid);
                   if (b != null) {
                       LOG.debug("Attempting to send lastMessage to sid=" + sid);
                       send(b);
                   }
                }
            } catch (IOException e) {
                LOG.error("Failed to send last message. Shutting down thread.", e);
                this.finish();
            }

            try {
                while (running && !shutdown && sock != null) {

                    ByteBuffer b = null;
                    try {
                        // todo 取出任务所在的队列
                        ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);

                        if (bq != null) {
                            // todo 将bq,添加进sendQueue
                            b = pollSendQueue(bq, 1000, TimeUnit.MILLISECONDS);
                        } else {
                            LOG.error("No queue of incoming messages for " +
                                      "server " + sid);
                            break;
                        }

                        if(b != null){
                            lastMessageSent.put(sid, b);
                            // todo
                            send(b);
                        }
                    } catch (InterruptedException e) {
                        LOG.warn("Interrupted while waiting for message on queue",
                                e);
                    }
                }

RecvWorker的run方法,接受到了msg,然后将msg存入了recvQueue队列中

        public void run() {
            threadCnt.incrementAndGet();
            try {
                while (running && !shutdown && sock != null) {
                    /**
                     * Reads the first int to determine the length of the
                     * message
                     */
                    int length = din.readInt();
                    if (length <= 0 || length > PACKETMAXSIZE) {
                        throw new IOException(
                                "Received packet with invalid packet: "
                                        + length);
                    }
                    /**
                     * Allocates a new ByteBuffer to receive the message
                     */
                    // todo 从数组中把数据读取到数组中
                    byte[] msgArray = new byte[length];
                    din.readFully(msgArray, 0, length);
                    // todo 将数组包装成ByteBuf
                    ByteBuffer message = ByteBuffer.wrap(msgArray);
                    // todo 添加到RecvQueue中
                    addToRecvQueue(new Message(message.duplicate(), sid));
                }

]

Listenner的run(),它会使用我们在配置文件中配置的集群键通信使用的端口(如上图的3888)建立彼此之间的连接

还能发现,集群中各个点之间的通信使用的传统socket通信

        InetSocketAddress addr;
            while((!shutdown) && (numRetries < 3)){
                try {
                    // todo 创建serversocket
                    ss = new ServerSocket();
                    ss.setReuseAddress(true);
                    if (listenOnAllIPs) {
                        int port = view.get(QuorumCnxManager.this.mySid)
                            .electionAddr.getPort();
                        //todo 它取出来的地址就是address就是我们在配置文件中配置集群时添加进去的 port 3888...
                        addr = new InetSocketAddress(port);
                    } else {
                        addr = view.get(QuorumCnxManager.this.mySid)
                            .electionAddr;
                    }
                    LOG.info("My election bind port: " + addr.toString());
                    setName(view.get(QuorumCnxManager.this.mySid)
                            .electionAddr.toString());
                    // todo 绑定端口
                    ss.bind(addr);
                    while (!shutdown) {
                        // todo 阻塞接受其他的服务器发起连接
                        Socket client = ss.accept();
                        setSockOpts(client);
                        LOG.info("Received connection request "
                                + client.getRemoteSocketAddress());
                       // todo  如果启用了仲裁SASL身份验证,则异步接收和处理连接请求
                        // todo  这是必需的,因为sasl服务器身份验证过程可能需要几秒钟才能完成,这可能会延迟下一个对等连接请求。
                        if (quorumSaslAuthEnabled) {
                            // todo 异步接受一个连接
                            receiveConnectionAsync(client);
                        } else {
                            // todo 跟进这个方法
                            receiveConnection(client);
                        }
                        numRetries = 0;
                    }

继续跟进源码,回到QuorumPeer.javacreateElectionAlgorithm()方法中,重新截取源码如下,完成了QuorumCnxManager的创建,后进行Listener的启动, Listenner的启动标记着集群中的各个节点之间有了两两之间建立通信能力, 同时Listenner是个线程类,它的Run()方法就在上面的代码中

FastLeaderElection

启动Listenner之后, 开始实例化领导者选举的算法对象new FastLeaderElection(this, qcm)

    ...
     break;
        case 3:
            // todo 创建CnxnManager 上下文的管理器
            qcm = createCnxnManager();
   QuorumCnxManager.Listener listener = qcm.listener;
            if(listener != null){
                // todo 在这里将listener 开启
                listener.start();
                // todo  实例化领导者选举的算法
                le = new FastLeaderElection(this, qcm);
            } else {
                LOG.error("Null listener when initializing cnx manager");
            }

如下图是FasterElection的类图

直观的看到它三个直接内部类

  • Messager(它又有两个内部线程类)

    • WorkerRecriver

      • 负责将
    • WorkerSender
  • Notification
    • 一般是当新节点启动时状态为looking,然后发起投票决议,其他节点收到后会用Notification告诉它自己信任的leader
  • ToSend
    • 给对方发送,或者来自其他节点的消息。这些消息既可以是通知,也可以是接收通知的ack

对应着QuorumCnxManager维护的两种队列,FasterElection同样维护下面两个队列与之照应,一个是sendqueue另一个是recvqueue

LinkedBlockingQueue<ToSend> sendqueue;
LinkedBlockingQueue<Notification> recvqueue;

具体怎么玩呢? 如下图

就是当节点启动过程中对外的投票会存入FasterElectionsendqueue,然后经过QuorumCnxManagersendWorker通过NIO发送出去, 与之相反的过程,收到的其他节点的投票会被QuorumCnxManagerrecvWorker收到,然后存入QuorumCnxManagerrecvQueue中,这个队列中的msg会继续被FasterElection的内部线程类workerRecviver取出存放到FasterElectionrecvqueue中

通过追踪代码,可以发现,Message的两个内部线程都被作为守护线程的方式开启

Messenger(QuorumCnxManager manager) {
    // todo WorderSender 作为一条新的线程
    this.ws = new WorkerSender(manager);

    Thread t = new Thread(this.ws,
            "WorkerSender[myid=" + self.getId() + "]");
    t.setDaemon(true);
    t.start();

    //todo------------------------------------
    // todo WorkerReceiver  作为一条新的线程
    this.wr = new WorkerReceiver(manager);

    t = new Thread(this.wr,
            "WorkerReceiver[myid=" + self.getId() + "]");
    t.setDaemon(true);
    t.start();
}

小结

代码看到这里,其实选举leader的准备工作已经完成了,也就是说quorumPeer.javastart()方法中的startLeaderElection();已经准备领导选举的环境,就是上图


真正开始选举

下面就去看一下quorumPeer.java的这个线程类的启动,部分run()方法的截取,我们关心它的lookForLeader()方法

while (running) {
switch (getPeerState()) {
    /**
     * todo 四种可能的状态, 经过了leader选举之后, 不同的服务器就有不同的角色
     * todo 也就是说,不同的服务器会会走动下面不同的分支中
     * LOOKING 正在进行领导者选举
     * Observing
     * Following
     * Leading
     */
case LOOKING:
    // todo 当为Looking状态时,会进入领导者选举的阶段
    LOG.info("LOOKING");

    if (Boolean.getBoolean("readonlymode.enabled")) {
        LOG.info("Attempting to start ReadOnlyZooKeeperServer");

        // Create read-only server but don't start it immediately
        // todo 创建了一个 只读的server但是不着急立即启动它
        final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
                logFactory, this,
                new ZooKeeperServer.BasicDataTreeBuilder(),
                this.zkDb);

        // Instead of starting roZk immediately, wait some grace(优雅) period(期间) before we decide we're partitioned.
        // todo 为了立即启动roZK ,在我们决定分区之前先等一会
        // Thread is used here because otherwise it would require changes in each of election strategy classes which is
        // unnecessary code coupling.
        //todo  这里新开启一条线程,避免每一个选举策略类上有不同的改变 而造成的代码的耦合
        Thread roZkMgr = new Thread() {
            public void run() {
                try {
                    // lower-bound grace period to 2 secs
                    sleep(Math.max(2000, tickTime));
                    if (ServerState.LOOKING.equals(getPeerState())) {
                        // todo 启动上面那个只读的Server
                        roZk.startup();
                    }
                } catch (InterruptedException e) {
                    LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                } catch (Exception e) {
                    LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                }
            }
        };
        try {
            roZkMgr.start();
            setBCVote(null);
            // todo 上面的代码都不关系,直接看它的 lookForLeader()方法
            // todo 直接点进去,进入的是接口,我们看它的实现类
            setCurrentVote(makeLEStrategy().lookForLeader());
        } catch (Exception e) {
            LOG.warn("Unexpected exception",e);
            setPeerState(ServerState.LOOKING);
        } finally {
            // If the thread is in the the grace period, interrupt
            // to come out of waiting.
            roZkMgr.interrupt();
            roZk.shutdown();
        }

下面是lookForLeader()的源码解读
说实话这个方法还真的是挺长的,但是吧这个方法真的很重要,因为我们可以从这个方法中找到网络上大家针对Leader的选举总结的点点滴滴

第一点: 每次的投票都会先投自己一票,说白了new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());将自己的myid,最大的zxid,以及第几届封装起来,但是还有一个细节,就是在投自己的同时,还是会将存有自己信息的这一票通过socket发送给其他的节点

接受别人的投票是通过QuorumManagerrecvWorker线程类将投票添加进recvQueue队列中,投票给自己时,就不走这条路线了,而是选择直接将票添加进recvQueue队列中

在下面代码中存在一行HashMap<Long, Vote> recvset = new HashMap<Long, Vote>(); 这个map可以理解成一个小信箱,每一个节点都会维护一个信箱,这里面可能存放着自己投给自己的票,或者别人投给自己的票,或者别人投给别人的票,或者自己投给别人的票,通过统计这个信箱中的票数可以决定某一个节点是否可以成为leader,源码如下, 使用信箱中的信息,

    // todo 根据别人的投票,以及自己的投票判断,本轮得到投票的集群能不能成为leader
    if (termPredicate(recvset,
            new Vote(proposedLeader, proposedZxid,
                    logicalclock.get(), proposedEpoch))) {
        // todo 到这里说明接收到投票的机器已经是准leader了

        // Verify if there is any change in the proposed leader
        // todo 校验一下, leader有没有变动
        while ((n = recvqueue.poll(finalizeWait,
                TimeUnit.MILLISECONDS)) != null) {
            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                    proposedLeader, proposedZxid, proposedEpoch)) {
                recvqueue.put(n);
                break;
            }
        }
        if (n == null) {
                // todo 判断自己是不是leader, 如果是,更改自己的状态未leading , 否则根据配置文件确定状态是 Observer 还是Follower
                // todo leader选举出来后, QuorumPeer中的run方法中的while再循环,不同角色的服务器就会进入到 不同的分支
                self.setPeerState((proposedLeader == self.getId()) ?
                        ServerState.LEADING : learningState());

                Vote endVote = new Vote(proposedLeader,
                        proposedZxid,
                        logicalclock.get(),
                        proposedEpoch);
                leaveInstance(endVote);
                return endVote;
            }
        }

termPredicate()函数中有如下的逻辑,self.getQuorumVerifier().containsQuorum(set);它的实现如下,实际上就是在进行过半机制的检验,结论就是当某个节点拥有了集群中一半以上的节点的投票时,它就会把自己的状态修改成leading, 其他的节点根据自己的需求将状态该变成following或者observing

    public boolean containsQuorum(Set<Long> set){
        return (set.size() > half);
    }

维护着一个时钟,标记这是第几次投票了logicalclock他是AutomicLong类型的变量,他有什么用呢? 通过下面的代码可以看到如下的逻辑,就是当自己的时钟比当前接收到投票的时钟小时,说明自己可能因为其他原因错过了某次投票,所以更新自己的时钟,重新判断投自己还是投别人, 同理,如果接收到的投票的时钟小于自己当前的时钟,说明这个票是没有意义的,直接丢弃不理会

   if (n.electionEpoch > logicalclock.get()) {
                                // todo 将自己的时钟调整为更新的时间
                                logicalclock.set(n.electionEpoch);
                                // todo 清空自己的投票箱
                                recvset.clear();

那么根据什么判断是投给自己还是投给别人呢? 通过解析出票的封装类中封装的节点的信息,什么信息呢?zxid,myid,epoch 通常情况是epoch大的优先成为leader,一般来说epoch都会相同,所以zxid大的优先成为leader,如果zxid再相同,则myid大的优先成为leader

检查到别的节点比自己更适合当leader,会重新投票,选举更适合的节点

完整的源码

// todo 当前进入的是FastLeaderElection.java的实现类
public Vote lookForLeader() throws InterruptedException {
try {
    // todo 创建用来选举Leader的Bean
    self.jmxLeaderElectionBean = new LeaderElectionBean();

    MBeanRegistry.getInstance().register(
            self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
    LOG.warn("Failed to register with JMX", e);
    self.jmxLeaderElectionBean = null;
}
if (self.start_fle == 0) {
    self.start_fle = Time.currentElapsedTime();
}
try {
    // todo 每台服务器独有的投票箱 , 存放其他服务器投过来的票的map
    // todo long类型的key (sid)标记谁给当前的server投的票   Vote类型的value 投的票
    HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

    HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

    int notTimeout = finalizeWait;

    synchronized (this) {
        //todo Automic 类型的时钟
        logicalclock.incrementAndGet();
        //todo 一开始启动时,入参位置的值都取自己的,相当于投票给自己
        updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
    }

    LOG.info("New election. My id =  " + self.getId() +
            ", proposed zxid=0x" + Long.toHexString(proposedZxid));
    // todo 发送出去,投票自己
    sendNotifications();

    /*
     * Loop in which we exchange notifications until we find a leader
     */
    // todo 如果自己一直处于LOOKING的状态,一直循环
    while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
        /*
         * Remove next notification from queue, times out after 2 times
         * the termination time
         */
        //todo  尝试获取其他服务器的投票的信息

        // todo 从接受消息的队列中取出一个msg(这个队列中的数据就是它投票给自己的票)
        // todo 在QuorumCxnManager.java中 发送的投票的逻辑中,如果是发送给自己的,就直接加到recvQueue,而不经过socket
        // todo 所以它在这里是取出了自己的投票
        Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

        /*
         * Sends more notifications if haven't received enough.
         * Otherwise processes new notification.
         */
        // todo 第一轮投票这里不为空
        if (n == null) {
            // todo 第二轮就没有投票了,为null, 进入这个分支
            // todo 进行判断 ,如果集群中有三台服务器,现在仅仅启动一台服务器,还剩下两台服务器没启动
            // todo 那就会有3票, 其中1票直接放到 recvQueue , 另外两票需要发送给其他两台机器的逻辑就在这里判断
            // todo 验证是通不过的,因为queueSendMap中的两条队列都不为空
            if (manager.haveDelivered()) {
                sendNotifications();
            } else {
                // todo 进入这个逻辑
                manager.connectAll();
            }

            /*
             * Exponential backoff
             */
            int tmpTimeOut = notTimeout * 2;
            notTimeout = (tmpTimeOut < maxNotificationInterval ?
                    tmpTimeOut : maxNotificationInterval);
            LOG.info("Notification time out: " + notTimeout);
        } else if (validVoter(n.sid) && validVoter(n.leader)) {
            // todo 收到了其他服务器的投票信息后,来到下面的分支中处理
            /*
             * Only proceed if the vote comes from a replica in the
             * voting view for a replica in the voting view.
             * todo 仅当投票来自投票视图中的副本时,才能继续进行投票。
             */
            switch (n.state) {
                case LOOKING:
                    // todo 表示获取到投票的服务器的状态也是looking

                    // If notification > current, replace and send messages out
                    // todo 对比接收到的头片的 epoch和当前时钟先后

                    // todo 接收到的投票 > 当前服务器的时钟
                    // todo 表示当前server在投票过程中可能以为故障比其他机器少投了几次,需要重新投票
                    if (n.electionEpoch > logicalclock.get()) {
                        // todo 将自己的时钟调整为更新的时间
                        logicalclock.set(n.electionEpoch);
                        // todo 清空自己的投票箱
                        recvset.clear();
                        // todo 用别人的信息和自己的信息对比,选出一个更适合当leader的,如果还是自己适合,不作为, 对方适合,修改投票,投 对方
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(),
                                    getInitLastLoggedZxid(),
                                    getPeerEpoch());
                        }
                        sendNotifications();

                        // todo 接收到的投票 < 当前服务器的时钟
                        // todo 说明这个投票已经不能再用了
                    } else if (n.electionEpoch < logicalclock.get()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                    + Long.toHexString(n.electionEpoch)
                                    + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                        }
                        break;
                        // todo 别人的投票时钟和我的时钟是相同的
                        // todo 满足 totalOrderPredicate 后,会更改当前的投票,重新投票
                        /**
                         *   在 totalOrderPredicate 比较两者之间谁更满足条件
                         *   ((newEpoch > curEpoch) ||
                         *   ((newEpoch == curEpoch) &&
                         *   ((newZxid > curZxid) ||
                         *   ((newZxid == curZxid) &&
                         *   (newId > curId)))));
                         */
                        // todo 返回true说明 对方更适合当leader
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Adding vote: from=" + n.sid +
                                ", proposed leader=" + n.leader +
                                ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }
                    // todo 将自己的投票存放到投票箱子中
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    // todo 根据别人的投票,以及自己的投票判断,本轮得到投票的集群能不能成为leader
                    if (termPredicate(recvset,
                            new Vote(proposedLeader, proposedZxid,
                                    logicalclock.get(), proposedEpoch))) {
                        // todo 到这里说明接收到投票的机器已经是准leader了

                        // Verify if there is any change in the proposed leader
                        // todo 校验一下, leader有没有变动
                        while ((n = recvqueue.poll(finalizeWait,
                                TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }

                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        if (n == null) {
                            // todo 判断自己是不是leader, 如果是,更改自己的状态未leading , 否则根据配置文件确定状态是 Observer 还是Follower
                            // todo leader选举出来后, QuorumPeer中的run方法中的while再循环,不同角色的服务器就会进入到 不同的分支
                            self.setPeerState((proposedLeader == self.getId()) ?
                                    ServerState.LEADING : learningState());

                            Vote endVote = new Vote(proposedLeader,
                                    proposedZxid,
                                    logicalclock.get(),
                                    proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    // todo 禁止Observer参加投票
                    LOG.debug("Notification from observer: " + n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    if (n.electionEpoch == logicalclock.get()) {
                        recvset.put(n.sid, new Vote(n.leader,
                                n.zxid,
                                n.electionEpoch,
                                n.peerEpoch));

                        if (ooePredicate(recvset, outofelection, n)) {
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING : learningState());

                            Vote endVote = new Vote(n.leader,
                                    n.zxid,
                                    n.electionEpoch,
                                    n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    /*
                     * Before joining an established ensemble, verify
                     * a majority is following the same leader.
                     */
                    outofelection.put(n.sid, new Vote(n.version,
                            n.leader,
                            n.zxid,
                            n.electionEpoch,
                            n.peerEpoch,
                            n.state));

                    if (ooePredicate(outofelection, outofelection, n)) {
                        synchronized (this) {
                            logicalclock.set(n.electionEpoch);
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING : learningState());
                        }
                        Vote endVote = new Vote(n.leader,
                                n.zxid,
                                n.electionEpoch,
                                n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
                            n.state, n.sid);
                    break;
            }
        } else {
            if (!validVoter(n.leader)) {
                LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
            }
            if (!validVoter(n.sid)) {
                LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
            }
        }
    }
    return null;

经过如上的判断各个节点的就可以选举出不同的角色,再次回到QuorumPeer.javarun()中进行循环时,不再会进入case LOOKING:代码块了,而是按照自己不同的角色各司其职,完成不同的初始化启动

情形2: 集群正常启动后,leader因故障挂掉了,选举新Leader

第二种选举leader的情况,集群正常启动后,leader因故障挂掉了,选举新Leader

这部分的逻辑是怎样的呢?

leader虽然挂了,但是角色为Follower的server依然会去执行QuorumPeer.javarun()方法中的无限while循环,当它执行follower.followLeader();方法时找不到leader,就会出异常,最终执行finally代码块中的逻辑,可以看到它修改了自己的状态为looking,进而重新选举leader

   break;
        case FOLLOWING:
            // todo server 当选follow角色
            try {
                LOG.info("FOLLOWING");
                setFollower(makeFollower(logFactory));
                follower.followLeader();
            } catch (Exception e) {
                LOG.warn("Unexpected exception",e);
            } finally {
                follower.shutdown();
                setFollower(null);
                setPeerState(ServerState.LOOKING);
            }
            break;

情形3: 集群中的Follower数量不足以通过半数检验,Leader会挂掉自己,然后选举新leader

情形3: 假设集群中2台Follower,1台leader,那么当挂掉一台Follower时,剩下1台Follower无法满足过半检查机制因此会重新选举leader

回到源码:leader每次都进入case LEADING:去执行leader.lead();

 case LEADING:
            // todo 服务器成功当选成leader
            LOG.info("LEADING");
            try {
                setLeader(makeLeader(logFactory));
                // todo 跟进lead
                leader.lead();
                setLeader(null);
            } catch (Exception e) {
                LOG.warn("Unexpected exception",e);
            } finally {
                if (leader != null) {
                    leader.shutdown("Forcing shutdown");
                    setLeader(null);
                }
                setPeerState(ServerState.LOOKING);
            }
            break;

但是在leader.lead();中每次执行都会进行如下的判断,很明显,当不满足半数检验时,leader直接挂掉自己,最终将集群中所有节点的状态改成LOOKING,重新选举


              if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
                //if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {
                    // Lost quorum, shutdown
                    shutdown("Not sufficient followers synced, only synced with sids: [ "
                            + getSidSetString(syncedSet) + " ]");
                    // make sure the order is the same!
                    // the leader goes to looking
                    return;
              } 

情景4: 集群正常运行,新增加1个Follower

新增加的进来的Follower在启动时它的状态是looking, 同样她也会去尝试选举leader,同样会把第一票投给自己,但是对于一个稳定的集群来说
集群中的各个橘色已经确定下来了,在这种情况下,会进入FastLeaderElection.javalookForLeader()方法的如下分支,使当前新添加进来的节点
直接认Leader

case OBSERVING:
        // todo 禁止Observer参加投票
        LOG.debug("Notification from observer: " + n.sid);
        break;
    case FOLLOWING:
    case LEADING:
        /*
            * Consider all notifications from the same epoch
            * together.
            */
        if (n.electionEpoch == logicalclock.get()) {
            recvset.put(n.sid, new Vote(n.leader,
                    n.zxid,
                    n.electionEpoch,
                    n.peerEpoch));

            if (ooePredicate(recvset, outofelection, n)) {
                self.setPeerState((n.leader == self.getId()) ?
                        ServerState.LEADING : learningState());

                Vote endVote = new Vote(n.leader,
                        n.zxid,
                        n.electionEpoch,
                        n.peerEpoch);
                leaveInstance(endVote);
                return endVote;
            }
        }

如果有错误欢迎指出,如果对您有帮助,欢迎点支持

原文地址:https://www.cnblogs.com/ZhuChangwu/p/11622763.html

时间: 2024-10-08 05:03:54

ZK集群的Leader选举源码阅读的相关文章

ZK集群如何保证数据一致性源码阅读

什么是数据一致性? 只有当服务端的ZK存在多台时,才会出现数据一致性的问题, 服务端存在多台服务器,他们被划分成了不同的角色,只有一台Leader,多台Follower和多台Observer, 他们中的任意一台都能响应客户端的读请求,任意一台也都能接收写请求, 不同的是,Follower和Observer接收到客户端的写请求后不能直接处理这个请求而是将这个请求转发给Leader,由Leader发起原子广播完成数据一致性 理论上ZK集群中的每一个节点的作用都是相同的,他们应该和单机时一样,各个节点

面试题:说说你对ZooKeeper集群与Leader选举的理解?

ZooKeeper是一个开源分布式协调服务.分布式数据一致性解决方案.可基于ZooKeeper实现命名服务.集群管理.Master选举.分布式锁等功能. 高可用 为了保证ZooKeeper的可用性,在生产环境中我们使用ZooKeeper集群模式对外提供服务,并且集群规模至少由3个ZooKeeper节点组成. ? 集群至少由3个节点组成?? ZooKeeper其实2个节点也可以组成集群并对外提供服务,但我们使用集群主要目的是为了高可用.如果2个节点组成集群,其中1个节点挂了,另外ZooKeeper

使用sqlserver搭建高可用双机热备的Quartz集群部署【附源码】

一般拿Timer和Quartz相比较的,简直就是对Quartz的侮辱,两者的功能根本就不在一个层级上,如本篇介绍的Quartz强大的集群机制,可以采用基于 sqlserver,mysql的集群方案,当然还可以在第三方插件的基础上实现quartz序列化到热炒的mongodb,redis,震撼力可想而知,接下来本篇就和大家聊 一聊怎么搭建基于sqlserver的quartz集群,实现这么一种双机热备的强大功能. 一:下载sqlserver版的建表脚本   首先大家可以通过github上搜索quart

ZK集群中通过Processor保证数据一致性源码阅读

入口 书接上篇博客中的ZK集群启动后完成数据的统一性恢复后,来到启动ZkServer的逻辑,接下来的重点工作就是启动不同角色的对应的不同的处理器Processor 如上图查看ZooKeeperServer的继承图,三种不同的角色有不同的ZooKeeperServer的实现逻辑类 三者启动时,都将会来到ZooKeeperServer.java中的startUp()方法中,源码如下,但是,不同的角色针对setupRequestProcessors();进行了不同的重写,所以本篇博客的重点即使看一下他

Redis集群搭建及选举原理

redis集群简述 哨兵模式中如果主从中master宕机了,是通过哨兵来选举出新的master,在这个选举切换主从的过程,整个redis服务是不可用的.而且哨兵模式中只有一个主节点对外提供服务,因此没法支持更高的并发.而且当个主节点的内存设置也不宜过大.否则会导致持久化文件过大,影响数据恢复或主从同步的效率. redis集群是由一系列的主从节点群组成的分布式服务器群,它具有复制.高可用和分片特性.Redis集群不需要 sentinel哨兵也能完成节点移除和故障转移的功能.需要将每个节点设置成集群

Redis集群节点的选举(实验)

Redis集群节点的选举: 当master挂掉之后,就会在该集群中的slave中选取一个来代替mater角色, 从而保证redis集群slot的完整性. 如果其中一个mster和它的slave都挂掉后,会导致slot不完整,整个集群都会挂掉. 集群节点信息: 192.168.2.200:6379> cluster nodes 3ff3a74f9dc41f8bc635ab845ad76bf77ffb0f69 192.168.2.201:6379 master - 0 1527145806504 5

zookeeper:springboot+dubbo配置zk集群并测试

1.springboot配置zk集群 1.1:非主从配置方法 dubbo: registry: protocol: zookeeper address: 12.1.1.69:2181,12.1.1.85:2181,12.1.1.84:2181 check: false 1.2:主从配置方法 dubbo: registry: protocol: zookeeper address: 12.1.1.69:2181?backup=12.1.1.85:2181,12.1.1.84:2181 check:

ZooKeeper单机服务端的启动源码阅读

程序的入口QuorumPeerMain public static void main(String[] args) { // QuorumPeerMain main = new QuorumPeerMain(); try { // 初始化服务端,并运行服务端 // todo 跟进去看他如何处理 服务端的配置文件,以及根据服务端的配置文件做出来那些动作 main.initializeAndRun(args); 点击查看上图原文地址( zhaoyu_nb) 初始化和启动总览 跟进initializ

Flume-NG源码阅读之HBaseSink

关于HBase的sink的所有内容均在org.apache.flume.sink.hbase包下. 每个sink包括自己定制的,都extends AbstractSink implements Configurable. 一.首先是configure(Context context)方法.该方法是对HBaseSink的参数初始化.主要包括以下几个: tableName:要写入的HBase数据表名,不能为空: columnFamily:数据表对应的列簇名,这个sink目前只支持一个列簇,不能为空: