一、简介
Curator是Netflix公司开源的一套zookeeper客户端框架。
Curator包含了几个包:
curator-framework:对zookeeper的底层api的一些封装
curator-client:提供一些客户端的操作,例如重试策略等
curator-recipes:封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式计数器、分布式Barrier等。
我们需要根据ZK的版本来选择对应的curator版本,否则会出现兼容性问题
二、环境
jdk 1.8
zk 3.4.12
curator-recipes 2.8.0
三、常用api介绍
1)客户端创建
CuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS, 5000,3000,new RetryNTimes(10, 5000)); 1、ZK_ADDRESS 连接zk的地址 格式host1:port1,host2:port2,。。。 2、sessionTimeoutMs 会话超时时间,单位是毫秒,可不填测此参数,默认值为60000ms 3、connectionTimeoutMs 4、retryPolicy 重试策略,内建有四种重试策略,也可以自行实现RetryPolicy接口。
2)客户端创建
client.start()
3)创建数据节点
四种节点
PERSISTENT:持久化
PERSISTENT_SEQUENTIAL:持久化并且带序列号
EPHEMERAL:临时
EPHEMERAL_SEQUENTIAL:临时并且带序列号
1、默认持久化节点
client.create().forPath("/data1");
2、创建持久化节点并赋值
client.create().forPath("/data2", "this is data2".getBytes());
3、创建临时空节点
client.create().withMode(CreateMode.EPHEMERAL).forPath("/data3");
4、创建临时节点并赋值
client.create().withMode(CreateMode.EPHEMERAL).forPath("/data4", "this is data4".getBytes());
5、创建临时有序节点并赋值并递归创建父节点
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPH_SEQ).forPath("/patent/data", "children".getBytes());
4)删除数据节点
流式风格,拼接顺序可以调整。
1、只能删除叶子节点,否则抛出异常
client.delete().forPath("/data1");
2、删除一个节点并递归删除其所有的子节点
client.delete().deletingChildrenIfNeeded().forPath("/patent/data");
3、保证强制删除一个节点,只要客户端会话有效就会持续进行删除直到删除成功。
client.delete().guaranteed().forPath("/data2");
5)查询数据节点
1、读节点的数据内容,返回byte数组
byte[] data=client.getData().forPath("/data2");
2、读取一个节点的数据内容,同时获取到该节点的stat
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("data4");
3、获取一个路径下所有的子节点
List<String> childrens = client.getChildren().forPath("/");
4、检查节点是否存在,返回一个stat
stat= client.checkExists().forPath("/data3");
6)数据节点更新
1、更新一个节点的数据内容,返回一个stat
Stat stat = client.setData().forPath("/data1","update data1 data".getBytes());
7)事务
CuratorFramework的实例包含inTransaction()接口方法,调用此方法开启一个ZooKeeper事务. 可以复合create, setData, check, and/or delete 等操作然后调用commit()作为一个原子操作提交。
client.inTransaction()
.and().create().withMode(CreateMode.PERSISTENT).forPath("/data5", "this is data5".getBytes())
.and().setData().forPath("/data5", "update data5".getBytes())
.and().commit();
8)监听watch
Zookeeper原生支持通过注册Watcher来进行事件监听,但是开发者需要反复注册(Watcher只能单次注册单次使用)。Cache是Curator中对事件监听的包装,可以看作是对事件监听的本地缓存视图,能够自动为开发者处理反复注册监听。Curator提供了三种Watcher(Cache)来监听结点的变化。
1、Path Cache
Path Cache用来监控一个ZNode的子节点. 当一个子节点增加, 更新,删除时, Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态,而状态的更变将通过PathChildrenCacheListener通知。
package com.vi.test; import com.vi.util.CuratorClientUtil; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.zookeeper.CreateMode; import java.util.List; public class CuratorWatchTest { private static final String PATH = "/path/cache"; public static void main(String[] args) { //创建客户端 CuratorFramework client = CuratorClientUtil.getClient(); client.start(); try { PathChildrenCache cache = pathCacheTest(client); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH + "/data1", "first data".getBytes()); Thread.sleep(500); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH + "/data2", "second data".getBytes()); Thread.sleep(500); //获取所有子节点 List<ChildData> datas =cache.getCurrentData(); for(ChildData childData : datas){ System.out.println("节点路径:"+childData.getPath() + ",节点值:" + new String(childData.getData())); } client.setData().forPath(PATH + "/data1", "update data".getBytes()); Thread.sleep(500); client.delete().deletingChildrenIfNeeded().forPath("/path"); Thread.sleep(500); cache.close(); client.close(); } catch (Exception e) { e.printStackTrace(); } } public static PathChildrenCache pathCacheTest(CuratorFramework client) throws Exception { //第三个参数不为true时,不会缓存data数据 PathChildrenCache cache = new PathChildrenCache(client, PATH, true); /* 三种启动方式 NORMAL:正常初始化。 BUILD_INITIAL_CACHE:在调用start()之前会调用rebuild()。 POST_INITIALIZED_EVENT: 当Cache初始化数据后发送一个 cache.start(PathChildrenCache.StartMode.NORMAL); */ cache.start(); //增加监听事件 cache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { System.out.println("事件为:" + event.getType() + ",数据为:" + new String(event.getData().getData())); } }); return cache; } }
2)Node Cache
Node Cache与Path Cache类似,Node Cache只能监听某一个特定的节点。
package com.vi.test; import com.vi.util.CuratorClientUtil; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.ChildData; import org.apache.curator.framework.recipes.cache.NodeCache; import org.apache.curator.framework.recipes.cache.NodeCacheListener; import org.apache.zookeeper.CreateMode; public class CuratorNodeCacheTest { private static final String PATH = "/path/cache"; public static void main(String[] args) { //创建客户端 CuratorFramework client = CuratorClientUtil.getClient(); client.start(); try { NodeCache cache = nodeCacheTest(client); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH, "first data".getBytes()); Thread.sleep(500); client.setData().forPath(PATH, "update data".getBytes()); Thread.sleep(500); client.delete().deletingChildrenIfNeeded().forPath("/path"); Thread.sleep(500); cache.close(); client.close(); } catch (Exception e) { e.printStackTrace(); } } public static NodeCache nodeCacheTest(CuratorFramework client) throws Exception { //只能监控一个节点 NodeCache cache = new NodeCache(client, PATH); cache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { ChildData childData = cache.getCurrentData(); if (childData == null) { System.out.println("节点删除!"); } else { System.out.println("节点数据:" + new String(cache.getCurrentData().getData())); } } }); cache.start(); return cache; } }
3)Tree Cache
Tree Cache可以监控整个树上的所有节点,类似于PathCache和NodeCache的组合。
package com.vi.test; import com.vi.util.CuratorClientUtil; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.recipes.cache.TreeCache; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; import org.apache.curator.framework.recipes.cache.TreeCacheListener; import org.apache.zookeeper.CreateMode; public class CuratorTreeCacheTest { private static final String PATH = "/path/cache"; public static void main(String[] args) { //创建客户端 CuratorFramework client = CuratorClientUtil.getClient(); client.start(); try { TreeCache cache = treeCacheTest(client); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH + "/data1", "first data".getBytes()); Thread.sleep(500); client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(PATH + "/data2", "second data".getBytes()); Thread.sleep(500); client.setData().forPath(PATH , "update path data".getBytes()); Thread.sleep(500); client.setData().forPath(PATH + "/data1", "update data".getBytes()); Thread.sleep(500); client.delete().deletingChildrenIfNeeded().forPath("/path"); Thread.sleep(1000); cache.close(); client.close(); } catch (Exception e) { e.printStackTrace(); } } public static TreeCache treeCacheTest(CuratorFramework client) throws Exception { //只能监控一个节点 TreeCache cache = new TreeCache(client, PATH); cache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { System.out.println("事件为:" + event.getType() + ",节点路径为:" + event.getData().getPath() + ",数据为:" + new String(event.getData().getData())); } }); cache.start(); return cache; } }
四、总结
curator-recipes中有一些高级特性可以使用,在后面的章节会具体介绍。
源码地址: https://github.com/binary-vi/binary.github.io/tree/master/zk-curator
原文地址:https://www.cnblogs.com/vi-2525/p/9014091.html