zookeeper(9)源码分析-事件监听Watcher(2)

接着上一篇文章,继续分析和Watcher相关的类的源码。

ClientWatchManager

public Set<Watcher> materialize(Watcher.Event.KeeperState state,
        Watcher.Event.EventType type, String path);

该接口,只有一个方法,需要实现。该方法表示事件发生时,返回需要被通知的Watcher集合,可能为空集合。

ZKWatchManager

1、ZKWatchManager是ZooKeeper的内部类,实现了ClientWatchManager。
2、ZKWatchManager定义了三个Map键值对,键为节点路径,值为Watcher。分别对应数据变化的Watcher、节点是否存在的Watcher、子节点变化的Watcher。

static class ZKWatchManager implements ClientWatchManager {
        //数据变化的watchers
        private final Map<String, Set<Watcher>> dataWatches =
            new HashMap<String, Set<Watcher>>();
        //节点存在与否的watchers
        private final Map<String, Set<Watcher>> existWatches =
            new HashMap<String, Set<Watcher>>();
        //子节点变化的watchers
        private final Map<String, Set<Watcher>> childWatches =
            new HashMap<String, Set<Watcher>>();

3、materialize方法

该方法在事件发生后,返回需要被通知的Watcher集合。在该方法中,首先会根据EventType类型确定相应的事件类型,然后根据事件类型的不同做出相应的操作,如针对None类型,即无任何事件,则首先会从三个键值对中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合;针对NodeDataChanged和NodeCreated事件而言,其会从dataWatches和existWatches中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合。

@Override
        public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                        Watcher.Event.EventType type,
                                        String clientPath)
        {
            //返回结果集合
            Set<Watcher> result = new HashSet<Watcher>();

            switch (type) {//事件类型
            case None://无类型
                //添加默认watcher
                result.add(defaultWatcher);
                //根据disableAutoWatchReset和Zookeeper的状态是否为同步连接判断是否需要清空
                boolean clear = disableAutoWatchReset && state != Watcher.Event.KeeperState.SyncConnected;
                //针对3个不同的watcherMap进行操作
                synchronized(dataWatches) {
                    for(Set<Watcher> ws: dataWatches.values()) {
                        // 添加至结果集合
                        result.addAll(ws);
                    }
                    if (clear) { // 是否需要清空
                        dataWatches.clear();
                    }
                }

                synchronized(existWatches) {
                    for(Set<Watcher> ws: existWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        existWatches.clear();
                    }
                }

                synchronized(childWatches) {
                    for(Set<Watcher> ws: childWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        childWatches.clear();
                    }
                }

                return result;
            case NodeDataChanged:// 节点数据变化
            case NodeCreated:// 创建节点
                synchronized (dataWatches) {
                    //移除clientPath对应的Watcher后全部添加至结果集合
                    addTo(dataWatches.remove(clientPath), result);
                }
                synchronized (existWatches) {
                    //移除clientPath对应的Watcher后全部添加至结果集合
                    addTo(existWatches.remove(clientPath), result);
                }
                break;
            case NodeChildrenChanged: // 节点子节点变化
                synchronized (childWatches) {
                    // 移除clientPath对应的Watcher后全部添加至结果集合
                    addTo(childWatches.remove(clientPath), result);
                }
                break;
            case NodeDeleted:// 删除节点
                synchronized (dataWatches) {
                    // 移除clientPath对应的Watcher后全部添加至结果集合
                    addTo(dataWatches.remove(clientPath), result);
                }
                // XXX This shouldn‘t be needed, but just in case
                synchronized (existWatches) {
                    Set<Watcher> list = existWatches.remove(clientPath);
                    if (list != null) {
                        addTo(list, result);
                        LOG.warn("We are triggering an exists watch for delete! Shouldn‘t happen!");
                    }
                }
                synchronized (childWatches) {
                    //移除clientPath对应的Watcher后全部添加至结果集合
                    addTo(childWatches.remove(clientPath), result);
                }
                break;
            default:
                String msg = "Unhandled watch event type " + type
                    + " with state " + state + " on path " + clientPath;
                LOG.error(msg);
                throw new RuntimeException(msg);
            }

            return result;
        }
    }

原文地址:https://blog.51cto.com/janephp/2455948

时间: 2024-10-03 11:05:44

zookeeper(9)源码分析-事件监听Watcher(2)的相关文章

zookeeper(10)源码分析-事件监听Watcher(3)

今天继续源码分析,分析一下org.apache.zookeeper.server下的WatchManager类. WatcherManager类用于管理watchers和相应的触发器. 类的属性 //watchTable表示从节点路径到watcher集合的映射 private final HashMap<String, HashSet<Watcher>> watchTable = new HashMap<String, HashSet<Watcher>>()

zookeeper(8)源码分析-事件监听Watcher(1)

Watcher是zookeeper的事件监听机制,今天我们来看看Watcher类的代码都包含了什么内容? Watcher Watcher是一个接口,定义了process方法,需要子类实现.其代表了实现Watcher接口时必须实现的的方法,即定义进行处理,WatchedEvent表示观察的事件. abstract public void process(WatchedEvent event); 内部类 1.Event接口 表示事件代表的状态,其包含了KeeperState和EventType两个内

【Zookeeper】源码分析目录

Zookeeper源码分析目录如下 1. [Zookeeper]源码分析之序列化 2. [Zookeeper]源码分析之持久化(一)之FileTxnLog 3. [Zookeeper]源码分析之持久化(二)之FileSnap 4. [Zookeeper]源码分析之持久化(三)之FileTxnSnapLog 5. [Zookeeper]源码分析之Watcher机制(一) 6. [Zookeeper]源码分析之Watcher机制(二)之WatchManager 7. [Zookeeper]源码分析之

Spark2.1.0之源码分析——事件总线

阅读提示:阅读本文前,最好先阅读<Spark2.1.0之源码分析--事件总线>.<Spark2.1.0事件总线分析--ListenerBus的继承体系>及<Spark2.1.0事件总线分析--SparkListenerBus详解>几篇文章的内容. LiveListenerBus继承了SparkListenerBus,并实现了将事件异步投递给监听器,达到实时刷新UI界面数据的效果.LiveListenerBus主要由以下部分组成: eventQueue:是SparkLis

jQuery 2.0.3 源码分析 事件绑定 - bind/live/delegate/on

转:http://www.cnblogs.com/aaronjs/p/3440647.html?winzoom=1 事件(Event)是JavaScript应用跳动的心脏,通过使用JavaScript ,你可以监听特定事件的发生,并规定让某些事件发生以对这些事件做出响应 事件的基础就不重复讲解了,本来是定位源码分析实现的, 所以需要有一定的基础才行 为了下一步更好的理解内部的实现,所以首先得清楚的认识到事件接口的划分 网上资料遍地都是,但是作为一个jQuery系列的源码分析,我还是很有必要在重新

【朝花夕拾】Android自定义View篇之(六)Android事件分发机制(中)从源码分析事件分发逻辑及经常遇到的一些“诡异”现象

前言 转载请注明,转自[https://www.cnblogs.com/andy-songwei/p/11039252.html]谢谢! 在上一篇文章[[朝花夕拾]Android自定义View篇之(五)Android事件分发机制(上)Touch三个重要方法的处理逻辑][下文简称(五),请先阅读完(五)再阅读本文],我们通过示例和log来分析了Android的事件分发机制.这些,我们只是看到了现象,如果要进一步了解事件分发机制,这是不够的,我们还需要透过现象看本质,去研究研究源码.本文将从源码(基

【Zookeeper】源码分析之Watcher机制(一)

一.前言 前面已经分析了Zookeeper持久话相关的类,下面接着分析Zookeeper中的Watcher机制所涉及到的类. 二.总体框图 对于Watcher机制而言,主要涉及的类主要如下. 说明: Watcher,接口类型,其定义了process方法,需子类实现. Event,接口类型,Watcher的内部类,无任何方法. KeeperState,枚举类型,Event的内部类,表示Zookeeper所处的状态. EventType,枚举类型,Event的内部类,表示Zookeeper中发生的事

【Zookeeper】源码分析之持久化--FileTxnSnapLog

一.前言 前面分析了FileSnap,接着继续分析FileTxnSnapLog源码,其封装了TxnLog和SnapShot,其在持久化过程中是一个帮助类. 二.FileTxnSnapLog源码分析 2.1 类的属性 public class FileTxnSnapLog { //the direcotry containing the //the transaction logs // 日志文件目录 private final File dataDir; //the directory cont

【Zookeeper】源码分析之服务器(二)

一.前言 前面阐述了服务器的总体框架,下面来分析服务器的所有父类ZooKeeperServer. 二.ZooKeeperServer源码分析 2.1 类的继承关系 public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {} 说明:ZooKeeperServer是ZooKeeper中所有服务器的父类,其实现了Session.Expirer和ServerStats.Provider接口,Session