【Zookeeper】源码分析之Watcher机制(一)

一、前言

  前面已经分析了Zookeeper持久话相关的类,下面接着分析Zookeeper中的Watcher机制所涉及到的类。

二、总体框图

  对于Watcher机制而言,主要涉及的类主要如下。

  

  说明:

  Watcher,接口类型,其定义了process方法,需子类实现。

  Event,接口类型,Watcher的内部类,无任何方法。

  KeeperState,枚举类型,Event的内部类,表示Zookeeper所处的状态。

  EventType,枚举类型,Event的内部类,表示Zookeeper中发生的事件类型。

  WatchedEvent,表示对ZooKeeper上发生变化后的反馈,包含了KeeperState和EventType。

  ClientWatchManager,接口类型,表示客户端的Watcher管理者,其定义了materialized方法,需子类实现。

  ZKWatchManager,Zookeeper的内部类,继承ClientWatchManager。

  MyWatcher,ZooKeeperMain的内部类,继承Watcher。

  ServerCnxn,接口类型,继承Watcher,表示客户端与服务端的一个连接。

  WatchManager,管理Watcher。

三、Watcher源码分析

  3.1 内部类

  Event,接口类型,表示事件代表的状态,除去其内部类,其源码结构如下  

public interface Event {}

  说明:可以看到,Event接口并没有定义任何属性和方法,但是其包含了KeeperState和EventType两个内部枚举类。

  3.2 接口方法  

abstract public void process(WatchedEvent event);

  说明:其代表了实现Watcher接口时必须实现的的方法,即定义进行处理,WatchedEvent表示观察的事件。

四、Event源码分析

  3.1 内部类

  1. KeeperState  

        public enum KeeperState { // 事件发生时Zookeeper的状态
            /** Unused, this state is never generated by the server */
            @Deprecated
            // 未知状态,不再使用,服务器不会产生此状态
            Unknown (-1), 

            /** The client is in the disconnected state - it is not connected
             * to any server in the ensemble. */
            // 断开
            Disconnected (0),

            /** Unused, this state is never generated by the server */
            @Deprecated
            // 未同步连接,不再使用,服务器不会产生此状态
            NoSyncConnected (1),

            /** The client is in the connected state - it is connected
             * to a server in the ensemble (one of the servers specified
             * in the host connection parameter during ZooKeeper client
             * creation). */
            // 同步连接状态
            SyncConnected (3),

            /**
             * Auth failed state
             */
            // 认证失败状态
            AuthFailed (4),

            /**
             * The client is connected to a read-only server, that is the
             * server which is not currently connected to the majority.
             * The only operations allowed after receiving this state is
             * read operations.
             * This state is generated for read-only clients only since
             * read/write clients aren‘t allowed to connect to r/o servers.
             */
            // 只读连接状态
            ConnectedReadOnly (5),

            /**
              * SaslAuthenticated: used to notify clients that they are SASL-authenticated,
              * so that they can perform Zookeeper actions with their SASL-authorized permissions.
              */
            // SASL认证通过状态
            SaslAuthenticated(6),

            /** The serving cluster has expired this session. The ZooKeeper
             * client connection (the session) is no longer valid. You must
             * create a new client connection (instantiate a new ZooKeeper
             * instance) if you with to access the ensemble. */
            // 过期状态
            Expired (-112);

            // 代表状态的整形值
            private final int intValue;     // Integer representation of value
                                            // for sending over wire

            // 构造函数
            KeeperState(int intValue) {
                this.intValue = intValue;
            }

            // 返回整形值
            public int getIntValue() {
                return intValue;
            }

            // 从整形值构造相应的状态
            public static KeeperState fromInt(int intValue) {
                switch(intValue) {
                    case   -1: return KeeperState.Unknown;
                    case    0: return KeeperState.Disconnected;
                    case    1: return KeeperState.NoSyncConnected;
                    case    3: return KeeperState.SyncConnected;
                    case    4: return KeeperState.AuthFailed;
                    case    5: return KeeperState.ConnectedReadOnly;
                    case    6: return KeeperState.SaslAuthenticated;
                    case -112: return KeeperState.Expired;

                    default:
                        throw new RuntimeException("Invalid integer value for conversion to KeeperState");
                }
            }
        }

  说明:KeeperState是一个枚举类,其定义了在事件发生时Zookeeper所处的各种状态,其还定义了一个从整形值返回对应状态的方法fromInt。

  2. EventType 

        public enum EventType { // 事件类型
            // 无
            None (-1),
            // 结点创建
            NodeCreated (1),
            // 结点删除
            NodeDeleted (2),
            // 结点数据变化
            NodeDataChanged (3),
            // 结点子节点变化
            NodeChildrenChanged (4);

            // 代表事件类型的整形
            private final int intValue;     // Integer representation of value
                                            // for sending over wire

            // 构造函数
            EventType(int intValue) {
                this.intValue = intValue;
            }

            // 返回整形
            public int getIntValue() {
                return intValue;
            }

            // 从整形构造相应的事件
            public static EventType fromInt(int intValue) {
                switch(intValue) {
                    case -1: return EventType.None;
                    case  1: return EventType.NodeCreated;
                    case  2: return EventType.NodeDeleted;
                    case  3: return EventType.NodeDataChanged;
                    case  4: return EventType.NodeChildrenChanged;

                    default:
                        throw new RuntimeException("Invalid integer value for conversion to EventType");
                }
            }
        }
    }

  说明:EventType是一个枚举类,其定义了事件的类型(如创建节点、删除节点等事件),同时,其还定义了一个从整形值返回对应事件类型的方法fromInt。

五、WatchedEvent

  5.1 类的属性  

public class WatchedEvent {
    // Zookeeper的状态
    final private KeeperState keeperState;
    // 事件类型
    final private EventType eventType;
    // 事件所涉及节点的路径
    private String path;
}

  说明:WatchedEvent类包含了三个属性,分别代表事件发生时Zookeeper的状态、事件类型和发生事件所涉及的节点路径。

  5.2 构造函数

  1. public WatchedEvent(EventType eventType, KeeperState keeperState, String path)型构造函数 

    public WatchedEvent(EventType eventType, KeeperState keeperState, String path) {
        // 初始化属性
        this.keeperState = keeperState;
        this.eventType = eventType;
        this.path = path;
    }

  说明:构造函数传入了三个参数,然后分别对属性进行赋值操作。

  2. public WatchedEvent(WatcherEvent eventMessage)型构造函数  

    public WatchedEvent(WatcherEvent eventMessage) {
        // 从eventMessage中取出相应属性进行赋值
        keeperState = KeeperState.fromInt(eventMessage.getState());
        eventType = EventType.fromInt(eventMessage.getType());
        path = eventMessage.getPath();
    }

  说明:构造函数传入了WatcherEvent参数,之后直接从该参数中取出相应属性进行赋值操作。

  对于WatchedEvent类的方法而言,相对简单,包含了几个getXXX方法,用于获取相应的属性值。

六、ClientWatchManager

  6.1 接口方法 

public Set<Watcher> materialize(Watcher.Event.KeeperState state,
        Watcher.Event.EventType type, String path);

  说明:该方法表示事件发生时,返回需要被通知的Watcher集合,可能为空集合。

七、ZKWatchManager

  7.1 类的属性 

    private static class ZKWatchManager implements ClientWatchManager {
        // 数据变化的Watchers
        private final Map<String, Set<Watcher>> dataWatches =
            new HashMap<String, Set<Watcher>>();
        // 节点存在与否的Watchers
        private final Map<String, Set<Watcher>> existWatches =
            new HashMap<String, Set<Watcher>>();
        // 子节点变化的Watchers
        private final Map<String, Set<Watcher>> childWatches =
            new HashMap<String, Set<Watcher>>();
    }

  说明:ZKWatchManager实现了ClientWatchManager,并定义了三个Map键值对,键为节点路径,值为Watcher。分别对应数据变化的Watcher、节点是否存在的Watcher、子节点变化的Watcher。

  7.2 核心方法分析

  1. materialize方法

        public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                        Watcher.Event.EventType type,
                                        String clientPath)
        {
            // 新生成结果Watcher集合
            Set<Watcher> result = new HashSet<Watcher>();

            switch (type) { // 确定事件类型
            case None: // 无类型
                // 添加默认Watcher
                result.add(defaultWatcher);
                // 是否需要清空(提取对zookeeper.disableAutoWatchReset字段进行配置的值、Zookeeper的状态是否为同步连接)
                boolean clear = ClientCnxn.getDisableAutoResetWatch() &&
                        state != Watcher.Event.KeeperState.SyncConnected;

                synchronized(dataWatches) { // 同步块
                    for(Set<Watcher> ws: dataWatches.values()) {
                        // 添加至结果集合
                        result.addAll(ws);
                    }
                    if (clear) { // 是否需要清空
                        dataWatches.clear();
                    }
                }

                synchronized(existWatches) { // 同步块
                    for(Set<Watcher> ws: existWatches.values()) {
                        // 添加至结果集合
                        result.addAll(ws);
                    }
                    if (clear) { // 是否需要清空
                        existWatches.clear();
                    }
                }

                synchronized(childWatches) { // 同步块
                    for(Set<Watcher> ws: childWatches.values()) {
                        // 添加至结果集合
                        result.addAll(ws);
                    }
                    if (clear) { // 是否需要清空
                        childWatches.clear();
                    }
                }
                // 返回结果
                return result;
            case NodeDataChanged: // 节点数据变化
            case NodeCreated: // 创建节点
                synchronized (dataWatches) { // 同步块
                    // 移除clientPath对应的Watcher后全部添加至结果集合
                    addTo(dataWatches.remove(clientPath), result);
                }
                synchronized (existWatches) {
                    // 移除clientPath对应的Watcher后全部添加至结果集合
                    addTo(existWatches.remove(clientPath), result);
                }
                break;
            case NodeChildrenChanged: // 节点子节点变化
                synchronized (childWatches) {
                    // 移除clientPath对应的Watcher后全部添加至结果集合
                    addTo(childWatches.remove(clientPath), result);
                }
                break;
            case NodeDeleted: // 删除节点
                synchronized (dataWatches) {
                    // 移除clientPath对应的Watcher后全部添加至结果集合
                    addTo(dataWatches.remove(clientPath), result);
                }
                // XXX This shouldn‘t be needed, but just in case
                synchronized (existWatches) {
                    // 移除clientPath对应的Watcher
                    Set<Watcher> list = existWatches.remove(clientPath);
                    if (list != null) {
                        // 移除clientPath对应的Watcher后全部添加至结果集合
                        addTo(existWatches.remove(clientPath), result);
                        LOG.warn("We are triggering an exists watch for delete! Shouldn‘t happen!");
                    }
                }
                synchronized (childWatches) {
                    // 移除clientPath对应的Watcher后全部添加至结果集合
                    addTo(childWatches.remove(clientPath), result);
                }
                break;
            default: // 缺省处理
                String msg = "Unhandled watch event type " + type
                    + " with state " + state + " on path " + clientPath;
                LOG.error(msg);
                throw new RuntimeException(msg);
            }

            // 返回结果集合
            return result;
        }
    }

  说明:该方法在事件发生后,返回需要被通知的Watcher集合。在该方法中,首先会根据EventType类型确定相应的事件类型,然后根据事件类型的不同做出相应的操作,如针对None类型,即无任何事件,则首先会从三个键值对中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合;针对NodeDataChanged和NodeCreated事件而言,其会从dataWatches和existWatches中删除clientPath对应的Watcher,然后将剩余的Watcher集合添加至结果集合。

八、总结

  针对Watcher机制的第一部分的源码分析就已经完成,可以看到此部分的源码相对简单,之后会分析org.apache.zookeeper.server下的WatchManager和ClientWatchManager所在外部类ZooKeeper,也谢谢各位园友的观看~

时间: 2024-12-31 05:16:57

【Zookeeper】源码分析之Watcher机制(一)的相关文章

Zookeeper源码分析:Watcher机制

1. 设置Watcher 使用Watcher需要先实现Watcher接口,并将实现类对象传递到指定方法中,如getChildren, exist等.Zookeeper允许在构造Zookeeper对象时候指定一个默认Watcher对象.getChildren和exit方法可以使用这个默认的Watcher对象,也可以指定一个新Watcher对象. Code 1: Watcher接口 public interface Watcher {     /**      * Event的状态      */

zookeeper源码分析之一服务端处理请求流程

上文: zookeeper源码分析之一服务端启动过程 中,我们介绍了zookeeper服务器的启动过程,其中单机是ZookeeperServer启动,集群使用QuorumPeer启动,那么这次我们分析各自一下消息处理过程: 前文可以看到在 1.在单机情况下NettyServerCnxnFactory中启动ZookeeperServer来处理消息: public synchronized void startup() { if (sessionTracker == null) { createSe

zookeeper源码分析之五服务端(集群leader)处理请求流程

leader的实现类为LeaderZooKeeperServer,它间接继承自标准ZookeeperServer.它规定了请求到达leader时需要经历的路径: PrepRequestProcessor -> ProposalRequestProcessor ->CommitProcessor -> Leader.ToBeAppliedRequestProcessor ->FinalRequestProcessor 具体情况可以参看代码: @Override protected v

zookeeper源码分析之一客户端发送请求流程

znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个功能是zookeeper对于应用最重要的特性,通过这个特性可以实现的功能包括配置的集中管理,集群管理,分布式锁等等. 知识准备: zookeeper定义的状态有: Unknown (-1),Disconnected (0),NoSyncConnected (1),SyncConnected (3),AuthFailed (4),ConnectedReadOnly (5),Sasl

Zookeeper 源码分析-启动

Zookeeper 源码分析-启动 博客分类: Zookeeper 本文主要介绍了zookeeper启动的过程 运行zkServer.sh start命令可以启动zookeeper.入口的main函数在类中QuorumPeerMain. main函数主要调用了runFromConfig函数,创建了QuorumPeer对象,并且调用了start函数,从而启动了zookeeper. Java代码   public class QuorumPeerMain { protected QuorumPeer

nginx的源码分析--间接回调机制的使用和类比

nginx使用了间接回调机制,结合upstream机制的使用来说明一下,首先明确几个事实: 1)其实ngxin和下游客户端的连接使用的是ngx_http_connection_t,每个连接对应着一个读事件.一个写事件,epoll监听队列监听的是事件(ngx_event_t),但是事件的data字段对应于这个事件所属的连接(ngx-connection_t).但是nginx和上游服务器之间的连接使用的ngx_peer_connection_t,其实ngx_peer_connection_t是ngx

storm操作zookeeper源码分析-cluster.clj

storm操作zookeeper的主要函数都定义在命名空间backtype.storm.cluster中(即cluster.clj文件中).backtype.storm.cluster定义了两个重要protocol:ClusterState和StormClusterState.clojure中的protocol可以看成java中的接口,封装了一组方法.ClusterState协议中封装了一组与zookeeper进行交互的基础函数,如获取子节点函数,获取子节点数据函数等,ClusterState协

飞鸽传书源码分析二消息机制

转载请注明出处:http://blog.csdn.net/mxway/article/details/40225725 本篇文章是在飞鸽传书2.06源码的基础的分析的. 飞鸽传书的消息大致可分为三类:普通窗口类(后面以TMainWin为例进行分析)消息,对话框类(后面以TSendDlg为例进行分析)消息,对话框控件(后面以TEditSub为例进行分析)消息.这三类消息先合后分,这三类窗口设置的消息处理函数都是TApp::WinProc,在TApp::WinProc函数中再分发给各自的消息处理函数

zookeeper源码分析三LEADER与FOLLOWER同步数据流程

根据二)中的分析,如果一台zookeeper服务器成为集群中的leader,那么一定是当前所有服务器中保存数据最多的服务器,所以在这台服务器成为leader之后,首先要做的事情就是与集群中的其它服务器(现在是follower)同步数据,保证大家的数据一致,这个过程完毕了才开始正式处理来自客户端的连接请求. 首先来看Leader做的工作:二)中提到的同步数据时使用的逻辑时钟,它的初始值是0,每次选举过程都会递增的,在leader正式上任之后做的第一件事情,就是根据当前保存的数据id值,设置最新的逻