zookeeper Watcher API 说明

  Watcher 在 ZooKeeper 是一个核心功能,Watcher 可以监控目录节点的数据变化以及子目录的变化,一旦这些状态发生变化,服务器就会通知所有设置在这个目录节点上的 Watcher,从而每个客户端都很快知道它所关注的目录节点的状态发生变化,而做出相应的反应.

可以设置观察的操作:exists,getChildren,getData

可以触发观察的操作:create,delete,setData

znode以某种方式发生变化时,“观察”(watch)机制可以让客户端得到通知.可以针对ZooKeeper服务的“操作”来设置观察,该服务的其他 操作可以触发观察.

比如,客户端可以对某个客户端调用exists操作,同时在它上面设置一个观察,如果此时这个znode不存在,则exists返回 false,如果一段时间之后,这个znode被其他客户端创建,则这个观察会被触发,之前的那个客户端就会得到通知.

说明: zookeeper客户端对server的操作都是不可回退的。
意思是说,zk的客户端每次和server进行通信的时候,会记住server上最新的zxid。如果某个时刻,客户端和server断开了连接,那么等到下次重新连接到集群中的机器上时,会检查当前连接上的那个server是否和client有相同的zxid,或者已经是更新的zxid了。一旦客户端发现server的zxid比自己小,那么客户端会断开和这个server的连接,并且重新连接集群中的其它server.

1、链接Zookeeper服务器

/**
     * <p>连接Zookeeper</p>
     * <pre>
     *     [关于connectString服务器地址配置]
     *     格式: 192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181
     *     这个地址配置有多个ip:port之间逗号分隔,底层操作
     *     ConnectStringParser connectStringParser =  new ConnectStringParser(“192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181”);
     *     这个类主要就是解析传入地址列表字符串,将其它保存在一个ArrayList中
     *     ArrayList<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>();
     *     接下去,这个地址列表会被进一步封装成StaticHostProvider对象,并且在运行过程中,一直是这个对象来维护整个地址列表。
     *     ZK客户端将所有Server保存在一个List中,然后随机打乱(这个随机过程是一次性的),并且形成一个环,具体使用的时候,从0号位开始一个一个使用。
     *     因此,Server地址能够重复配置,这样能够弥补客户端无法设置Server权重的缺陷,但是也会加大风险。
     *
     *     [客户端和服务端会话说明]
     *     ZooKeeper中,客户端和服务端建立连接后,会话随之建立,生成一个全局唯一的会话ID(Session ID)。
     *     服务器和客户端之间维持的是一个长连接,在SESSION_TIMEOUT时间内,服务器会确定客户端是否正常连接(客户端会定时向服务器发送heart_beat,服务器重置下次SESSION_TIMEOUT时间)。
     *     因此,在正常情况下,Session一直有效,并且ZK集群所有机器上都保存这个Session信息。
     *     在出现网络或其它问题情况下(例如客户端所连接的那台ZK机器挂了,或是其它原因的网络闪断),客户端与当前连接的那台服务器之间连接断了,
     *     这个时候客户端会主动在地址列表(实例化ZK对象的时候传入构造方法的那个参数connectString)中选择新的地址进行连接。
     *
     *     [会话时间]
     *     客户端并不是可以随意设置这个会话超时时间,在ZK服务器端对会话超时时间是有限制的,主要是minSessionTimeout和maxSessionTimeout这两个参数设置的。
     *     如果客户端设置的超时时间不在这个范围,那么会被强制设置为最大或最小时间。 默认的Session超时时间是在2 * tickTime ~ 20 * tickTime
     * </pre>
     * @param connectString  Zookeeper服务地址
     * @param sessionTimeout Zookeeper连接超时时间
     */
    public void connectionZookeeper(String connectString, int sessionTimeout){
        this.releaseConnection();
        try {
            // ZK客户端允许我们将ZK服务器的所有地址都配置在这里
            zk = new ZooKeeper(connectString, sessionTimeout, this );
            // 使用CountDownLatch.await()的线程(当前线程)阻塞直到所有其它拥有CountDownLatch的线程执行完毕(countDown()结果为0)
            connectedSemaphore.await();
        } catch ( InterruptedException e ) {
            LOG.error("连接创建失败,发生 InterruptedException , e " + e.getMessage(), e);
        } catch ( IOException e ) {
            LOG.error( "连接创建失败,发生 IOException , e " + e.getMessage(), e );
        }
    }

2、创建节点

    /**
     * <p>创建zNode节点, String create(path<节点路径>, data[]<节点内容>, List(ACL访问控制列表), CreateMode<zNode创建类型>) </p><br/>
     * <pre>
     *     节点创建类型(CreateMode)
     *     1、PERSISTENT:持久化节点
     *     2、PERSISTENT_SEQUENTIAL:顺序自动编号持久化节点,这种节点会根据当前已存在的节点数自动加 1
     *     3、EPHEMERAL:临时节点客户端,session超时这类节点就会被自动删除
     *     4、EPHEMERAL_SEQUENTIAL:临时自动编号节点
     * </pre>
     * @param path zNode节点路径
     * @param data zNode数据内容
     * @return 创建成功返回true, 反之返回false.
     */
    public boolean createPath( String path, String data ) {
        try {
            String zkPath =  this.zk.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            LOG.info( "节点创建成功, Path: " + zkPath + ", content: " + data );
            return true;
        } catch ( KeeperException e ) {
            LOG.error( "节点创建失败, 发生KeeperException! path: " + path + ", data:" + data
                    + ", errMsg:" + e.getMessage(), e );
        } catch ( InterruptedException e ) {
            LOG.error( "节点创建失败, 发生 InterruptedException! path: " + path + ", data:" + data
                    + ", errMsg:" + e.getMessage(), e );
        }
        return false;
    }

3、删除节点

    /**
     * <p>删除一个zMode节点, void delete(path<节点路径>, stat<数据版本号>)</p><br/>
     * <pre>
     *     说明
     *     1、版本号不一致,无法进行数据删除操作.
     *     2、如果版本号与znode的版本号不一致,将无法删除,是一种乐观加锁机制;如果将版本号设置为-1,不会去检测版本,直接删除.
     * </pre>
     * @param path zNode节点路径
     * @return 删除成功返回true,反之返回false.
     */
    public boolean deletePath( String path ){
        try {
            this.zk.delete(path,-1);
            LOG.info( "节点删除成功, Path: " + path);
            return true;
        } catch ( KeeperException e ) {
            LOG.error( "节点删除失败, 发生KeeperException! path: " + path
                    + ", errMsg:" + e.getMessage(), e );
        } catch ( InterruptedException e ) {
            LOG.error( "节点删除失败, 发生 InterruptedException! path: " + path
                    + ", errMsg:" + e.getMessage(), e );
        }
        return false;
    }

4、节点赋值/更新节点

    /**
     * <p>更新指定节点数据内容, Stat setData(path<节点路径>, data[]<节点内容>, stat<数据版本号>)</p>
     * <pre>
     *     设置某个znode上的数据时如果为-1,跳过版本检查
     * </pre>
     * @param path zNode节点路径
     * @param data zNode数据内容
     * @return 更新成功返回true,返回返回false
     */
    public boolean writeData( String path, String data){
        try {
            Stat stat = this.zk.setData(path, data.getBytes(), -1);
            LOG.info( "更新数据成功, path:" + path + ", stat: " + stat );
            return true;
        } catch (KeeperException e) {
            LOG.error( "更新数据失败, 发生KeeperException! path: " + path + ", data:" + data
                    + ", errMsg:" + e.getMessage(), e );
        } catch (InterruptedException e) {
            LOG.error( "更新数据失败, 发生InterruptedException! path: " + path + ", data:" + data
                    + ", errMsg:" + e.getMessage(), e );
        }
        return false;
    }

5、读取节点值

    /**
     * <p>读取指定节点数据内容,byte[] getData(path<节点路径>, watcher<监视器>, stat<数据版本号>)</p>
     * @param path zNode节点路径
     * @return 节点存储的值,有值返回,无值返回null
     */
    public String readData( String path ){
        String data = null;
        try {
            data = new String( this.zk.getData( path, false, null ) );
            LOG.info( "读取数据成功, path:" + path + ", content:" + data);
        } catch (KeeperException e) {
            LOG.error( "读取数据失败,发生KeeperException! path: " + path
                    + ", errMsg:" + e.getMessage(), e );
        } catch (InterruptedException e) {
            LOG.error( "读取数据失败,发生InterruptedException! path: " + path
                    + ", errMsg:" + e.getMessage(), e );
        }
        return  data;
    }

6、判断节点是否存在

    /**
     * <p>判断某个zNode节点是否存在, Stat exists(path<节点路径>, watch<并设置是否监控这个目录节点,这里的 watcher 是在创建 ZooKeeper 实例时指定的 watcher>)</p>
     * @param path zNode节点路径
     * @return 存在返回true,反之返回false
     */
    public boolean isExists( String path ){
        try {
            Stat stat = this.zk.exists( path, false );
            return null != stat;
        } catch (KeeperException e) {
            LOG.error( "读取数据失败,发生KeeperException! path: " + path
                    + ", errMsg:" + e.getMessage(), e );
        } catch (InterruptedException e) {
            LOG.error( "读取数据失败,发生InterruptedException! path: " + path
                    + ", errMsg:" + e.getMessage(), e );
        }
        return false;
    }

7、获取某个节点下的子节点

    /**
     * <p>获取某个节点下的所有子节点,List getChildren(path<节点路径>, watcher<监视器>)该方法有多个重载</p>
     * @param path zNode节点路径
     * @return 子节点路径集合 说明,这里返回的值为节点名
     * <pre>
     *     eg.
     *     /node
     *     /node/child1
     *     /node/child2
     *     getChild( "node" )户的集合中的值为["child1","child2"]
     * </pre>
     *
     *
     *
     * @throws KeeperException
     * @throws InterruptedException
     */
    public List<String> getChild( String path ){
        try{
            List<String> list=this.zk.getChildren( path, false );
            if(list.isEmpty()){
                LOG.info( "中没有节点" + path );
            }
            return list;
        }catch (KeeperException e) {
            LOG.error( "读取子节点数据失败,发生KeeperException! path: " + path
                    + ", errMsg:" + e.getMessage(), e );
        } catch (InterruptedException e) {
            LOG.error( "读取子节点数据失败,发生InterruptedException! path: " + path
                    + ", errMsg:" + e.getMessage(), e );
        }
        return null;
    }

8、释放链接

  /**
     * 关闭ZK连接
     */
    public void releaseConnection() {
        if ( null != zk ) {
            try {
                this.zk.close();
            } catch ( InterruptedException e ) {
                LOG.error("release connection error ," + e.getMessage() ,e);
            }
        }
    }

转载请注明出处:[http://www.cnblogs.com/dennisit/p/4340688.html]

时间: 2024-10-13 05:30:05

zookeeper Watcher API 说明的相关文章

(原) 2.1 Zookeeper原生API使用

本文为原创文章,未经允许不得转载 Zookeeper原生API使用 1.jar包引入,演示版本为3.4.6,非maven项目,可以下载jar包导入到项目中 <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency> 2.

ZooKeeper JAVA API 之环境准备和创建会话

Zookeeper是一个开放源代码的分布式协调服务,由雅虎创建,是Google Chubby的开源实现.Zookeeper的设计目标是将那些复杂且容易出错的分布式一致性服务封装起来,构成一个高效可靠的元语集,并以一系列简单易用的接口提供给用户使用. 单机模式部署与运行(Windows) 确保已经安装了JAVA 1.6及其以上版本的JDK 下载Zookeeper    http://zookeeper.apache.org/releases.html  目前稳定版本即stable版本为3.4.6.

Apache ZooKeeper Watcher 机制源码解释

分布式系统从根本上来说就是不同节点上的进程并发执行,并且相互之间对进程的行为进行协调处理的过程.不同节点上的进程互相协调行为的过程叫做分布式同步.许多分布式系统需要一个进程作为任务的协调者,执行一些其他进程并不执行的特殊的操作,一般情况下哪个进程担当任务的协调者都无所谓,但是必须有一个进程作为协调者,自动选举出一个协调者的过程就是分布式选举.ZooKeeper 正是为了解决这一系列问题而生的.上一篇我们介绍了 ZooKeeper 服务启动原理和源代码剖析,这一讲我们来谈谈 Watcher 机制,

Zookeeper客户端API之创建会话(六)

Zookeeper对外提供了一套Java的客户端API.本篇博客主要讲一下创建会话. 创建项目 首选,创建一个基于maven管理的简单java工程.在pom文件中引入zookeeper. <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.9</version> </d

Zookeeper的api的简单使用(转载)

转载自: http://www.cnblogs.com/sunddenly/p/4031881.html 1.API 2.API 示例 ZooKeeper中的组成员关系 理解ZooKeeper的一种方法就是将其看作一个具有高可用性的文件系统.但这个文件系统中没有文件和目录,而是统一使用"节点"(node)的概念,称为znode.znode既可以作为保存数据的容器(如同文件),也可以作为保存其他znode的容器(如同目录).所有的znode构成一个层次化的命名空间.一种自然的建立组成员列

Zookeeper C API 指南三(回调函数)(转)

2013-02-21 12:54 by Haippy, 9237 阅读, 0 评论, 收藏, 编辑 接上一篇<Zookeeper C API 指南二(监视(Wathes), 基本常量和结构体介绍)>,本文重点介绍 Zookeeper C API 中的各种回调函数. Zookeeper C API 中各种回调函数简介 在具体介绍 Zookeeper C API 之前,首先介绍一下 Zookeeper C API 中的各种回调函数的原型: 监视函数(watch function)原型 typede

Zookeeper C API 指南四(C API 概览)(转)

上一节<Zookeeper C API 指南三(回调函数)>重点讲了 Zookeeper C API 中各种回调函数的原型,本节将切入正题,正式讲解 Zookeeper C API.相信大家读完本文后应该对 Zookeeper C API 的使用有一个比较清晰的认识. Zookeeper C API 概览 Zookeeper C API 很规范,接口很容易记忆,大部分接口均以 zoo_ 开头,只有少量接口以 zookeeper_ 开头,所有的 API 汇总如下: void zoo_create

Apache Curator操作zookeeper的API使用

curator简介与客户端之间的异同点 常用的zookeeper java客户端: zookeeper原生Java API zkclient Apache curator ZooKeeper原生Java API的不足之处: 在连接zk超时的时候,不支持自动重连,需要手动操作 Watch注册一次就会失效,需要反复注册 不支持递归创建节点 Apache curator: Apache 的开源项目 解决Watch注册一次就会失效的问题 提供的 API 更加简单易用 提供更多解决方案并且实现简单,例如:

开始使?ZooKeeper的API

在之前的章节中,我们使用zkCli工具介绍了ZooKeeper的基本操作.从本章开始,我们将会看到在应用中如何通过API来进行操作.首先介绍一下如何使用ZooKeeper的API进行开发,展示如何创建会话,实现监视点(watcher).我们还是从主-从模式例子开始进行编码 1.1建立ZooKeeper会话 ZooKeeper的API围绕ZooKeeper的句柄(handle)来构建,每个API调用都需要传递这个句柄.这个句柄代表与ZooKeeper之间的一个会话.在图3-1中,与ZooKeepe