先说个小插曲,前几天有个网站转载我的文章没有署名作者,我有点不开心就给他们留言了,然后今天一看他们把文章删了。其实我的意思并不是你允许转载,我想表达的是我的付出需要被尊重。也不知道是谁的错~
==================================
官网上的入门教程非常简单,如下:
学习Zookeeper
使用Curator的用户默认是了解Zookeeper的,Zookeeper的入门在这里:http: //zookeeper.apache.org/doc/trunk/zookeeperStarted.html
使用Curator
Curator的jar包可以从Maven中央仓库获得。各种工具罗列在主页上 main page。Maven,Gradle,Ant等用户可以轻松地将Curator包含进其构建脚本中。
大多数用户希望使用Curator的预制recipes(基于framework,提供高级特性),所以,curator-recipes是个正确的选择。如果您只想使用Zookeeper添加连接管理和重试策略的封装好的工具,那么使用curator-framework。
获取一个连接
Curator使用流式风格。如果你之前没有使用过这个,可能看起来很奇怪,因此建议您事先熟悉一下风格。ps:其实就是链式风格(Demo demo = new DemoBuilder().first().second().last().build();)
Curator连接的实例(CuratorFramework)来自于CuratorFrameworkFactory。每一个你连接的Zookeeper集群只需要一个CuratorFramework实例:
CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy)
这将会使用默认值去连接一个Zookeeper集群。唯一需要指定的是 重试策略。大多数情况下,您应该使用:
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3) CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy); client.start();
客户端必须启动(不再使用必须关闭)
直接调用Zookeeper
一旦你有一个CuratorFramework实例,你可以直接调用ZooKeeper,就像使用ZooKeeper中提供的原始Zookeeper对象一样。例如:
client.create().forPath("/my/path", myData)
这里的好处是:由CuratorFramework管理Zookeeper连接,并且当出现连接问题时会重试。
Recipes(高级特性)
分布式锁
InterProcessMutex lock = new InterProcessMutex(client, lockPath); if ( lock.acquire(maxWait, waitUnit) ) { try { // do some work inside of the critical section here } finally { lock.release(); } }
Leader选举
LeaderSelectorListener listener = new LeaderSelectorListenerAdapter() { public void takeLeadership(CuratorFramework client) throws Exception { // this callback will get called when you are the leader // do whatever leader work you need to and only exit // this method when you want to relinquish leadership } } LeaderSelector selector = new LeaderSelector(client, path, listener); selector.autoRequeue(); // not required, but this is behavior that you will probably expect selector.start();
更多的特性翻译接下来的文章会有。
一个小例子
导入的jar包:
curator-recipes这个包没用到,ZkClient那个包是另外一个客户端,这里也用不到,除了这两个其他都是要导入的
package zookeeper.curator; import java.util.List; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.data.Stat; public class CuratorBase { private static String connectString = "192.168.127.129:2181,192.168.127.130:2181,192.168.127.131:2181"; public static void main(String[] args) throws Exception { // 重试策略,初试时间1s,重试3次 RetryPolicy policy = new ExponentialBackoffRetry(1000, 3); CuratorFramework curator = CuratorFrameworkFactory.newClient(connectString, policy); curator.start(); //获取状态 System.out.println(curator.getState()); System.out.println("============Create============="); //创建节点 String cPath1 = curator.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT).inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework curatorFramework, CuratorEvent event) throws Exception { System.out.println(">>>>>>>>>>>>>>>>>>>>>>>"); System.out.println("Code:" + event.getResultCode()); System.out.println("Name:" + event.getName()); System.out.println("Path:" + event.getPath()); System.out.println("Type:" + event.getType()); System.out.println("CurrentThread:" + Thread.currentThread().getName()); System.out.println("<<<<<<<<<<<<<<<<<<<<<<<"); } }).forPath("/root/rt", "123".getBytes()); System.out.println("MainThread:" + Thread.currentThread().getName()); Thread.sleep(2000); //暂停2s,为了等异步执行完,否则下面会报错:KeeperErrorCode = NoNode for /root/rr System.out.println(cPath1); String cPath2 = curator.create().withMode(CreateMode.PERSISTENT).forPath("/root/rr", "456".getBytes()); System.out.println(cPath2); String cPath3 = curator.create().withMode(CreateMode.PERSISTENT).forPath("/root/re", "789".getBytes()); System.out.println(cPath3); System.out.println("===========getData=============="); //获取节点数值 byte[] data = curator.getData().forPath("/root/rt"); System.out.println("数值:" + new String(data)); System.out.println("============setData============="); //修改节点数据 Stat stat = curator.setData().forPath("/root/rt", "abc".getBytes()); System.out.println(stat); System.out.println("============getChildren============="); //获取子节点 List<String> childPath = curator.getChildren().forPath("/root"); for (String path : childPath) { System.out.println(path + ":" + new String(curator.getData().forPath("/root/" + path))); } System.out.println("============Detele============="); //删除节点 curator.delete().forPath("/root/rt"); System.out.println("是否存在:" + curator.checkExists().forPath("/root/rt")); curator.delete().deletingChildrenIfNeeded().forPath("/root"); System.out.println("是否存在:" + curator.checkExists().forPath("/root")); curator.close(); //关闭 } }
log4j2.xml
<?xml version="1.0" encoding="UTF-8"?> <Configuration monitorInterval="1800"> <Filter type="ThresholdFilter" level="trace"/> <Appenders> <Console name="console" target="SYSTEM_OUT"> <!--控制台只输出level及以上级别的信息(onMatch),其他的直接拒绝(onMismatch)--> <ThresholdFilter level="trace" onMatch="ACCEPT" onMismatch="DENY"/> <PatternLayout pattern="%d %-5p [%t] %C{2} (%F:%L) - %m%n"/> </Console> </Appenders> <Loggers> <Root level="WARN"> <!-- TRACE < DEBUG < INFO < WARN < ERROR < FATAL --> <AppenderRef ref="console"/> </Root> </Loggers> </Configuration>
结果:
STARTED ============Create============= MainThread:main >>>>>>>>>>>>>>>>>>>>>>> Code:0 Name:/root/rt Path:/root/rt Type:CREATE CurrentThread:main-EventThread <<<<<<<<<<<<<<<<<<<<<<< null /root/rr /root/re ===========getData============== 数值:123 ============setData============= 51539607575,51539607578,1502067208571,1502067210604,1,0,0,0,3,0,51539607575 ============getChildren============= rr:456 rt:abc re:789 ============Detele============= 是否存在:null 是否存在:null
关于代码也没啥可解释的,顾名思义。
比如creatingParentsIfNeeded(),就是如果需要的话就创建父节点,这个方法的好处是可以递归创建节点。
inBackground(new BackgroundCallback(){})这个就是异步创建节点啦,不阻塞线程。
更多内容以后再说。