Curator Cache

1.Curator Cache 与原生ZooKeeper Wacher区别

原生的ZooKeeper Wacher是一次性的:一个Wacher一旦触发就会被移出,如果你想要反复使用Wacher,就要在Wacher被移除后重新注册,使用起来很麻烦。使用Curator Cache 可以反复使用Wacher了。

2.Curator Cache 和Curator Wacher区别

2.相关类

Curator Cache主要提供了一下三组类,分别用于实现对节点的监听,子节点的监听和二者的混合:

1.NodeCache,NodeCacheListener,ChildData,节点创建,节点数据内容变更,不能监听节点删除。

2.PathChildrenCache,PathChildrenCacheListener,PathChildrenCacheEvent监听指定节点的子节点的变更包括添加,删除,子节点数据数据变更这三类。

3.TreeCache,TreeCacheListener,TreeCacheEvent,TreeCacheSelector

3.节点监听

package cn.francis.maven.hello.ZooKeeper;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;

public class NodeCacheDemo {
  public static void main(String[]args) throws Exception{
      TestingServer server=null;
      CuratorFramework client=null;
      NodeCache nodeCache=null;
      String path="/francis/nodecache/a";

      try{
         server=new TestingServer();
         client= CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000,3));
         client.start();

         path=client.create().creatingParentsIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL).forPath(path,"init".getBytes());

         nodeCache=new NodeCache(client,path,false);
         nodeCache.start();
         addListener(nodeCache);

         client.setData().forPath(path,"hello".getBytes());
         Thread.sleep(1000);
         client.delete().deletingChildrenIfNeeded().forPath(path);
         Thread.sleep(Integer.MAX_VALUE);

      }catch(Exception e){
          e.printStackTrace();
      }finally{

          //这里因为是测试,没有加他们。
          //CloseableUtils.closeQuietly(nodeCache);
         /// CloseableUtils.closeQuietly(client);
         // CloseableUtils.closeQuietly(server);
      }
  }

  public static void addListener(NodeCache nodeCache){
      //监听类型:节点是否存在,节点数据内容改变,不监听节点删除。
         nodeCache.getListenable().addListener(new NodeCacheListener(){

            @Override
            public void nodeChanged() throws Exception {
                // TODO Auto-generated method stub
                if(nodeCache.getCurrentData()!=null)
                    System.out.println("path:"+nodeCache.getCurrentData().getPath()+",data:"+new String(nodeCache.getCurrentData().getData()));

            }});
  }
}

在上面的代码中首先创建了一个节点,然后创建用这个节点路径来创建NodeCache,启动NodeCache,添加NodeCacheListener。然后调用setData来修改节点数据。上面的代码输入如下:

path:/francis/nodecache/_c_d5be73ca-592c-4eda-b7c4-c8ec60ef39a8-a,data:hello

子节点监听:

PathChildrenCache的构造函数如下:

public PathChildrenCache(CuratorFramework client,
                         String path,
                         boolean cacheData)
Parameters:
client - the client
path - path to watch
cacheData - if true, node contents are cached in addition to the stat

其中最主要的是cacheData,如果为true,那么当对子节点调用setData时,PathChildrenCache会受到这个CHILD_UPDATED事件。

下面看一下demo:

package cn.francis.maven.hello.ZooKeeper;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.hadoop.mapred.join.Parser.TType;
import org.apache.zookeeper.CreateMode;

public class NodeCacheDemo {
  public static void main(String[]args) throws Exception{
      TestingServer server=null;
      CuratorFramework client=null;
      NodeCache nodeCache=null;
      String path="/francis/nodecache/b";

      try{
         server=new TestingServer();
         client= CuratorFrameworkFactory.newClient(server.getConnectString(),new ExponentialBackoffRetry(1000,3));
         client.start();

         //这里将第三个参数cacheData设置为true,这样当对子节点调用setData时,会受到CHILDREN_UPDATE通知。
         PathChildrenCache childrenCache=new PathChildrenCache(client,path,true);

         childrenCache.start(StartMode.POST_INITIALIZED_EVENT);

         childrenCache.getListenable().addListener(new PathChildrenCacheListener(){

            @Override
            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                // TODO Auto-generated method stub
               if(event.getType()==Type.INITIALIZED){
                   System.out.println("create"+event.getData().getPath());
               }else if(event.getType()==Type.CHILD_ADDED){
                 System.out.println("create"+event.getData().getPath());
               }else if(event.getType()==Type.CHILD_REMOVED){
                   System.out.println("remove:"+event.getData().getPath());
               }else if(event.getType()==Type.CHILD_UPDATED){
                  //System.out.println("update:"+event.getData().getPath());
                   System.out.println("update:"+new String(event.getData().getData()));
               }

            }});

         //创建父节点
         System.out.println(client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path,"init".getBytes()));
         Thread.sleep(1000);

         //创建子节点
         String childPath1=ZKPaths.makePath(path, "a");
         childPath1=client.create().withMode(CreateMode.PERSISTENT).forPath(childPath1,"1".getBytes());
         Thread.sleep(1000);

         //对子节点赋值
         client.setData().forPath(childPath1,"aaa".getBytes());
         Thread.sleep(1000);

         //删除子节点
         client.delete().forPath(childPath1);
         client.delete().deletingChildrenIfNeeded().forPath("/francis");

         Thread.sleep(2000);

      }catch(Exception e){
          e.printStackTrace();
      }finally{
          CloseableUtils.closeQuietly(nodeCache);
          CloseableUtils.closeQuietly(client);
          CloseableUtils.closeQuietly(server);
      }
  }

3.TreeNodeCache

TreeNodeCache将NodeCache和PathChildrenCache功能结合到一起了。他不仅可以对子节点和父节点同时进行监听。如下:

package cn.francis.maven.hello.ZooKeeper;

import java.util.Map;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.hadoop.mapred.join.Parser.TType;
import org.apache.zookeeper.CreateMode;

public class NodeCacheDemo {
  public static void main(String[]args) throws Exception{
      TestingServer server=null;
      CuratorFramework client=null;
      NodeCache nodeCache=null;
      String path="/francis/nodecache/b";

      try{
         server=new TestingServer();
         client= CuratorFrameworkFactory.newClient(server.getConnectString(),new ExponentialBackoffRetry(1000,3));
         client.start();

         TreeCache treeNodeCache=new TreeCache(client,path);

         treeNodeCache.start();

         treeNodeCache.getListenable().addListener(new TreeCacheListener(){

            @Override
            public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                // TODO Auto-generated method stub
                switch(event.getType()){
                  case NODE_ADDED:
                      System.out.println("added:"+event.getData().getPath());
                      break;
                  case NODE_UPDATED:
                      System.out.println("updated:"+event.getData().getPath());
                      break;
                  case NODE_REMOVED:
                      System.out.println("removed:"+event.getData().getPath());
                      break;
                  default:
                      System.out.println("other:"+event.getType());
                }
           }

         });

         //创建父节点
         client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path,"init".getBytes());
         Thread.sleep(1000);

         //创建子节点
         String childPath1=ZKPaths.makePath(path, "a");
         childPath1=client.create().withMode(CreateMode.PERSISTENT).forPath(childPath1,"1".getBytes());
         Thread.sleep(1000);

         //对子节点赋值
         client.setData().forPath(childPath1,"aaa".getBytes());
         Thread.sleep(1000);

         //对子节点赋值
         client.setData().forPath(path,"aaa".getBytes());
         Thread.sleep(1000);

         //删除子节点
         client.delete().forPath(childPath1);
         client.delete().deletingChildrenIfNeeded().forPath("/francis");

         Thread.sleep(2000);

      }catch(Exception e){
          e.printStackTrace();
      }finally{

          //这里因为是测试,没有加他们。
          CloseableUtils.closeQuietly(nodeCache);
          CloseableUtils.closeQuietly(client);
          CloseableUtils.closeQuietly(server);
      }
  }
}

输出如下:

other:INITIALIZED
added:/francis/nodecache/b
added:/francis/nodecache/b/a
updated:/francis/nodecache/b/a
updated:/francis/nodecache/b
removed:/francis/nodecache/b/a
removed:/francis/nodecache/b

时间: 2024-10-25 17:34:02

Curator Cache的相关文章

Apache Curator Node Cache Watcher

只能监听某一路径本身的add,delete,update 1.run NodeListener 2.run NLTest package com.collonn.javaUtilMvn.zookeeper.curator.NodeCache; public class NLTest { public static void main(String[] args) throws Exception { NLClientCreate.main(null); Thread.sleep(1000 * 2

Apache Curator Path Cache Watcher

可以监控某一路径的直接子结点(一级子结点)变化,add,update,delete. 利用此特性可以很方便的监控集群中的所有结点,当然也就很方便的可以实现简单的key.hashCode()%serverCount式的分布式计算,还可以实现简单的定制规则的负载均衡. 1.run ChildrenListener 2.run CLTest package com.collonn.javaUtilMvn.zookeeper.curator.PathCache; public class CLTest

Apache Curator Tree Cache Watcher

可以监控某一路径下的子结点(所有子结节,不管有多少层子结点)变化. 比NodeCache方便的是,可以监听一群结点,而不用一个节点一个节点的去设置监听 1.run TreeListener 2.run TLTest package com.collonn.javaUtilMvn.zookeeper.curator.TreeCache; public class TLTest { public static void main(String[] args) throws Exception { T

03.Curator深入使用

1.Apache Curator简介 Curator提供了一套Java类库,可以更容易的使用ZooKeeper.ZooKeeper本身提供了Java Client的访问类,但是API太底层,不宜使用,易出错.Curator提供了三个组件.Curator client用来替代ZOoKeeper提供的类,它封装了底层的管理并提供了一些有用的工具.Curator framework提供了高级的API来简化ZooKeeper的使用.它增加了很多基于ZooKeeper的特性,帮助管理ZooKeeper的连

Apache Curator入门实战

版权声明:本文为博主原创文章,未经博主允许不得转载. 目录(?)[+] Apache Curator入门实战 Curator是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量. 1.Zookeeper安装部署 Zookeeper的部署很简单,如果已经有Java运行环境的话,下载tarball解压后即可运行. [root@vm Temp]$ wget http://mirror.bi

ZooKeeper(3.4.5) 使用Curator监听事件

ZooKeeper原生的API支持通过注册Watcher来进行事件监听,但是Watcher通知是一次性的,因此开发过程中需要反复注册Watcher,比较繁琐.Curator引入了Cache来监听ZooKeeper服务端的事件.Cache对ZooKeeper事件监听进行了封装,能够自动处理反复注册监听,简化了ZooKeeper原生API繁琐的开发过程. 简单的示例: package com.huey.dream.demo; import java.util.concurrent.ExecutorS

Curator入门教程1

 简介 Curator是Netflix开源的一套ZooKeeper客户端框架. Netflix在使用ZooKeeper的过程中发现ZooKeeper自带的客户端太底层, 应用方在使用的时候需要自己处理很多事情, 于是在它的基础上包装了一下, 提供了一套更好用的客户端框架. 1.zookekeeper基本功能      连接zookeeper,对节点进行crud.回调函数.判断节点是否存在 package bjsxt.curator.base; import java.util.List; imp

12.Curator扩展库

Recipes组件包含了丰富的Curator应用的组件.但是这些并不是ZooKeeper Recipe的全部.大量的分布式应用已经抽象出了许许多多的的Recipe,其中有些还是可以通过Curator来实现. 如果不断都将这些Recipe都增加到Recipes中,Recipes会变得越来越大.为了避免这种状况,Curator把一些其它的Recipe放在单独的包中,命名方式就是curator-x-,比如curator-x-discovery, curator-x-rpc.本文就是主要介绍curato

Zookeeper开源客户端框架Curator简介

Curator是Netflix开源的一套ZooKeeper客户端框架. Netflix在使用ZooKeeper的过程中发现ZooKeeper自带的客户端太底层, 应用方在使用的时候需要自己处理很多事情, 于是在它的基础上包装了一下, 提供了一套更好用的客户端框架. Netflix在用ZooKeeper的过程中遇到的问题, 我们也遇到了, 所以开始研究一下, 首先从他在github上的源码, wiki文档以及Netflix的技术blog入手. 看完官方的文档之后, 发现Curator主要解决了三类