品味ZooKeeper之Watcher机制_2

品味ZooKeeper之Watcher机制

本文思维导图如下:

前言

Watcher机制是zookeeper最重要三大特性数据节点Znode+Watcher机制+ACL权限控制中的其中一个,它是zk很多应用场景的一个前提,比如集群管理、集群配置、发布/订阅。

Watcher机制涉及到客户端与服务器(注意,不止一个机器,一般是集群,这里先认为一个整体分析)的两者数据通信与消息通信,除此之外还涉及到客户端的watchManager。

下面正式进入主题。

1.watcher原理框架

由图看出,zk的watcher由客户端,客户端WatchManager,zk服务器组成。整个过程涉及了消息通信及数据存储。

  • zk客户端向zk服务器注册watcher的同时,会将watcher对象存储在客户端的watchManager。
  • Zk服务器触发watcher事件后,会向客户端发送通知,客户端线程从watchManager中回调watcher执行相应的功能。

注意的是server服务器端一般有多台共同一起对外提供服务的,里面涉及到zk专有的ZAB协议(分布式原子广播协议)。在这先不分析,后面会有单独一文来介绍,因为ZAB协议是zookeeper的实现精髓,有了zab协议才能使zk真正落地,真正的高可靠,数据同步,适于商用。

有木有看到小红旗?加入小红旗是一个watcher,当小红旗被创建并注册到node1节点(会有相应的API实现)后,就会监听node1+node_a+node_b或node_a+node_b。这里两种情况是因为在创建watcher注册时会有多种途径。并且watcher不能监听到孙节点。注意注意注意,watcher设置后,一旦触发一次后就会失效,如果要想一直监听,需要在process回调函数里重新注册相同的 watcher

2.通知状态与事件

public class WatcherTest implements Watcher {
    @Override
    public void process(WatchedEvent event) {
        // TODO Auto-generated method stub
        WatcherTest  w = new WatcherTest();
        ZooKeeper zk = new ZooKeeper(wx.getZkpath(),10000, w);
    }

    public static void main(String[] args){
        WatcherTest  w = new WatcherTest();
        ZooKeeper   zk = new ZooKeeper(wx.getZkpath(), 10000, w);
    }
}

上面例子是把异常处理,逻辑处理等都省掉。watcher的应用很简单,主要有两步:继承 Watcher 接口,重写 process 回调函数。

当然注册方式有很多,有默认和重新覆盖方式,可以一次触发失效也可以一直有效触发。这些都可以通过代码实现。

2.1 KeeperStatus通知状态

KeeperStatus完整的类名是org.apache.zookeeper.Watcher.Event.KeeperState

2.2 EventType事件类型

EventType完整的类名是org.apache.zookeeper.Watcher.Event.EventType

此图是zookeeper常用的通知状态与对应事件类型的对应关系。除了客户端与服务器连接状态下,有多种事件的变化,其他状态的事件都是None。这也是符合逻辑的,因为没有连接服务器肯定不能获取获取到当前的状态,也就无法发送对应的事件类型了。

这里重点说下几个重要而且容易迷惑的事件:

  • NodeDataChanged事件
  • 无论节点数据发生变化还是数据版本发生变化都会触发
  • 即使被更新数据与新数据一样,数据版本dataVersion都会发生变化
  • NodeChildrenChanged
  • 新增节点或者删除节点
  • AuthFailed
  • 重点是客户端会话没有权限而是授权失败

客户端只能收到服务器发过来的相关事件通知,并不能获取到对应数据节点的原始数据及变更后的新数据。因此,如果业务需要知道变更前的数据或者变更后的新数据,需要业务保存变更前的数据(本机数据结构、文件等)和调用接口获取新的数据

3.watcher注册过程

3.1涉及接口

创建zk客户端对象实例时注册:

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)

通过这种方式注册的watcher将会作为整个zk会话期间的默认watcher,会一直被保存在客户端ZK WatchManagerdefaultWatcher 中,如果这个被创建的节点在其它时候被创建watcher并注册,则这个默认的watcher会被覆盖。注意注意注意,watcher触发一次就会失效,不管是创建节点时的 watcher 还是以后创建的 watcher

其他注册watcher的API:

  • getChildren(String path, Watcher watcher)
  • getChildren(String path, boolean watch)
  • Boolean watch表示是否使用上下文中默认的watcher,即创建zk实例时设置的watcher
  • getData(String path, boolean watch, Stat stat)
  • Boolean watch表示是否使用上下文默认的watcher,即创建zk实例时设置的watcher
  • getData(String path, Watcher watcher, AsyncCallback.DataCallback cb, Object ctx)
  • exists(String path, boolean watch)
  • Boolean watch表示是否使用上下文中默认的watcher,即创建zk实例时设置的watcher
  • exists(String path, Watcher watcher)

举栗子

这就是watcher的简单例子,zk的实际应用集群管理,发布订阅等复杂功能其实就在这个小例子上拓展的。

3.2客户端注册

这里的客户端注册主要是把上面第一点的zookeeper原理框架的注册步骤展开,简单来说就是zk客户端在注册时会先向zk服务器请求注册,服务器会返回请求响应,如果响应成功则zk服务端把watcher对象放到客户端的WatchManager管理并返回响应给客户端。

3.3服务器端注册

FinalRequestProcessor
/**
 * This Request processor actually applies any transaction associated with a
 * request and services any queries. It is always at the end of a
 * RequestProcessor chain (hence the name), so it does not have a nextProcessor
 * member.
 *
 * This RequestProcessor counts on ZooKeeperServer to populate the
 * outstandingRequests member of ZooKeeperServer.
 */
public class FinalRequestProcessor implements RequestProcessor

由源码注释得知,FinalRequestProcessor类实际是任何事务请求和任何查询的的最终处理类。也就是我们客户端对节点的set/get/delete/create/exists等操作最终都会运行到这里。

以exists函数为例子:

case OpCode.exists: {
    lastOp = "EXIS";
    // TODO we need to figure out the security requirement for this!
    ExistsRequest existsRequest = new ExistsRequest();
    ByteBufferInputStream.byteBuffer2Record(request.request,
    existsRequest);
    String path = existsRequest.getPath();
    if (path.indexOf(‘\0‘) != -1) {
    throw new KeeperException.BadArgumentsException();
    }
    Stat stat = zks.getZKDatabase().statNode(path, existsRequest
    .getWatch() ? cnxn : null);
    rsp = new ExistsResponse(stat);
    break;
}

existsRequest.getWatch() ? cnxn : null此句是在调用exists API时,判断是否注册watcher,若是就返回 cnxncnxn是由此句代码ServerCnxn cnxn = request.cnxn;创建的。

/**
 * Interface to a Server connection - represents a connection from a client
 * to the server.
 */
public abstract class ServerCnxn implements Stats, Watcher

通过ServerCnxn类的源码注释得知,ServerCnxn是维持服务器与客户端的tcp连接与实现了 watcher。总的来说,ServerCnxn类创建的对象cnxn即包含了连接信息又包含watcher信息。

同时仔细看ServerCnxn类里面的源码,发现有以下这个函数,process函数正是watcher的回调函数啊。

public abstract class ServerCnxn implements Stats, Watcher {
    .
    .
    public abstract void process(WatchedEvent event);
    Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null);
    //getZKDatabase实际上是获取是在zookeeper运行时的数据库。请看下面
    .
    .
}
ZKDatabase
/**
 * This class maintains the in memory database of zookeeper
 * server states that includes the sessions, datatree and the
 * committed logs. It is booted up  after reading the logs
 * and snapshots from the disk.
 */
public class ZKDatabase

通过源码注释得知ZKDatabase是在zookeeper运行时的数据库,在FinalRequestProcessor的case exists中会把existsRequest(exists请求传递给ZKDatabase)。

/**
 * the datatree for this zkdatabase
 * @return the datatree for this zkdatabase
 */
public DataTree getDataTree() {
return this.dataTree;
}

ZKDatabase里面有这关键的一个函数是从zookeeper运行时展开的节点数型结构中搜索到合适的节点返回。

watchManager
  • Zk服务器端Watcher的管理者
  • 从两个维度维护watcher
  • watchTable从数据节点的粒度来维护
  • watch2Paths从watcher的粒度来维护
  • 负责watcher事件的触发
    class WatchManager {
        private final Map<String, Set<Watcher>> watchTable =
        new HashMap<String, Set<Watcher>>();
    
        private final Map<Watcher, Set<String>> watch2Paths = new HashMap<Watcher, Set<String>>();
        Set<Watcher> triggerWatch(String path, EventType type) { return triggerWatch(path, type, null);}
    }
watcher触发
public Stat setData(String path, byte data[], int version, long zxid,long time) throws KeeperException.NoNodeException {
    Stat s = new Stat();
    DataNode n = nodes.get(path);
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    byte lastdata[] = null;
    synchronized (n) {
        lastdata = n.data;
        n.data = data;
        n.stat.setMtime(time);
        n.stat.setMzxid(zxid);
        n.stat.setVersion(version);
        n.copyStat(s);
    }
    // now update if the path is in a quota subtree.
    String lastPrefix = getMaxPrefixWithQuota(path);
    if(lastPrefix != null) {
      this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
      - (lastdata == null ? 0 : lastdata.length));
    }
    dataWatches.triggerWatch(path, EventType.NodeDataChanged); //触发事件
    return s;
}

客户端回调watcher步骤:

  • 反序列化,将孒节流转换成WatcherEvent对象。因为在Java中网络传输肯定是使用了序列化的,主要是为了节省网络IO和提高传输效率。
  • 处理chrootPath。获取节点的根节点路径,然后再搜索树而已。
  • 还原watchedEvent:把WatcherEvent对象转换成WatchedEvent。主要是把zk服务器那边的WatchedEvent事件变为WatcherEvent,标为已watch触发。
  • 回调Watcher:把WatchedEvent对象交给EventThread线程。EventThread线程主要是负责从客户端的ZKWatchManager中取出Watcher,并放入waitingEvents队列中,然后供客户端获取。

4.小结

到此,zookeeper的watcher机制基本告一段落了,watcher机制主要是客户端、zk服务器和watchManager三者的协调合作完成的。这里只分析了watcher的内容,例如涉及到的ZAB协议等没有分析,准备把它放在下下文中,下文是zookeeper的ACL访问控制权限。

原文地址:https://www.cnblogs.com/liangjf/p/8552547.html

时间: 2024-08-29 10:26:50

品味ZooKeeper之Watcher机制_2的相关文章

Apache ZooKeeper Watcher 机制源码解释

分布式系统从根本上来说就是不同节点上的进程并发执行,并且相互之间对进程的行为进行协调处理的过程.不同节点上的进程互相协调行为的过程叫做分布式同步.许多分布式系统需要一个进程作为任务的协调者,执行一些其他进程并不执行的特殊的操作,一般情况下哪个进程担当任务的协调者都无所谓,但是必须有一个进程作为协调者,自动选举出一个协调者的过程就是分布式选举.ZooKeeper 正是为了解决这一系列问题而生的.上一篇我们介绍了 ZooKeeper 服务启动原理和源代码剖析,这一讲我们来谈谈 Watcher 机制,

品味ZooKeeper之纵古观今_1

品味ZooKeeper之纵古观今 本章思维导图 这一系列主要是从整体到细节来品味Zookeeper,先从宏观来展开,介绍zookeeper诞生的原因,接着介绍整体设计框架,接着是逐个细节击破. 本章是首篇,主要是探究zookeeper的由来和整体框架,同时是一些自己的观点,本人也是在学习大数据的路上,难免有错,希望各位认真指出,一起进步. 1.何来何去? 1.1需求引起技术变化 每位程序猿都知道,需求是驱动技术发展的源泉动力.同样的,这句话放在zookeeper也适用,若不是需要zookeepe

zk的watcher机制的实现

转载:https://www.ibm.com/developerworks/cn/opensource/os-cn-apache-zookeeper-watcher/ http://blog.csdn.net/tycoon1988/article/details/38405101 可以设置观察的操作:exists,getChildren,getData 可以触发观察的操作:create,delete,setData zookeeper观察机制; 服务端只存储事件的信息,客户端存储事件的信息和Wa

【Zookeeper】源码分析之Watcher机制(一)

一.前言 前面已经分析了Zookeeper持久话相关的类,下面接着分析Zookeeper中的Watcher机制所涉及到的类. 二.总体框图 对于Watcher机制而言,主要涉及的类主要如下. 说明: Watcher,接口类型,其定义了process方法,需子类实现. Event,接口类型,Watcher的内部类,无任何方法. KeeperState,枚举类型,Event的内部类,表示Zookeeper所处的状态. EventType,枚举类型,Event的内部类,表示Zookeeper中发生的事

品味Zookeeper之选举及数据一致性_3

品味Zookeeper之选举及数据一致性 本文思维导图 前言 为了高可用和数据安全起见,zk集群一般都是由几个节点构成(由n/2+1,投票机制决定,肯定是奇数个节点).多节点证明它们之间肯定会有数据的通信,同时,为了能够使zk集群对外是透明的,一个整体对外提供服务,那么客户端访问zk服务器的数据肯定是要数据同步,也即数据一致性. zk集群是Leader/Follower模式来保证数据同步的.整个集群同一时刻只能有一个Leader,其他都是Follower或Observer.Leader是通过选举

Zookeeper源码分析:Watcher机制

1. 设置Watcher 使用Watcher需要先实现Watcher接口,并将实现类对象传递到指定方法中,如getChildren, exist等.Zookeeper允许在构造Zookeeper对象时候指定一个默认Watcher对象.getChildren和exit方法可以使用这个默认的Watcher对象,也可以指定一个新Watcher对象. Code 1: Watcher接口 public interface Watcher {     /**      * Event的状态      */

zookeeper节点Watch机制实例展示

znode以某种方式发生变化时,“观察”(watch)机制可以让客户端得到通知.可以针对ZooKeeper服务的“操作”来设置观察,该服务的其他 操作可以触发观察. 实现Watcher,复写process方法,处理收到的变更 /** * Watcher Server,处理收到的变更 * @param watchedEvent */ @Override public void process(WatchedEvent watchedEvent) { LOG.info("收到事件通知:"

zookeeper中Watcher和Notifications

问题导读:1.zookeeper观察者什么时候调用?2.传统远程轮询服务存在什么问题?3.zk中回调服务的机制是什么?4.zk中watcher为什么不永久注册?5.什么是znode? 在阅读之前首先明确个概念:1.什么是znode? 2.什么是客户端? 我们使用znode这个术语来表示ZooKeeper的数据节点. znode维持一个stat结构,它包含数据变化的版本号.ACL变化和时间戳,以允许cache校验和协调化的更新.每当znode的数据变化时,版本号将增加.一个客户端收到数据时,它也会

Zookeeper之Watcher监听事件丢失分析

在上篇博客中,介绍了zookeeper客户Curator对监听事件的封装及应用--<Zookeeper开源客户端Curator之事件监听详解>在讲解部分代码实例的运行结果时我们已经注意到,并不是所有的监听事件都会发送到客户端.比如连续更改一个节点的内容.创建节点再马上删除节点.本篇博客就讨论一下zookeeper监听事件丢失的原因及使用时的注意事项. 案例 package com.secbro.learn.curator; import org.apache.curator.RetryPoli