Zookeeper源码分析:Watcher机制

1. 设置Watcher

使用Watcher需要先实现Watcher接口,并将实现类对象传递到指定方法中,如getChildren, exist等。Zookeeper允许在构造Zookeeper对象时候指定一个默认Watcher对象.getChildren和exit方法可以使用这个默认的Watcher对象,也可以指定一个新Watcher对象。

Code 1: Watcher接口

public interface Watcher {

    /**
     * Event的状态
     */
    public interface Event {
        /**
         * 在事件发生时,ZooKeeper的状态
         */
        public enum KeeperState {

            @Deprecated
            Unknown (-1),

            Disconnected (0),

            @Deprecated
            NoSyncConnected (1),

            SyncConnected (3),

            AuthFailed (4),

            ConnectedReadOnly (5),

            SaslAuthenticated(6),

            Expired (-112);

            private final int intValue;  

            KeeperState( int intValue) {
                this.intValue = intValue;
            }   

            ......
        }

        /**
         * ZooKeeper中的事件
         */
        public enum EventType {
            None (-1),
            NodeCreated (1),
            NodeDeleted (2),
            NodeDataChanged (3),
            NodeChildrenChanged (4);

            private final int intValue;     // Integer representation of value
                                            // for sending over wire
            EventType( int intValue) {
                this.intValue = intValue;
            }
            ......  
        }
    }

    //Watcher的回调方法
    abstract public void process(WatchedEvent event);
}

Code 2: Zookeeper.getChildren(final String, Watcher)方法

public List<String> getChildren(final String path, Watcher watcher)
    throws KeeperException, InterruptedException
{
    final String clientPath = path;
    PathUtils. validatePath(clientPath);

    WatchRegistration wcb = null;
    //如果watcher不等于null, 构建WatchRegistration对象,
    //该对象描述了watcher和path之间的关系
    if (watcher != null) {
        wcb = new ChildWatchRegistration(watcher, clientPath);
    }
   
    //在传入的path加上root path前缀,构成服务器端的绝对路径
    final String serverPath = prependChroot(clientPath);
   
    //构建RequestHeader对象
    RequestHeader h = new RequestHeader();
    //设置操作类型为OpCode. getChildren
    h.setType(ZooDefs.OpCode. getChildren);
    //构建GetChildrenRequest对象
    GetChildrenRequest request = new GetChildrenRequest();
    //设置path
    request.setPath(serverPath);
    //设置是否使用watcher
    request.setWatch(watcher != null);
    //构建GetChildrenResponse对象
    GetChildrenResponse response = new GetChildrenResponse();
    //提交请求,并阻塞等待结果
    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
    if (r.getErr() != 0) {
        throw KeeperException.create(KeeperException.Code. get(r.getErr()),
                clientPath);
    }
    return response.getChildren();
}

Follower的NIOServerCnxn类接到了Client的请求,会调用ZookeeperServer.processPacket()方法。该方法会构建一个Request对象,并调用第一个处理器FollowerRequestProcessor。

由于我们的请求只是一个读操作,而不是一个Quorum请求或者sync请求,所以FollowerRequestProcessor不需要调用Follower.request()方法将请求转给Leader,只需要将请求传递到下一个处理器CommitProcessor。

处理器CommitProcessor线程发现请求是读请求后,直接将Requet对象加入到toProcess队列中,在接下的循环中会调用FinalRequestProcessor.processRequest方法进行处理。

FinalRequestProcessor.processRequest方法最终会调用ZKDatabase中的读操作方法(如statNode和getData方法), 而ZKDatabase的这些方法会最终调用DataTree类的方法来获取指定path的znode信息并返回给Client端,同时也会设置Watcher。

Code 3: FinalRequestProcessor对OpCode.getData请求的处理

case OpCode. getData: {
               lastOp = "GETD";
               GetDataRequest getDataRequest = new GetDataRequest();
               ByteBufferInputStream. byteBuffer2Record(request.request,
                       getDataRequest);
               //获得znode对象
               DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
               //n为null, 抛出NoNodeException异常
               if (n == null) {
                   throw new KeeperException.NoNodeException();
               }
               Long aclL;
               synchronized(n) {
                   aclL = n. acl;
               }
               //检查是否有读权限
               PrepRequestProcessor. checkACL(zks, zks.getZKDatabase().convertLong(aclL),
                       ZooDefs.Perms. READ,
                       request. authInfo);
               //构建状态对象stat
               Stat stat = new Stat();
               //获得指定path的znode数据,
               //如果GetDataRequest.getWatcher()返回true, 将ServerCnxn类型对象cnxn传递进去。
               //ServerCnxn是实现了Watcher接口
               byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                       getDataRequest. getWatch() ? cnxn : null);
               //构建GetDataResponse对象
               rsp = new GetDataResponse(b, stat);
               break;
           }

Code 4: DataTree.getData()方法

public byte[] getData(String path, Stat stat, Watcher watcher)
        throws KeeperException.NoNodeException {
    //从nodes map中获取指定path的DataNode对象
    DataNode n = nodes.get(path);
    //如果n为null, 则抛出NoNodeException异常
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    synchronized (n) {
        //将n的状态copy到stat中
        n.copyStat(stat);
        //如果watcher不会null, 则将(path, watcher)键值对放入dataWatchers Map里
        if (watcher != null) {
            dataWatches.addWatch(path, watcher);
        }
        //返回节点数据
        return n.data ;
    }
}

2. 修改znode数据触发Watcher

在Zookeeper二阶段提交的COMMIT阶段。当Follower从Leader那接收到一个写请求的Leader.COMMIT数据包,会调用FinalRequestProcessor.processRequest()方法。Leader本身在发送完Leader.COMMIT数据包,也会调用FinalRequestProcessor.processRequest()方法。

如果是setData修改数据请求,那么FinalRequestProcessor.processRequest()方法最终会调用到DataTree.setData方法将txn应用到指定znode上,同时触发Watcher,并发送notification给Client端。

其关SetData请求的时序图如下:

Code 5: DataTree.setData()方法

public Stat setData(String path, byte data[], int version, long zxid,
        long time) throws KeeperException.NoNodeException {
    Stat s = new Stat();
    //根据path, 获得DataNode对象n
    DataNode n = nodes.get(path);
    //如果n为null, 则抛出NoNodeException异常
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    byte lastdata[] = null;
    synchronized (n) {
        lastdata = n. data;
        n. data = data;
        n. stat.setMtime(time);
        n. stat.setMzxid(zxid);
        n. stat.setVersion(version);
        n.copyStat(s);
    }
    // now update if the path is in a quota subtree.
    String lastPrefix = getMaxPrefixWithQuota(path);
    if(lastPrefix != null) {
      this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
          - (lastdata == null ? 0 : lastdata.length ));
    }
    //触发Watcher
    dataWatches.triggerWatch(path, EventType.NodeDataChanged);
    return s;
}

Code 6: WatchManage.triggerWatcher()方法,触发Watcher。

Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    WatchedEvent e = new WatchedEvent(type,
            KeeperState. SyncConnected, path);
    HashSet<Watcher> watchers;
    synchronized (this ) {
        //从watchTable删除掉path对于的watcher
        watchers = watchTable.remove(path);
        if (watchers == null || watchers.isEmpty()) {
            if (LOG .isTraceEnabled()) {
                ZooTrace. logTraceMessage(LOG,
                        ZooTrace. EVENT_DELIVERY_TRACE_MASK,
                        "No watchers for " + path);
            }
            return null;
        }
        for (Watcher w : watchers) {
            HashSet<String> paths = watch2Paths.get(w);
            if (paths != null) {
                paths.remove(path);
            }
        }
    }
    //循环处理所有关于path的Watcher, 这里Watcher对象实际上就是ServerCnxn类型对象
    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
            continue;
        }
        w.process(e);
    }
    return watchers;
}

Code 7: NIOServerCnxn.process方法,发送notification给Client端

synchronized public void process (WatchedEvent event) {
    ReplyHeader h = new ReplyHeader(-1, -1L, 0);
    if (LOG .isTraceEnabled()) {
        ZooTrace. logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK ,
                                 "Deliver event " + event + " to 0x"
                                 + Long. toHexString(this. sessionId)
                                 + " through " + this );
    }

    // Convert WatchedEvent to a type that can be sent over the wire
    WatcherEvent e = event.getWrapper();
   
    //发送notification给Client端
    sendResponse(h, e, "notification");
}

3. 总结

Watcher具有one-time trigger的特性,在代码中我们也可以看到一个watcher被处理后会立即从watchTable中删掉。

本博客系博主原创,转载请附上原博客地址:http://blog.csdn.net/jeff_fangji/article/details/43910113

时间: 2024-10-09 01:01:13

Zookeeper源码分析:Watcher机制的相关文章

zookeeper源码分析之一服务端处理请求流程

上文: zookeeper源码分析之一服务端启动过程 中,我们介绍了zookeeper服务器的启动过程,其中单机是ZookeeperServer启动,集群使用QuorumPeer启动,那么这次我们分析各自一下消息处理过程: 前文可以看到在 1.在单机情况下NettyServerCnxnFactory中启动ZookeeperServer来处理消息: public synchronized void startup() { if (sessionTracker == null) { createSe

zookeeper源码分析之五服务端(集群leader)处理请求流程

leader的实现类为LeaderZooKeeperServer,它间接继承自标准ZookeeperServer.它规定了请求到达leader时需要经历的路径: PrepRequestProcessor -> ProposalRequestProcessor ->CommitProcessor -> Leader.ToBeAppliedRequestProcessor ->FinalRequestProcessor 具体情况可以参看代码: @Override protected v

zookeeper源码分析之一客户端发送请求流程

znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个功能是zookeeper对于应用最重要的特性,通过这个特性可以实现的功能包括配置的集中管理,集群管理,分布式锁等等. 知识准备: zookeeper定义的状态有: Unknown (-1),Disconnected (0),NoSyncConnected (1),SyncConnected (3),AuthFailed (4),ConnectedReadOnly (5),Sasl

nginx源码分析--进程间通信机制 &amp; 同步机制

Nginx源码分析-进程间通信机制 从nginx的进程模型可以知道,master进程和worker进程需要通信,nginx中通信的方式有套接字.共享内存.信号.对于master进程,从外部接受信号,master进程主要就是监控.接受外部信号,将有必要的信号传递给worker进程,master进程大部分时间都是阻塞在sigsuspend()函数调用上.Worker进程屏蔽了所有的外部信号,那么Master进程就通过套接字和worker进程通信,worker进程修改全局变量,使得worker进程接受

Zookeeper 源码分析-启动

Zookeeper 源码分析-启动 博客分类: Zookeeper 本文主要介绍了zookeeper启动的过程 运行zkServer.sh start命令可以启动zookeeper.入口的main函数在类中QuorumPeerMain. main函数主要调用了runFromConfig函数,创建了QuorumPeer对象,并且调用了start函数,从而启动了zookeeper. Java代码   public class QuorumPeerMain { protected QuorumPeer

MyBatis 源码分析 - 插件机制

1.简介 一般情况下,开源框架都会提供插件或其他形式的拓展点,供开发者自行拓展.这样的好处是显而易见的,一是增加了框架的灵活性.二是开发者可以结合实际需求,对框架进行拓展,使其能够更好的工作.以 MyBatis 为例,我们可基于 MyBatis 插件机制实现分页.分表,监控等功能.由于插件和业务无关,业务也无法感知插件的存在.因此可以无感植入插件,在无形中增强功能. 开发 MyBatis 插件需要对 MyBatis 比较深了解才行,一般来说最好能够掌握 MyBatis 的源码,门槛相对较高.本篇

storm操作zookeeper源码分析-cluster.clj

storm操作zookeeper的主要函数都定义在命名空间backtype.storm.cluster中(即cluster.clj文件中).backtype.storm.cluster定义了两个重要protocol:ClusterState和StormClusterState.clojure中的protocol可以看成java中的接口,封装了一组方法.ClusterState协议中封装了一组与zookeeper进行交互的基础函数,如获取子节点函数,获取子节点数据函数等,ClusterState协

zookeeper源码分析三LEADER与FOLLOWER同步数据流程

根据二)中的分析,如果一台zookeeper服务器成为集群中的leader,那么一定是当前所有服务器中保存数据最多的服务器,所以在这台服务器成为leader之后,首先要做的事情就是与集群中的其它服务器(现在是follower)同步数据,保证大家的数据一致,这个过程完毕了才开始正式处理来自客户端的连接请求. 首先来看Leader做的工作:二)中提到的同步数据时使用的逻辑时钟,它的初始值是0,每次选举过程都会递增的,在leader正式上任之后做的第一件事情,就是根据当前保存的数据id值,设置最新的逻

ZooKeeper源码分析:Quorum请求的整个流程(转)

Quorum请求是转发给Leader处理,并且需要得一个Follower Quorum确认的请求.这些请求包括: 1)znode的写操作(OpCode.create,OpCode.delete,OpCode.setData,OpCode.setACL) 2)Session的创建和关闭操作(OpCode.createSession和OpCode.closeSession) 3)OpCode.multi操作. 本文分析了Client, Follower和Leader协同完成Quorum请求的过程.另