zookeeper客户端的主类入口是Zookeeper类,负责与zookeeper server端的通信以及触发watcher等。
下文主要分析zookeeper客户端的工作流程。
1 zookeeper构造函数
主要分两类,一是不带sessionid的,这是客户端第一次连接server时采用
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException { LOG.info("Initiating client connection, connectString=" + connectString + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher); watchManager.defaultWatcher = watcher; ConnectStringParser connectStringParser = new ConnectStringParser( connectString); HostProvider hostProvider = new StaticHostProvider( connectStringParser.getServerAddresses()); cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly); cnxn.start(); }
另一类是带有sessionid的,这主要是当发生connect掉线时,客户端发起连接用的,这些连接对用户是透明的,基本过程是相似的。
2 Zookeeper主类包含的主要成员
主要包含一个ClientCnxn用来管理与服务器的网络通信,一个watchManager用来管理watcher;
此外定义了内部类 ZKWatchManager 和 WatchRegistration,后者用来注册watcher到相应的watchset。
以及供用户使用的API create exists setData getData 和getChildren以及相应的异步方法等。
zookeeper的启动是cnxn.start()。
3 ClientCnxn类
此类的启动代码如下:
1 public void start() { 2 sendThread.start(); 3 eventThread.start(); 4 }
可以看到,这个类开启了两个线程,一个是SendThread、一个是EventThread。
其中SendThread负责发送 数据请求/ping等信息给服务端以及负责读取服务端发送回来的信息。
EventThread负责出来服务端发来的事件信息。
详细得说,SendThrad的运行代码是:
1 while(state.isAlive()){ 2 如果没有连接:发起连接 -- 此处会调用 clientCnxnSocket的connect方法3 如果是连接状态,处理sasl事情 4 使用to表示客户端距离timeout还剩多少时间,准备发起ping连接5 调用clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this); 6 }
注意到,最后他会调用doTransport方法 。其中 pendingQueue是一个用来存放已经发送、等待回应的Packet队列,outgoingQueue是一个存放等待发送的Packet的队列。
connect方法会做些什么呢?
a 首先会创建一个sock
b 将sock注册到selector,如果立刻连接上了,调用 sendThread.primeConnection()
primeConnection 主要是用来注册zookeeper之前所有的watcher,以及认证信息。
接下来看下doTransport方法会干些什么:
synchronized (this) { selected = selector.selectedKeys();}
它会选择已经就绪的事情,如果是connect成功时间,调用primeConnection;
如果是读写事件,调用doIO(pendingQueue, outgoingQueue, cnxn)
此外,如果发现outgoingQueue里 findSendablePacket,还需要 enableWrite
接下来,我们看下doIo都会做些什么:
如果是读取事件,那么一次会从incomingBuffer里读取rc、长度或者 信息 或者是因为没有初始化。
a 如果读取的是长度信息,那么 incomingBuffer = ByteBuffer.allocate(len);
b 如果不是长度信息,而且没有初始化,那么进行初始化,初始化会干这么几件事情:
一 读取connect结果,这会做下面几件事:
a1 从 incoming读取连接信息,初始化一些变量
1 readTimeout = negotiatedSessionTimeout * 2 / 3; 2 connectTimeout = negotiatedSessionTimeout / hostProvider.size();
b1 发送一个连接成功的事件
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,eventState, null));
二 enableRead ,如果findSendablePacket 则 enableWrite
三 lenBuffer.clear(), incomingBuff = lenBuffer
四 设置初始化变量为true
c 使用sendThread读取信息
1 sendThread.readResponse(incomingBuffer); 2 lenBuffer.clear(); 3 incomingBuffer = lenBuffer; 4 updateLastHeard();
接下来,我们观察 readResponse会做些什么:
首先读取header,如果其xid == -2,表明是一个ping的response,return
如果xid是 -4 ,表明是一个AuthPacket的response return
如果xid是 -1,表明是一个notification,此时要继续读取并构造一个enent,通过EventThread.queueEvent发送,return
其它情况下:
从pendingQueue拿出一个Packet,校验后更新packet信息:
1 try { 2 if (packet.requestHeader.getXid() != replyHdr.getXid()) { 3 packet.replyHeader.setErr( 4 KeeperException.Code.CONNECTIONLOSS.intValue()); 5 throw new IOException("Xid out of order. Got Xid " 6 + replyHdr.getXid() + " with err " + 7 + replyHdr.getErr() + 8 " expected Xid " 9 + packet.requestHeader.getXid() 10 + " for a packet with details: " 11 + packet ); 12 } 13 14 packet.replyHeader.setXid(replyHdr.getXid()); 15 packet.replyHeader.setErr(replyHdr.getErr()); 16 packet.replyHeader.setZxid(replyHdr.getZxid()); 17 if (replyHdr.getZxid() > 0) { 18 lastZxid = replyHdr.getZxid(); 19 } 20 if (packet.response != null && replyHdr.getErr() == 0) { 21 packet.response.deserialize(bbia, "response"); 22 } 23 24 if (LOG.isDebugEnabled()) { 25 LOG.debug("Reading reply sessionid:0x" 26 + Long.toHexString(sessionId) + ", packet:: " + packet); 27 } 28 } finally { 29 finishPacket(packet); 30 }
最后他会调用 finishPacket方法,此方法主要会做下面的事情:
1 private void finishPacket(Packet p) { 2 if (p.watchRegistration != null) { 3 p.watchRegistration.register(p.replyHeader.getErr()); 4 } 5 6 if (p.cb == null) { 7 synchronized (p) { 8 p.finished = true; 9 p.notifyAll(); -- 主要是用来通知同步调用的API方法 10 } 11 } else { -- 处理异步调用 12 p.finished = true; 13 eventThread.queuePacket(p); 14 } 15 }
至此,读事件就结束了。
如果是一个写事件,接下来会做下面的事情:
a 锁定outgoingQueue,发现一个可以发送的packet,调用createBB创建packet的bb,使用sock.write写入,并且将此packet从outgoing对列移动到pending队列。
b 做一些开关控制
1 if (outgoingQueue.isEmpty()) { 7 disableWrite(); 8 } else if (!initialized && p != null && !p.bb.hasRemaining()) { 18 disableWrite(); 19 } else { 20 // Just in case 21 enableWrite(); 22 }
写事件就此结束。
可以看到sendThread中调用了eventThread的queuePacket和queueEvent方法,下面就分析一下EventThread:
此类中主要的数据结构是
private final LinkedBlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<Object>();
首先,看下其run方法:
1 public void run() { 2 try { 3 isRunning = true; 4 while (true) { 5 Object event = waitingEvents.take(); 6 if (event == eventOfDeath) { 7 wasKilled = true; 8 } else { 9 processEvent(event); 10 } 11 if (wasKilled) 12 synchronized (waitingEvents) { 13 if (waitingEvents.isEmpty()) { 14 isRunning = false; 15 break; 16 } 17 } 18 } 19 } catch (InterruptedException e) { 20 LOG.error("Event thread exiting due to interruption", e); 21 } 22 23 LOG.info("EventThread shut down for session: 0x{}", 24 Long.toHexString(getSessionId())); 25 }
其中最主要的方法是processEvent方法,下面看下它都做了些什么:
a 如果 event instanceof WatcherSetEventPair,那么分别调用这些watcher的process方法
b 将event转型为Packet,判断p.cb!=null,然后分类:
if (p.response instanceof ExistsResponse || p.response instanceof SetDataResponse || p.response instanceof SetACLResponse)
StatCallback cb = (StatCallback) p.cb;
cb.processResult;
p.response instanceof GetDataResponse => DataCallback cb = (DataCallback) p.cb;
p.response instanceof CreateResponse => StringCallback cb = (StringCallback) p.cb
然后我们去分析在sendThread中调用的两个方法:
1 public void queueEvent(WatchedEvent event) { 2 if (event.getType() == EventType.None 3 && sessionState == event.getState()) { 4 return; 5 } 6 sessionState = event.getState(); 7 8 // materialize the watchers based on the event 9 WatcherSetEventPair pair = new WatcherSetEventPair( 10 watcher.materialize(event.getState(), event.getType(), 11 event.getPath()), 12 event); 13 // queue the pair (watch set & event) for later processing 14 waitingEvents.add(pair); 15 } 16 17 public void queuePacket(Packet packet) { 18 if (wasKilled) { 19 synchronized (waitingEvents) { 20 if (isRunning) waitingEvents.add(packet); 21 else processEvent(packet); 22 } 23 } else { 24 waitingEvents.add(packet); 25 } 26 }