ZooKeeper Source Code Analysis: The Client Request-Sending Flow

  A znode can be watched: changes to the data stored in the node, changes to its children, and so on. Once a change occurs, the clients that set the watch are notified. This is ZooKeeper's most important feature for applications; on top of it you can build centralized configuration management, cluster membership management, distributed locks, and more.

Background:

ZooKeeper defines the following session states (KeeperState):

Unknown (-1), Disconnected (0), NoSyncConnected (1), SyncConnected (3), AuthFailed (4), ConnectedReadOnly (5), SaslAuthenticated (6), Expired (-112).

The event types (EventType) are: None (-1), NodeCreated (1), NodeDeleted (2), NodeDataChanged (3), NodeChildrenChanged (4), DataWatchRemoved (5), ChildWatchRemoved (6).

The watcher types (WatcherType) are: Children (1), Data (2), Any (3).
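
As a quick illustration of these enums in action, here is a minimal, self-contained sketch that registers a data watch and prints the KeeperState and EventType when the watch fires. The connect string, timeout, and the /demo path are placeholder values:

    import org.apache.zookeeper.ZooKeeper;

    // Minimal sketch: set a one-shot data watch and print the enum values
    // carried by the resulting WatchedEvent. The connect string, timeout,
    // and "/demo" path are placeholders.
    public class WatchDemo {
        public static void main(String[] args) throws Exception {
            ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000,
                    event -> System.out.println("default watcher: " + event));
            // exists() registers the watch even if the node does not exist yet
            zk.exists("/demo", event -> System.out.println(
                    "state=" + event.getState()
                    + " type=" + event.getType()
                    + " path=" + event.getPath()));
            Thread.sleep(60000); // keep the session alive to receive the event
            zk.close();
        }
    }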

In the previous article, "zookeeper源码分析之一客户端", we passed in a MyWatcher when connecting to ZooKeeper:

    protected void connectToZK(String newHost) throws InterruptedException, IOException {
        if (zk != null && zk.getState().isAlive()) {
            zk.close();
        }
        host = newHost;
        boolean readOnly = cl.getOption("readonly") != null;
        if (cl.getOption("secure") != null) {
            System.setProperty(ZooKeeper.SECURE_CLIENT, "true");
            System.out.println("Secure connection is enabled");
        }
        zk = new ZooKeeper(host,
                 Integer.parseInt(cl.getOption("timeout")),
                 new MyWatcher(), readOnly);
    }
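
For reference, MyWatcher in ZooKeeperMain is only a thin wrapper that prints whatever events it receives; simplified from the source, it looks roughly like this:

    private class MyWatcher implements Watcher {
        public void process(WatchedEvent event) {
            if (getPrintWatches()) {
                ZooKeeperMain.printMessage("WATCHER::");
                ZooKeeperMain.printMessage(event.toString());
            }
        }
    }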

When the ZooKeeper instance is created, the watchManager comes into play:

    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly, HostProvider aHostProvider)
            throws IOException {
        LOG.info("Initiating client connection, connectString=" + connectString
                + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

        watchManager = defaultWatchManager();
        watchManager.defaultWatcher = watcher;

        ConnectStringParser connectStringParser = new ConnectStringParser(
                connectString);
        hostProvider = aHostProvider;

        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
        cnxn.start();
    }

The MyWatcher passed in is stored in watchManager as the default watcher; the connection is then wrapped in a ClientCnxn, whose threads are started.
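
Because the default watcher is the one that receives session-level events such as SyncConnected, a common client pattern is to block until the session is actually established. A minimal sketch, with placeholder host and timeout values:

    import java.util.concurrent.CountDownLatch;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;

    // Minimal sketch: use the default watcher to wait for SyncConnected
    // before issuing any requests. Host and timeout are example values.
    public class ConnectExample {
        public static void main(String[] args) throws Exception {
            CountDownLatch connected = new CountDownLatch(1);
            ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 30000, event -> {
                if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                    connected.countDown();
                }
            });
            connected.await(); // the constructor returns before the session is up
            System.out.println("session id: 0x"
                    + Long.toHexString(zk.getSessionId()));
            zk.close();
        }
    }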

  So let's get to know ClientCnxn first. ClientCnxn manages the client's socket I/O; it maintains the list of available servers and, when a switch is needed, transparently moves over to another server in that list.

First, let's look at how the socket implementation is obtained:

    private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
        String clientCnxnSocketName = System
                .getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
        if (clientCnxnSocketName == null) {
            clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
        }
        try {
            return (ClientCnxnSocket) Class.forName(clientCnxnSocketName)
                    .newInstance();
        } catch (Exception e) {
            IOException ioe = new IOException("Couldn't instantiate "
                    + clientCnxnSocketName);
            ioe.initCause(e);
            throw ioe;
        }
    }
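
So the transport is pluggable: when the zookeeper.clientCnxnSocket system property is unset, the client falls back to the NIO implementation. Switching to the Netty transport is a one-liner (the property name is the value of the ZOOKEEPER_CLIENT_CNXN_SOCKET constant):

    // Select the Netty transport instead of the default ClientCnxnSocketNIO.
    // Must be set before the ZooKeeper client is constructed.
    System.setProperty("zookeeper.clientCnxnSocket",
            "org.apache.zookeeper.ClientCnxnSocketNetty");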

  Next, ClientCnxn's start() method runs, which starts two threads:

    public void start() {
        sendThread.start();
        eventThread.start();
    }

The SendThread services the outgoing request queue and generates the heartbeats. According to the source comment, it also spawns the ReadThread.

Let's look at the main body of SendThread's run() method:

                    if (!clientCnxnSocket.isConnected()) {
                        // don't re-establish connection if we are closing
                        if (closing) {
                            break;
                        }
                        startConnect();
                        clientCnxnSocket.updateLastSendAndHeard();
                    }

                    if (state.isConnected()) {
                        // determine whether we need to send an AuthFailed event.
                        if (zooKeeperSaslClient != null) {
                            boolean sendAuthEvent = false;
                            if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                                try {
                                    zooKeeperSaslClient.initialize(ClientCnxn.this);
                                } catch (SaslException e) {
                                   LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
                                    state = States.AUTH_FAILED;
                                    sendAuthEvent = true;
                                }
                            }
                            KeeperState authState = zooKeeperSaslClient.getKeeperState();
                            if (authState != null) {
                                if (authState == KeeperState.AuthFailed) {
                                    // An authentication error occurred during authentication with the Zookeeper Server.
                                    state = States.AUTH_FAILED;
                                    sendAuthEvent = true;
                                } else {
                                    if (authState == KeeperState.SaslAuthenticated) {
                                        sendAuthEvent = true;
                                    }
                                }
                            }

                            if (sendAuthEvent == true) {
                                eventThread.queueEvent(new WatchedEvent(
                                      Watcher.Event.EventType.None,
                                      authState,null));
                            }
                        }
                        to = readTimeout - clientCnxnSocket.getIdleRecv();
                    } else {
                        to = connectTimeout - clientCnxnSocket.getIdleRecv();
                    }

                    if (to <= 0) {
                        String warnInfo;
                        warnInfo = "Client session timed out, have not heard from server in "
                            + clientCnxnSocket.getIdleRecv()
                            + "ms"
                            + " for sessionid 0x"
                            + Long.toHexString(sessionId);
                        LOG.warn(warnInfo);
                        throw new SessionTimeoutException(warnInfo);
                    }
                    if (state.isConnected()) {
                        //1000(1 second) is to prevent race condition missing to send the second ping
                        //also make sure not to send too many pings when readTimeout is small
                        int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() -
                                ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                        //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                        if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                            sendPing();
                            clientCnxnSocket.updateLastSend();
                        } else {
                            if (timeToNextPing < to) {
                                to = timeToNextPing;
                            }
                        }
                    }

                    // If we are in read-only mode, seek for read/write server
                    if (state == States.CONNECTEDREADONLY) {
                        long now = Time.currentElapsedTime();
                        int idlePingRwServer = (int) (now - lastPingRwServer);
                        if (idlePingRwServer >= pingRwTimeout) {
                            lastPingRwServer = now;
                            idlePingRwServer = 0;
                            pingRwTimeout =
                                Math.min(2*pingRwTimeout, maxPingRwTimeout);
                            pingRwServer();
                        }
                        to = Math.min(to, pingRwTimeout - idlePingRwServer);
                    }

                    clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
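
Two deadlines drive this loop: readTimeout (derived in ClientCnxn as two thirds of the negotiated session timeout) and connectTimeout. While connected, a ping goes out roughly every readTimeout/2 of send-idle time. A worked example of the arithmetic, assuming a 30-second session timeout and 2 seconds of send-idle time:

    // Worked example of the ping schedule. The session timeout and idle
    // time are assumed values; the 2/3 derivation mirrors ClientCnxn.
    public class PingMath {
        public static void main(String[] args) {
            int sessionTimeout = 30000;
            int readTimeout = sessionTimeout * 2 / 3;        // 20000 ms
            int idleSend = 2000;                             // ms since last send
            int timeToNextPing = readTimeout / 2 - idleSend
                    - ((idleSend > 1000) ? 1000 : 0);        // 10000 - 2000 - 1000
            System.out.println("next ping due in " + timeToNextPing + " ms"); // 7000
        }
    }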
                

ClientCnxnSocketNetty implements the abstract methods of ClientCnxnSocket. It handles connecting to the server, reading and writing network traffic, and sits between the raw network data and the higher-level packet layer. Its lifecycle, as described in the source, is:

     loop:
     - try:
     - - !isConnected()
     - - - connect()
     - - doTransport()
     - catch:
     - - cleanup()
     close()

From this description we can see ClientCnxnSocket's workflow: first check whether a connection exists; if not, call connect() to establish one, otherwise use the existing one; then call doTransport() to do the actual communication, falling back to cleanup() if an exception occurs; finally, close the connection. The heart of the flow is therefore the doTransport() method:

    @Override
    void doTransport(int waitTimeOut,
                     List<Packet> pendingQueue,
                     ClientCnxn cnxn)
            throws IOException, InterruptedException {
        try {
            if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {
                return;
            }
            Packet head = null;
            if (needSasl.get()) {
                if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
                    return;
                }
            } else {
                if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {
                    return;
                }
            }
            // check if being waken up on closing.
            if (!sendThread.getZkState().isAlive()) {
            // adding back the packet to notify of failure in conLossPacket().
                addBack(head);
                return;
            }
            // channel disconnection happened
            if (disconnected.get()) {
                addBack(head);
                throw new EndOfStreamException("channel for sessionid 0x"
                        + Long.toHexString(sessionId)
                        + " is lost");
            }
            if (head != null) {
                doWrite(pendingQueue, head, cnxn);
            }
        } finally {
            updateNow();
        }
    }

Let's simplify the method above into two paths: the failure path, addBack(head), and the normal path, doWrite(pendingQueue, head, cnxn). Setting the exceptions aside, let's follow the normal flow.

First, obtain a Packet:

            Packet head = null;
            if (needSasl.get()) {
                if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
                    return;
                }
            } else {
                if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {
                    return;
                }
            }

Here, protected LinkedBlockingDeque<Packet> outgoingQueue is a blocking deque (linked-list based) that holds the packets waiting to be sent.
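
On the producer side, packets enter this queue from the application thread: ClientCnxn.submitRequest builds a Packet, enqueues it, and then blocks on it until SendThread marks it finished with the server's reply. Simplified from the source (error handling elided):

    public ReplyHeader submitRequest(RequestHeader h, Record request,
            Record response, WatchRegistration watchRegistration)
            throws InterruptedException {
        ReplyHeader r = new ReplyHeader();
        // queuePacket appends to outgoingQueue and wakes the send loop
        Packet packet = queuePacket(h, r, request, response, null, null,
                null, null, watchRegistration);
        synchronized (packet) {
            while (!packet.finished) {
                packet.wait(); // notified by finishPacket when the reply arrives
            }
        }
        return r;
    }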

Then the doWrite method runs:

    /**
     * doWrite handles writing the packets from outgoingQueue via network to server.
     */
    private void doWrite(List<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {
        updateNow();
        while (true) {
            if (p != WakeupPacket.getInstance()) {
                if ((p.requestHeader != null) &&
                        (p.requestHeader.getType() != ZooDefs.OpCode.ping) &&
                        (p.requestHeader.getType() != ZooDefs.OpCode.auth)) {
                    p.requestHeader.setXid(cnxn.getXid());
                    synchronized (pendingQueue) {
                        pendingQueue.add(p);
                    }
                }
                sendPkt(p);
            }
            if (outgoingQueue.isEmpty()) {
                break;
            }
            p = outgoingQueue.remove();
        }
    }

The doWrite method is responsible for writing the packets from outgoingQueue over the network to the server. Every request that is not a ping or auth packet is stamped with an xid and parked in pendingQueue to wait for its response; the actual send happens in sendPkt:

    private void sendPkt(Packet p) {
        // Assuming the packet will be sent out successfully. Because if it fails,
        // the channel will close and clean up queues.
        p.createBB();
        updateLastSend();
        sentCount++;
        channel.write(ChannelBuffers.wrappedBuffer(p.bb));
    }

1. The structure of the Packet is as follows:

 /**
     * This class allows us to pass the headers and the relevant records around.
     */
    static class Packet {
        RequestHeader requestHeader;

        ReplyHeader replyHeader;

        Record request;

        Record response;

        ByteBuffer bb;

        /** Client's view of the path (may differ due to chroot) **/
        String clientPath;
        /** Server's view of the path (may differ due to chroot) **/
        String serverPath;

        boolean finished;

        AsyncCallback cb;

        Object ctx;

        WatchRegistration watchRegistration;

        public boolean readOnly;

        WatchDeregistration watchDeregistration;

        /** Convenience ctor */
        Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
               Record request, Record response,
               WatchRegistration watchRegistration) {
            this(requestHeader, replyHeader, request, response,
                 watchRegistration, false);
        }

        Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
               Record request, Record response,
               WatchRegistration watchRegistration, boolean readOnly) {

            this.requestHeader = requestHeader;
            this.replyHeader = replyHeader;
            this.request = request;
            this.response = response;
            this.readOnly = readOnly;
            this.watchRegistration = watchRegistration;
        }

        public void createBB() {
            try {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                boa.writeInt(-1, "len"); // We'll fill this in later
                if (requestHeader != null) {
                    requestHeader.serialize(boa, "header");
                }
                if (request instanceof ConnectRequest) {
                    request.serialize(boa, "connect");
                    // append "am-I-allowed-to-be-readonly" flag
                    boa.writeBool(readOnly, "readOnly");
                } else if (request != null) {
                    request.serialize(boa, "request");
                }
                baos.close();
                this.bb = ByteBuffer.wrap(baos.toByteArray());
                this.bb.putInt(this.bb.capacity() - 4);
                this.bb.rewind();
            } catch (IOException e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();

            sb.append("clientPath:" + clientPath);
            sb.append(" serverPath:" + serverPath);
            sb.append(" finished:" + finished);

            sb.append(" header:: " + requestHeader);
            sb.append(" replyHeader:: " + replyHeader);
            sb.append(" request:: " + request);
            sb.append(" response:: " + response);

            // jute toString is horrible, remove unnecessary newlines
            return sb.toString().replaceAll("\r*\n+", " ");
        }
    }

From the createBB method we can see that, for the actual network serialization, ZooKeeper serializes only the requestHeader and request fields; only these two are written into the underlying byte array that is transmitted. The watchRegistration information never goes over the network.
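
Since the watch itself never travels over the wire, registration happens entirely on the client side: when the reply comes back, ClientCnxn.finishPacket registers the watch with the local watchManager and then wakes the waiting caller. Simplified from the source:

    private void finishPacket(Packet p) {
        if (p.watchRegistration != null) {
            // registers the watch locally, keyed by path and watch type;
            // register() checks the reply's error code internally
            p.watchRegistration.register(p.replyHeader.getErr());
        }
        if (p.cb == null) {
            synchronized (p) {
                p.finished = true;
                p.notifyAll(); // wake the thread blocked in submitRequest
            }
        } else {
            p.finished = true;
            eventThread.queuePacket(p); // async call: deliver via EventThread
        }
    }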

2. Update the timestamp of the last send via updateLastSend:

    void updateLastSend() {
        this.lastSend = now;
    }

3. Write the byte buffer to the server through the Netty channel (this is ClientCnxnSocketNetty, so channel and ChannelBuffers here are Netty classes, not java.nio):

channel.write(ChannelBuffers.wrappedBuffer(p.bb));

Here bb is a ByteBuffer that was initialized in Packet.createBB():

                this.bb = ByteBuffer.wrap(baos.toByteArray());
                this.bb.putInt(this.bb.capacity() - 4);
                this.bb.rewind();
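
The wire format is therefore a 4-byte big-endian length prefix followed by the serialized header and request; putInt at position 0 overwrites the -1 placeholder with capacity - 4. A hypothetical standalone helper producing the same framing:

    import java.nio.ByteBuffer;

    // Hypothetical helper illustrating the length-prefixed framing that
    // createBB produces: 4-byte big-endian length, then the body.
    public final class Framing {
        static ByteBuffer frame(byte[] body) {
            ByteBuffer bb = ByteBuffer.allocate(4 + body.length);
            bb.putInt(body.length); // length of everything after the prefix
            bb.put(body);
            bb.flip();              // ready for channel.write(...)
            return bb;
        }
    }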

Summary:

The connection between the ZooKeeper client and the server is implemented by ClientCnxnSocket, which has two concrete implementations, ClientCnxnSocketNetty and ClientCnxnSocketNIO. The workflow is as follows:

  First, check whether a connection exists; if not, call connect() to establish one, otherwise go straight to the next step;

  then call doTransport() to communicate, falling back to cleanup() if an exception occurs along the way;

  finally, close the connection.

This flow is exactly what we saw in SendThread's run() method.

One more note on a ZooKeeper guarantee, sequential consistency: updates from a client are applied in the order in which they were sent. In SendThread we can see the send/receive timestamps being updated at every step, and in doWrite each request is stamped with a monotonically increasing xid from cnxn.getXid(); a single send thread plus these xids is what preserves the ordering.

