zookeeper如何永久监听

转自:http://www.cnblogs.com/viviman/archive/2013/03/11/2954118.html

一 回调基础知识

znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个功能是zookeeper对于应用最重要的特性,通过这个特性可以实现的功能包括配置的集中管理,集群管理,分布式锁等等。

//创建一个Zookeeper实例,第一个参数为目标服务器地址和端口,第二个参数为Session超时时间,第三个为节点变化时的回调方法
ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 500000,new Watcher() {
          // 监控所有被触发的事件
           public void process(WatchedEvent event) {
     //dosomething
          }

});
将APP1的所有配置配置到/APP1 znode下,APP1所有机器一启动就对/APP1这个节点进行监控(zk.exist("/APP1",true)),并且实现回调方法Watcher,那么在zookeeper上/APP1 znode节点下数据发生变化的时候,每个机器都会收到通知,Watcher方法将会被执行,那么应用再取下数据即可(zk.getData("/APP1",false,null));

下面表格列出了写操作与ZK内部产生的事件的对应关系:

 
event For “/path”


event For “/path/child”


create(“/path”)


EventType.NodeCreated


NA


delete(“/path”)


EventType.NodeDeleted


NA


setData(“/path”)


EventType.NodeDataChanged


NA


create(“/path/child”)


EventType.NodeChildrenChanged


EventType.NodeCreated


delete(“/path/child”)


EventType.NodeChildrenChanged


EventType.NodeDeleted


setData(“/path/child”)


NA


EventType.NodeDataChanged

而ZK内部的写事件与所触发的watcher的对应关系如下:


event For “/path”


defaultWatcher


exists
(“/path”)


getData
(“/path”)


getChildren
(“/path”)


EventType.None






EventType.NodeCreated

 


 

EventType.NodeDeleted

 
√(不正常)


 

EventType.NodeDataChanged

 


 

EventType.NodeChildrenChanged

     

综合上面两个表,我们可以总结出各种写操作可以触发哪些watcher,如下表所示:

 
“/path”


“/path/child”

 
exists


getData


getChildren


exists


getData


getChildren


create(“/path”)



       

delete(“/path”)




     

setData(“/path”)



       

create(“/path/child”)

   



 

delete(“/path/child”)

   





setData(“/path/child”)

     


 

如果发生session close、authFail和invalid,那么所有类型的wather都会被触发

zkClient除了做了一些便捷包装之外,对watcher使用做了一点增强。比如subscribeChildChanges实际上是通过exists和getChildren关注了两个事件。这样当create(“/path”)时,对应path上通过getChildren注册的listener也会被调用。另外subscribeDataChanges实际上只是通过exists注册了事件。因为从上表可以看到,对于一个更新,通过exists和getData注册的watcher要么都会触发,要么都不会触发。

getData,getChildren(),exists()这三个方法可以针对参数中的path设置watcher,当path对应的Node 有相应变化时,server端会给对应的设置了watcher的client 发送一个一次性的触发通知事件。客户端在收到这个触发通知事件后,可以根据自己的业务逻辑进行相应地处理。

注意这个watcher的功能是一次性的,如果还想继续得到watcher通知,在处理完事件后,要重新register。

二 测试watcher回调机制

// Watcher实例

Watcher wh = new Watcher() {

public void process(WatchedEvent event) {

System.out.println("回调watcher实例: 路径" + event.getPath() + " 类型:"

+ event.getType());

}

};

ZooKeeper zk = new ZooKeeper("10.15.82.166:3351", 500000, wh);

System.out.println("---------------------");

// 创建一个节点root,数据是mydata,不进行ACL权限控制,节点为永久性的(即客户端shutdown了也不会消失)

zk.exists("/root", true);

zk.create("/root", "mydata".getBytes(), Ids.OPEN_ACL_UNSAFE,

CreateMode.PERSISTENT);

System.out.println("---------------------");

// 在root下面创建一个childone znode,数据为childone,不进行ACL权限控制,节点为永久性的

zk.exists("/root/childone", true);

zk.create("/root/childone", "childone".getBytes(), Ids.OPEN_ACL_UNSAFE,

CreateMode.PERSISTENT);

System.out.println("---------------------");

// 删除/root/childone这个节点,第二个参数为版本,-1的话直接删除,无视版本

zk.exists("/root/childone", true);

zk.delete("/root/childone", -1);

System.out.println("---------------------");

zk.exists("/root", true);

zk.delete("/root", -1);

System.out.println("---------------------");

// 关闭session

zk.close();

---------------------

回调watcher实例: 路径null 类型:None

回调watcher实例: 路径/root 类型:NodeCreated

---------------------

回调watcher实例: 路径/root/childone 类型:NodeCreated

---------------------

回调watcher实例: 路径/root/childone 类型:NodeDeleted

---------------------

回调watcher实例: 路径/root 类型:NodeDeleted

---------------------

三 永久回调

3类事件触发wather后就不再作用,也就是所谓的(一次作用),但是,如何永久监听呢?这需要我们再程序逻辑上进行控制,网上有更好的办法,但是,在简单的应用中,可以再wather方法里面再设置监听,这个方法很笨,但是,很有效,达到了预期的效果。

Watcher wh = new Watcher() {
        public void process(WatchedEvent event) {
            Log.AC_DEBUG("触发回调watcher实例: 路径" + event.getPath() + " 类型:"
                    + event.getType());

            if (event.getType() == EventType.None) {
                try { //
                        // 判断userauth权限是否能访问userpath
                    String auth_type = "digest";
                    zk.addAuthInfo(auth_type, userauth.getBytes());
                    zk.getData(userpath, null, null);
                } catch (Exception e) {
//                    e.printStackTrace();
                    Log.AC_ERROR("get node faild:userpath=" + userpath
                            + ",auth=" + userauth + " e:" + e.getMessage());
                    return;
                }
                Log.AC_INFO("userpath=" + userpath + " userauth=" + userauth);
            }

            try {

                switchinfo = getallswitch(); // 更新userpath和匿名用户路径下的配置信息,监听这些节点

                // 监听用户路径节点
                Log.AC_DEBUG("lesson user=" + userpath + " node...");
                zk.exists(userpath, true); // 监听匿名用户路径节点
                Log.AC_DEBUG("lesson user=" + AnonymousUSERpath + " node...");
                zk.exists(AnonymousUSERpath, true);

                // 监听用户路径下的开关节点
                if (zk.exists(userpath, false) != null) {
                    Log.AC_DEBUG("lesson user=" + userpath
                            + " ‘s swich node...");
                    List<String> swnodes = zk.getChildren(userpath, true); //
                    // 监听switch层节点的变化
                    Iterator<String> it_sw = swnodes.iterator();
                    while (it_sw.hasNext()) {
                        String swpath = userpath + "/" + it_sw.next();
                        Log.AC_DEBUG("lesson user=" + swpath + " node...");
                        zk.exists(swpath, true);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                Log.AC_ERROR("lesson znode error:" + e.getMessage());
            }
        }
    };
时间: 2024-11-03 12:35:42

zookeeper如何永久监听的相关文章

Zookeeper之Watcher监听事件丢失分析

在上篇博客中,介绍了zookeeper客户Curator对监听事件的封装及应用--<Zookeeper开源客户端Curator之事件监听详解>在讲解部分代码实例的运行结果时我们已经注意到,并不是所有的监听事件都会发送到客户端.比如连续更改一个节点的内容.创建节点再马上删除节点.本篇博客就讨论一下zookeeper监听事件丢失的原因及使用时的注意事项. 案例 package com.secbro.learn.curator; import org.apache.curator.RetryPoli

zookeeper客户端设置监听

1.目的 zookeeper是一个分布式服务管理框架.zookeeper提供了对客户端的通知,即在服务器端的节点有修改或者删除的时候,能够给客户端进行通知. 2.服务器端部署 服务器端部署zookeeper的步骤省略,具体的可以参看我前面的文章,部署也比较简单. 3.客户端接收通知代码 import java.io.IOException; import java.util.List; import org.apache.zookeeper.CreateMode; import org.apac

zookeeper中的事件监听--cache篇

最近有接触zookeeper,有用到一部分的功能,所以在这里简单记录一下: Curator中事件的监听 原生zookeeper的事件监听采用Watcher实现,不过Watcher监听是一次性的,如果需要继续监听该事件,必须重新注册. Curator中采用cache来封装了对事件的监听,在包org.apache.curator.framework.recipes.cache封装了3种类型的事件监听. cache 分为三种(其实就是从不同的维度去解析cache): 1.PathChildrenCac

zookeeper(8)源码分析-事件监听Watcher(1)

Watcher是zookeeper的事件监听机制,今天我们来看看Watcher类的代码都包含了什么内容? Watcher Watcher是一个接口,定义了process方法,需要子类实现.其代表了实现Watcher接口时必须实现的的方法,即定义进行处理,WatchedEvent表示观察的事件. abstract public void process(WatchedEvent event); 内部类 1.Event接口 表示事件代表的状态,其包含了KeeperState和EventType两个内

ZooKeeper(3.4.5) 使用Curator监听事件

ZooKeeper原生的API支持通过注册Watcher来进行事件监听,但是Watcher通知是一次性的,因此开发过程中需要反复注册Watcher,比较繁琐.Curator引入了Cache来监听ZooKeeper服务端的事件.Cache对ZooKeeper事件监听进行了封装,能够自动处理反复注册监听,简化了ZooKeeper原生API繁琐的开发过程. 简单的示例: package com.huey.dream.demo; import java.util.concurrent.ExecutorS

.Net客户端监听ZooKeeper节点数据变化

一个很简单的例子,用途是监听zookeeper中某个节点数据的变化,具体请参见代码中的注释 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using ZooKeeperNet; namespace ZooKeeperDemo { /// <summary> ///

zookeeper 3.5.0 修改管理控制台监听端口的方法

zookeeper 3.5.0 修改管理控制台 jetty 的监听端口是通过参数-Dzookeeper.admin.serverPort=8088来实现的 而不是-Djetty.port=8088,源代码为 org.apache.zookeeper.server.admin.JettyAdminServer 中的64-67行 public JettyAdminServer() throws AdminServerException {        this(Integer.getInteger

消费滚动滴log日志文件(flume监听,kafka消费,zookeeper协同)

第一步:数据源 手写程序实现自动生成如下格式的日志文件: 15837312345,13737312345,2017-01-09 08:09:10,0360 打包放到服务器,使用如下命令执行,模拟持续不断的日志文件: java -cp ct_producter-1.0-SNAPSHOT.jar producter.ProductLog ./awen.tsv 第二步:监听log.tsv日志 使用Flume监控滚动的awen.tsv日志,编写flume # Name the components on

Zookeeper 事件监听 - 史上最详解读

目录 写在前面 1.1. Curator 事件监听 1.1.1. Watcher 标准的事件处理器 1.1.2. NodeCache 节点缓存的监听 1.1.3. PathChildrenCache 子节点监听 1.1.4. Tree Cache 节点树缓存 写在最后 疯狂创客圈 亿级流量 高并发IM 实战 系列 疯狂创客圈 Java 分布式聊天室[ 亿级流量]实战系列之 -25[ 博客园 总入口 ] 写在前面 ? 大家好,我是作者尼恩.目前和几个小伙伴一起,组织了一个高并发的实战社群[疯狂创客