Zookeeper源码阅读(七) Server端Watcher

前言

前面一篇主要介绍了Watcher接口相关的接口和实体类,但是主要是zk客户端相关的代码,如前一篇开头所说,client需要把watcher注册到server端,这一篇分析下server端的watcher。

主要分析Watchmanager类。

Watchmanager

这是WatchManager的类图介绍。来看看代码:

/**
 * This class manages watches. It allows watches to be associated with a string
 * and removes watchers and their watches in addition to managing triggers.
 */
//如注释所言,这个类主要负责管理watcher
public class WatchManager {
    private static final Logger LOG = LoggerFactory.getLogger(WatchManager.class);

    //路径->watcher的映射
    private final HashMap<String, HashSet<Watcher>> watchTable =
        new HashMap<String, HashSet<Watcher>>();

    //watcher->路径的映射
    private final HashMap<Watcher, HashSet<String>> watch2Paths =
        new HashMap<Watcher, HashSet<String>>();

size

public synchronized int size(){
    int result = 0;
    for(Set<Watcher> watches : watchTable.values()) {//遍历路径->watcher的映射
        result += watches.size();//把所有的watch个数加起来,但这里是不是会有重复???
    }
    return result;
}

addWatch

//为某个path注册watcher
public synchronized void addWatch(String path, Watcher watcher) {
    HashSet<Watcher> list = watchTable.get(path);//获得路径对应的watcher的set
    if (list == null) {//之前没有watcher
        // don't waste memory if there are few watches on a node
        // rehash when the 4th entry is added, doubling size thereafter
        // seems like a good compromise
        list = new HashSet<Watcher>(4);//这里有优化,只建立为4的set,可能是考虑到实际使用中同一个znode不会有过多的watcher,节省了memory
        watchTable.put(path, list);//更新watchtable
    }
    list.add(watcher);//添加watcher进入set

    HashSet<String> paths = watch2Paths.get(watcher);//在watcher->路径中查找对应的路径
    if (paths == null) {
        // cnxns typically have many watches, so use default cap here
        paths = new HashSet<String>();//同理,同一个watcher可能被加到多个znode上
        watch2Paths.put(watcher, paths);
    }
    paths.add(path);//加入set
}

其实这个方法总的来说就是两大步,第一是更新路径->watcher的映射,第二步是更新watcher->路径的映射,很好理解。

removeWatcher

//与上面方法相反,remove对应的watcher
public synchronized void removeWatcher(Watcher watcher) {
    HashSet<String> paths = watch2Paths.remove(watcher);//从watcher->路径的映射中把整个watcher和它对应的所有path删掉
    if (paths == null) {//paths是否为空
        return;
    }
    for (String p : paths) {//不为空的话就取出来一个一个在路径->watcher的映射里扫描
        HashSet<Watcher> list = watchTable.get(p);//取出watcher的set
        if (list != null) {
            list.remove(watcher);//remove对应的watcher
            if (list.size() == 0) {//如果之前只有一个watcher,那么相应的path就没有watcher了,应该删掉
                watchTable.remove(p);
            }
        }
    }
}

这里其实也是两大步,第一是更新watcher->路径的映射,第二步更新路径->watcher的映射,只是第二步的时候需要遍历所有path。

triggerWatch

//根据事件类型和路径触发watcher,supress是指定的应该被过滤的watcher集合
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    WatchedEvent e = new WatchedEvent(type,
            KeeperState.SyncConnected, path);//新建watchedEvent对象,这时一定是连接状态的
    HashSet<Watcher> watchers;
    synchronized (this) {
        watchers = watchTable.remove(path);//把对应路径所有的watcher删除并返回
        if (watchers == null || watchers.isEmpty()) {//watcher为空直接打log
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG,
                        ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                        "No watchers for " + path);
            }
            return null;
        }
        for (Watcher w : watchers) {//watcher不为空
            HashSet<String> paths = watch2Paths.get(w);
            if (paths != null) {
                paths.remove(path);//把所有的路径删掉
            }
        }
    }
    for (Watcher w : watchers) {//遍历前面获得的所有watcher
        if (supress != null && supress.contains(w)) {//如果watcher在supress的set中跳过
            continue;
        }
        w.process(e);//不在set中就触发
    }
    return watchers;
}

这里有两点需要特别说一下:

  1. 为啥这里需要一个过滤的操作呢,可以通过下面datatree中deletenode里的代码可以了解:

    Set<Watcher> processed = dataWatches.triggerWatch(path,
            EventType.NodeDeleted);//1
    childWatches.triggerWatch(path, EventType.NodeDeleted, processed);//2
    childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
            EventType.NodeChildrenChanged);

可以看到,每个节点对应的watch会存到datawatches里,且如果一个节点是另一个节点的子节点,那么在server获取getchildren指令的时候会把children相关的的watch加入到datatree的childwatches里去。这时如果节点本身已经触发过了那么childwatches里的节点的watches便不用被触发了(因为节点都要被delete了,不存在子节点)。

  1. 最后的process方法并不是客户端的watcher,而是ServerCnxn的process,默认实现是NIOServerCnxn。
@Override
synchronized public void process(WatchedEvent event) {
    ReplyHeader h = new ReplyHeader(-1, -1L, 0);
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                                 "Deliver event " + event + " to 0x"
                                 + Long.toHexString(this.sessionId)
                                 + " through " + this);
    }

    // Convert WatchedEvent to a type that can be sent over the wire
    WatcherEvent e = event.getWrapper();//包装watcherevent

    sendResponse(h, e, "notification");//发送回复
}

DumpWatches

/**
 * String representation of watches. Warning, may be large!
 * @param byPath iff true output watches by paths, otw output
 * watches by connection
 * @return string representation of watches
 */
//把watch写到磁盘中
public synchronized void dumpWatches(PrintWriter pwriter, boolean byPath) {
    if (byPath) {
        for (Entry<String, HashSet<Watcher>> e : watchTable.entrySet()) {
            pwriter.println(e.getKey());//利用PrintWriter去写
            for (Watcher w : e.getValue()) {
                pwriter.print("\t0x");
                pwriter.print(Long.toHexString(((ServerCnxn)w).getSessionId()));
                pwriter.print("\n");
            }
        }
    } else {
        for (Entry<Watcher, HashSet<String>> e : watch2Paths.entrySet()) {
            pwriter.print("0x");
            pwriter.println(Long.toHexString(((ServerCnxn)e.getKey()).getSessionId()));
            for (String path : e.getValue()) {
                pwriter.print("\t");
                pwriter.println(path);
            }
        }
    }
}

总结

  1. zk的cnxn的实现由NIO和Netty两种方式,最近工作也用了些Netty相关的,抽空好好学习总结下。

参考

https://www.cnblogs.com/leesf456/p/6288709.html

https://www.jianshu.com/p/9cf98fab15ac

原文地址:https://www.cnblogs.com/gongcomeon/p/9926632.html

时间: 2024-08-28 21:32:57

Zookeeper源码阅读(七) Server端Watcher的相关文章

Mina 源码阅读:Server端基于NIO的处理流程

源码面前,了无秘密.继之前阅读了Prototype.Spring.Tomcat.以及JDK的部分.Digester等等源码之后,学习一门技术,了解源码成了必备流程.也深深的感受到了源码面前,了无秘密的含义,同时也体会到它给我带来的好处.同时,也希望所有开发者,不论前端后端,如果有时间的话,都尽量看看源码吧. 接下来进入正题,这里要对Mina流程做一个分析.因为是指对NIO流程做了分析,所以这里说的也是NIO的执行流程. 先看一下Mina中主要类的大致结构: 接下来看看Mina的整个生命周期: 1

zookeeper源码分析之五服务端(集群leader)处理请求流程

leader的实现类为LeaderZooKeeperServer,它间接继承自标准ZookeeperServer.它规定了请求到达leader时需要经历的路径: PrepRequestProcessor -> ProposalRequestProcessor ->CommitProcessor -> Leader.ToBeAppliedRequestProcessor ->FinalRequestProcessor 具体情况可以参看代码: @Override protected v

zookeeper源码分析之一服务端处理请求流程

上文: zookeeper源码分析之一服务端启动过程 中,我们介绍了zookeeper服务器的启动过程,其中单机是ZookeeperServer启动,集群使用QuorumPeer启动,那么这次我们分析各自一下消息处理过程: 前文可以看到在 1.在单机情况下NettyServerCnxnFactory中启动ZookeeperServer来处理消息: public synchronized void startup() { if (sessionTracker == null) { createSe

Zookeeper源码阅读(十四) 单机Server

前言 前面两篇主要说了下client-server的session相关的内容,到这里client的内容以及client-server的连接的内容也就基本告一段落了,剩下的部分就是server部分内部的结构,zk的选举以及server部分的工作机制等了. 这一篇主要说下单机server的启动过程,里面会涉及到一些server内部的工作机制和机构. Server架构 可以看到Zookeeper的server端主要分为几个大模块,ZKDatabase是zk server内部的内存数据库,内部维护了节点

Zookeeper源码阅读(五) ACL基础

前言 之前看代码的时候也同步看了看一些关于zk源码的博客,有一两篇讲到了ZK里ACL的基础的结构,我自己这边也看了看相关的代码,在这里分享一下! ACL和ID ACL和ID都是有Jute生成的实体类,分别代表了ZK里ACL和不同ACL模式下的具体实体. ACL: public class ACL implements Record { private int perms; private org.apache.zookeeper.data.Id id; 可以看到,ACL包含了两个域,分别代表了权

Zookeeper源码阅读(九) ZK Client-Server(2)

前言 前面一篇博客主要从大致流程的角度说了下client和server建立连接的流程,这篇和下一篇博客会详细的把上一篇不是很细致的地方展开和补充. 初始化阶段 初始化阶段主要就是把Zookeeper类中比较重要的功能类实例化,前面对这个过程说的已经比较详细了.这里主要补充几点: ClientCnxn初始化 cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, w

Zookeeper源码阅读(十二) Seesion(1)

前言 前面三篇主要从client的角度说了下client和server建立连接的过程,这一篇和后面一篇开始看下Zookeeper中非常重要的一个概念:Session,session是zookeeper client和server建立和维护连接的单位(我这个描述感觉有点奇怪 ?? ). Session状态 Zookeeper的所有操作基本都是基于session的,如之前提到的wathcer的机制,客户端请求的顺序执行和临时节点的生命周期. 从我们使用API的角度,session的连接和保持就是客户

Zookeeper源码阅读(十八) 选举之快速选举算法FastLeaderElection

目录 前言 FastLeaderEleaction基本结构 选举方法分析 思考 参考 前言 在过去的两节里已经分析了选举过程中的一些实体类和网络IO相关的机制与源码,这一节将会对zookeeper选举的核心类FastLeaderElection进行分析. FastLeaderEleaction基本结构 可以看到FastLeaderElection的基本结构还是比较清晰的,主要从新的成员变量类和内部类来分析下FastLeaderElection的基本结构. Notification /** * N

Spark Shuffle数据处理过程与部分调优(源码阅读七)

shuffle...相当重要,为什么咩,因为shuffle的性能优劣直接决定了整个计算引擎的性能和吞吐量.相比于Hadoop的MapReduce,可以看到Spark提供多种计算结果处理方式,对shuffle过程进行了优化. 那么我们从RDD的iterator方法开始: 我们可以看到,它调用了cacheManager的getOrCompute方法,如果分区任务第一次执行还没有缓存,那么会调用computeOrReadCheckpoint.如果某个partition任务执行失败,可以利用DAG重新调