zookeeper - 客户端源码分析

  zookeeper客户端的主类入口是Zookeeper类,负责与zookeeper server端的通信以及触发watcher等。

下文主要分析zookeeper客户端的工作流程。

1 zookeeper构造函数

主要分两类,一是不带sessionid的,这是客户端第一次连接server时采用

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly)
        throws IOException
    {
        LOG.info("Initiating client connection, connectString=" + connectString
                + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
        watchManager.defaultWatcher = watcher;
        ConnectStringParser connectStringParser = new ConnectStringParser(
                connectString);
        HostProvider hostProvider = new StaticHostProvider(
                connectStringParser.getServerAddresses());
        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
        cnxn.start();
    }

另一类是带有sessionid的,这主要是当发生connect掉线时,客户端发起连接用的,这些连接对用户是透明的,基本过程是相似的。

2 Zookeeper主类包含的主要成员

  主要包含一个ClientCnxn用来管理与服务器的网络通信,一个watchManager用来管理watcher;

  此外定义了内部类 ZKWatchManager 和 WatchRegistration,后者用来注册watcher到相应的watchset。

  以及供用户使用的API create exists setData getData 和getChildren以及相应的异步方法等。

  zookeeper的启动是cnxn.start()。

3 ClientCnxn类

此类的启动代码如下:

1  public void start() {
2         sendThread.start();
3         eventThread.start();
4     }

可以看到,这个类开启了两个线程,一个是SendThread、一个是EventThread。

其中SendThread负责发送 数据请求/ping等信息给服务端以及负责读取服务端发送回来的信息。

EventThread负责出来服务端发来的事件信息。

详细得说,SendThrad的运行代码是:

1 while(state.isAlive()){
2   如果没有连接:发起连接 -- 此处会调用 clientCnxnSocket的connect方法3   如果是连接状态,处理sasl事情 4   使用to表示客户端距离timeout还剩多少时间,准备发起ping连接5   调用clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this); 6 }

注意到,最后他会调用doTransport方法 。其中 pendingQueue是一个用来存放已经发送、等待回应的Packet队列,outgoingQueue是一个存放等待发送的Packet的队列。

connect方法会做些什么呢?

a 首先会创建一个sock

b  将sock注册到selector,如果立刻连接上了,调用 sendThread.primeConnection()

primeConnection 主要是用来注册zookeeper之前所有的watcher,以及认证信息。

接下来看下doTransport方法会干些什么:

synchronized (this) {    selected = selector.selectedKeys();}

它会选择已经就绪的事情,如果是connect成功时间,调用primeConnection;
如果是读写事件,调用doIO(pendingQueue, outgoingQueue, cnxn)

此外,如果发现outgoingQueue里 findSendablePacket,还需要 enableWrite

接下来,我们看下doIo都会做些什么:

如果是读取事件,那么一次会从incomingBuffer里读取rc、长度或者 信息 或者是因为没有初始化。

a 如果读取的是长度信息,那么 incomingBuffer = ByteBuffer.allocate(len);

b 如果不是长度信息,而且没有初始化,那么进行初始化,初始化会干这么几件事情:

一 读取connect结果,这会做下面几件事:

    a1 从 incoming读取连接信息,初始化一些变量

1 readTimeout = negotiatedSessionTimeout * 2 / 3;
2 connectTimeout = negotiatedSessionTimeout / hostProvider.size(); 

    b1 发送一个连接成功的事件

eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,eventState, null));

二 enableRead ,如果findSendablePacket 则 enableWrite

  三 lenBuffer.clear(), incomingBuff = lenBuffer

四 设置初始化变量为true

c 使用sendThread读取信息

1    sendThread.readResponse(incomingBuffer);
2    lenBuffer.clear();
3    incomingBuffer = lenBuffer;
4    updateLastHeard();

接下来,我们观察  readResponse会做些什么:

首先读取header,如果其xid == -2,表明是一个ping的response,return

如果xid是 -4 ,表明是一个AuthPacket的response return

如果xid是 -1,表明是一个notification,此时要继续读取并构造一个enent,通过EventThread.queueEvent发送,return

其它情况下:

从pendingQueue拿出一个Packet,校验后更新packet信息:

 1           try {
 2                 if (packet.requestHeader.getXid() != replyHdr.getXid()) {
 3                     packet.replyHeader.setErr(
 4                             KeeperException.Code.CONNECTIONLOSS.intValue());
 5                     throw new IOException("Xid out of order. Got Xid "
 6                             + replyHdr.getXid() + " with err " +
 7                             + replyHdr.getErr() +
 8                             " expected Xid "
 9                             + packet.requestHeader.getXid()
10                             + " for a packet with details: "
11                             + packet );
12                 }
13
14                 packet.replyHeader.setXid(replyHdr.getXid());
15                 packet.replyHeader.setErr(replyHdr.getErr());
16                 packet.replyHeader.setZxid(replyHdr.getZxid());
17                 if (replyHdr.getZxid() > 0) {
18                     lastZxid = replyHdr.getZxid();
19                 }
20                 if (packet.response != null && replyHdr.getErr() == 0) {
21                     packet.response.deserialize(bbia, "response");
22                 }
23
24                 if (LOG.isDebugEnabled()) {
25                     LOG.debug("Reading reply sessionid:0x"
26                             + Long.toHexString(sessionId) + ", packet:: " + packet);
27                 }
28             } finally {
29                 finishPacket(packet);
30             }

最后他会调用 finishPacket方法,此方法主要会做下面的事情:

 1     private void finishPacket(Packet p) {
 2         if (p.watchRegistration != null) {
 3             p.watchRegistration.register(p.replyHeader.getErr());
 4         }
 5
 6         if (p.cb == null) {
 7             synchronized (p) {
 8                 p.finished = true;
 9                 p.notifyAll(); -- 主要是用来通知同步调用的API方法
10             }
11         } else { -- 处理异步调用
12             p.finished = true;
13             eventThread.queuePacket(p);
14         }
15     }

至此,读事件就结束了。

如果是一个写事件,接下来会做下面的事情:

a 锁定outgoingQueue,发现一个可以发送的packet,调用createBB创建packet的bb,使用sock.write写入,并且将此packet从outgoing对列移动到pending队列。

b 做一些开关控制

 1   if (outgoingQueue.isEmpty()) {
 7                     disableWrite();
 8                 } else if (!initialized && p != null && !p.bb.hasRemaining()) {
18                     disableWrite();
19                 } else {
20                     // Just in case
21                     enableWrite();
22                 }

写事件就此结束。

可以看到sendThread中调用了eventThread的queuePacket和queueEvent方法,下面就分析一下EventThread:

此类中主要的数据结构是

private final LinkedBlockingQueue<Object> waitingEvents =    new LinkedBlockingQueue<Object>();

首先,看下其run方法:

 1         public void run() {
 2            try {
 3               isRunning = true;
 4               while (true) {
 5                  Object event = waitingEvents.take();
 6                  if (event == eventOfDeath) {
 7                     wasKilled = true;
 8                  } else {
 9                     processEvent(event);
10                  }
11                  if (wasKilled)
12                     synchronized (waitingEvents) {
13                        if (waitingEvents.isEmpty()) {
14                           isRunning = false;
15                           break;
16                        }
17                     }
18               }
19            } catch (InterruptedException e) {
20               LOG.error("Event thread exiting due to interruption", e);
21            }
22
23             LOG.info("EventThread shut down for session: 0x{}",
24                      Long.toHexString(getSessionId()));
25         }

其中最主要的方法是processEvent方法,下面看下它都做了些什么:

a 如果 event instanceof WatcherSetEventPair,那么分别调用这些watcher的process方法

b 将event转型为Packet,判断p.cb!=null,然后分类:

if (p.response instanceof ExistsResponse        || p.response instanceof SetDataResponse        || p.response instanceof SetACLResponse) 
StatCallback cb = (StatCallback) p.cb;
cb.processResult;
p.response instanceof GetDataResponse => DataCallback cb = (DataCallback) p.cb;
p.response instanceof CreateResponse => StringCallback cb = (StringCallback) p.cb

然后我们去分析在sendThread中调用的两个方法:

 1         public void queueEvent(WatchedEvent event) {
 2             if (event.getType() == EventType.None
 3                     && sessionState == event.getState()) {
 4                 return;
 5             }
 6             sessionState = event.getState();
 7
 8             // materialize the watchers based on the event
 9             WatcherSetEventPair pair = new WatcherSetEventPair(
10                     watcher.materialize(event.getState(), event.getType(),
11                             event.getPath()),
12                             event);
13             // queue the pair (watch set & event) for later processing
14             waitingEvents.add(pair);
15         }
16
17        public void queuePacket(Packet packet) {
18           if (wasKilled) {
19              synchronized (waitingEvents) {
20                 if (isRunning) waitingEvents.add(packet);
21                 else processEvent(packet);
22              }
23           } else {
24              waitingEvents.add(packet);
25           }
26        }
时间: 2024-10-06 00:12:25

zookeeper - 客户端源码分析的相关文章

【Zookeeper】源码分析目录

Zookeeper源码分析目录如下 1. [Zookeeper]源码分析之序列化 2. [Zookeeper]源码分析之持久化(一)之FileTxnLog 3. [Zookeeper]源码分析之持久化(二)之FileSnap 4. [Zookeeper]源码分析之持久化(三)之FileTxnSnapLog 5. [Zookeeper]源码分析之Watcher机制(一) 6. [Zookeeper]源码分析之Watcher机制(二)之WatchManager 7. [Zookeeper]源码分析之

Eoe客户端源码分析---SlidingMenu的使用

Eoe客户端源码分析及代码注释 使用滑动菜单SlidingMenu,单击滑动菜单的不同选项,可以通过ViewPager和PagerIndicator显示对应的数据内容. 0  BaseSlidingFragmentActivity.java 主要函数: (1)showMenu() /** * Opens the menu and shows the menu view.*/ public void showMenu() { showMenu(true); } (2)showContent() /

Eureka 系列(02)客户端源码分析

Eureka 系列(02)客户端源码分析 [TOC] 在上一篇 Eureka 系列(01)最简使用姿态 中对 Eureka 的简单用法做了一个讲解,本节分析一下 EurekaClient 的实现 DiscoveryClient.本文的源码是基于 Eureka-1.9.8. 1)服务注册(发送注册请求到注册中心) 2)服务发现(本质就是获取调用服务名所对应的服务提供者实例信息,包括IP.port等) 3)服务续约(本质就是发送当前应用的心跳请求到注册中心) 4)服务下线(本质就是发送取消注册的HT

【Zookeeper】源码分析之持久化--FileTxnSnapLog

一.前言 前面分析了FileSnap,接着继续分析FileTxnSnapLog源码,其封装了TxnLog和SnapShot,其在持久化过程中是一个帮助类. 二.FileTxnSnapLog源码分析 2.1 类的属性 public class FileTxnSnapLog { //the direcotry containing the //the transaction logs // 日志文件目录 private final File dataDir; //the directory cont

【Zookeeper】源码分析之服务器(二)

一.前言 前面阐述了服务器的总体框架,下面来分析服务器的所有父类ZooKeeperServer. 二.ZooKeeperServer源码分析 2.1 类的继承关系 public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {} 说明:ZooKeeperServer是ZooKeeper中所有服务器的父类,其实现了Session.Expirer和ServerStats.Provider接口,Session

【Zookeeper】源码分析之Watcher机制(一)

一.前言 前面已经分析了Zookeeper持久话相关的类,下面接着分析Zookeeper中的Watcher机制所涉及到的类. 二.总体框图 对于Watcher机制而言,主要涉及的类主要如下. 说明: Watcher,接口类型,其定义了process方法,需子类实现. Event,接口类型,Watcher的内部类,无任何方法. KeeperState,枚举类型,Event的内部类,表示Zookeeper所处的状态. EventType,枚举类型,Event的内部类,表示Zookeeper中发生的事

【Zookeeper】源码分析之请求处理链(一)

一.前言 前面已经分析了Watcher机制的主要代码,现在接着分析Zookeeper中的请求处理链,其是Zookeeper的主要特点之一. 二.总体框图 对于请求处理链而言,所有请求处理器的父接口为RequestProcessor,其框架图如下 说明: AckRequestProcessor,将前一阶段的请求作为ACK转发给Leader. CommitProcessor,将到来的请求与本地提交的请求进行匹配,这是因为改变系统状态的本地请求的返回结果是到来的请求. FinalRequestProc

开源中国 OsChina Android 客户端源码分析(9)下载APK功能

源码中用以下载客户端的类为DownloadService,是一个服务.如果你对android服务不够理解的话,建议先查阅下有关服务的知识点.源码分析如下: 1首先我们先来看下该服务中几个重写的方法: 1.1onCreate()中 首先声明了自定义的绑定器对象,并在自定义的绑定器中添加了几个界面可以访问服务的方法,我们发现在这几个方法中,目前实际用到的start()方法用以开始下载APK,其他的没有用到.获取通知管理器.设置服务为 非前台服务.代码注释中,火蚁表明了不确定性. 其实如果将服务设置为

开源中国 OsChina Android 客户端源码分析(10)双击退出程序

在源码中,火蚁完全封装了  双击退出程序的功能 : DoubleClickExitHelper类 该类的源码分析如下: 1  构造函数中传入了 设备上下文,实现退出功能的界面,完成消息处理器的初始化:既然要有提示条,那么也需要一个Toast对象. 2 既然完全封装,那么 我们需要将 按键的事件及按键码传进去,因为界面重写的onKeyDown 事件需要返回一个boolean值,那么在DoubleClickExitHelper类中也自定义了一个返回布尔值的onKeyDown 函数. 3onKeyDo