zookeeper_04:curator

  • 定义

Curator是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。

<dependency>
  <groupId>org.apache.curator</groupId>
  <artifactId>curator-recipes</artifactId>
  <version>2.8.0</version>
</dependency>

  • checkexists

package com.jike.testcurator;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.data.Stat;

public class checkexists {

public static void main(String[] args) throws Exception {

ExecutorService es = Executors.newFixedThreadPool(5);

//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);

CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString("192.168.1.105:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();

client.start();

client.checkExists().inBackground(new BackgroundCallback() {

public void processResult(CuratorFramework arg0, CuratorEvent arg1)
throws Exception {
Stat stat = arg1.getStat();
System.out.println(stat);
System.out.println(arg1.getContext());
}
},"123",es).forPath("/jike");

Thread.sleep(Integer.MAX_VALUE);
}

}

  • CreateNode

package com.jike.testcurator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.CreateMode;

public class CreateNode {

public static void main(String[] args) throws Exception {

//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);

CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString("192.168.1.105:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();

client.start();
String path = client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath("/jike/1","123".getBytes());
System.out.println(path);
Thread.sleep(Integer.MAX_VALUE);

}

}

  • CreateNodeAuth

package com.jike.testcurator;

import java.util.ArrayList;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Perms;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;

public class CreateNodeAuth {

public static void main(String[] args) throws Exception {

//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString("192.168.1.105:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
client.start();
ACL aclIp = new ACL(Perms.READ,new Id("ip","192.168.1.105"));
ACL aclDigest = new ACL(Perms.READ|Perms.WRITE,new Id("digest",DigestAuthenticationProvider.generateDigest("jike:123456")));
ArrayList<ACL> acls = new ArrayList<ACL>();
acls.add(aclDigest);
acls.add(aclIp);
String path = client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.withACL(acls)
.forPath("/jike/3","123".getBytes());
System.out.println(path);
Thread.sleep(Integer.MAX_VALUE);

}

}

  • CreateSession

package com.jike.testcurator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;

public class CreateSession {

public static void main(String[] args) throws InterruptedException {

//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString("192.168.1.105:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
client.start();
Thread.sleep(Integer.MAX_VALUE);
}
}

  • DelNode

package com.jike.testcurator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;

public class DelNode {

public static void main(String[] args) throws Exception {

//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString("192.168.1.105:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
client.start();
client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(-1).forPath("/jike20");
Thread.sleep(Integer.MAX_VALUE);
}

}

  • GetChildren

package com.jike.testcurator;

import java.util.List;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;

public class GetChildren {

public static void main(String[] args) throws Exception {

//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString("192.168.1.105:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
client.start();
List<String> cList = client.getChildren().forPath("/jike20");
System.out.println(cList.toString());
}
}

  • GetData

package com.jike.testcurator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.data.Stat;

public class GetData {

public static void main(String[] args) throws Exception {

//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
// CuratorFramework client = CuratorFrameworkFactory
// .newClient("192.168.1.105:2181",5000,5000, retryPolicy);

CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString("192.168.1.105:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
client.start();
Stat stat = new Stat();
byte[] ret = client.getData().storingStatIn(stat).forPath("/jike");
System.out.println(new String(ret));
System.out.println(stat);
}

}

  • GetDataAuth

package com.jike.testcurator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.data.Stat;

public class GetDataAuth {

public static void main(String[] args) throws Exception {

//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString("192.168.1.105:2181")
.sessionTimeoutMs(5000)
.authorization("digest", "jike:123456".getBytes())
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
client.start();
Stat stat = new Stat();
byte[] ret = client.getData().storingStatIn(stat).forPath("/jike/3");
System.out.println(new String(ret));
System.out.println(stat);
}

}

  • NodeChildrenListener

package com.jike.testcurator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.RetryUntilElapsed;

public class NodeChildrenListener {

public static void main(String[] args) throws Exception {

//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString("192.168.1.105:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
client.start();
final PathChildrenCache cache = new PathChildrenCache(client,"/jike",true);
cache.start();
cache.getListenable().addListener(new PathChildrenCacheListener() {
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
throws Exception {
switch (event.getType()) {
case CHILD_ADDED:
System.out.println("CHILD_ADDED:"+event.getData());
break;
case CHILD_UPDATED:
System.out.println("CHILD_UPDATED:"+event.getData());
break;
case CHILD_REMOVED:
System.out.println("CHILD_REMOVED:"+event.getData());
break;
default:
break;
}
}
});
cache.close();
Thread.sleep(Integer.MAX_VALUE);

}

}

  • NodeListener

package com.jike.testcurator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.RetryUntilElapsed;

public class NodeListener {

public static void main(String[] args) throws Exception {

//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString("192.168.1.105:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
client.start();
final NodeCache cache = new NodeCache(client,"/jike");
cache.start();
cache.getListenable().addListener(new NodeCacheListener() {

public void nodeChanged() throws Exception {
// TODO Auto-generated method stub
byte[] ret = cache.getCurrentData().getData();
System.out.println("new data:"+new String(ret));
}
});
cache.close();
Thread.sleep(Integer.MAX_VALUE);

}

}

  • UpdateData

package com.jike.testcurator;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryUntilElapsed;
import org.apache.zookeeper.data.Stat;

public class UpdateData {

public static void main(String[] args) throws Exception {

//RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
//RetryPolicy retryPolicy = new RetryNTimes(5, 1000);
RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString("192.168.1.105:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();
client.start();
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/jike");
client.setData().withVersion(stat.getVersion()).forPath("/jike", "123".getBytes());
}
}

时间: 2025-01-16 02:42:35

zookeeper_04:curator的相关文章

跟着实例学习ZooKeeper的用法: Curator扩展库

还记得Curator提供哪几个组件吗? 我们不妨回顾一下: Recipes Framework Utilities Client Errors Extensions 前面的例子其实前五个组件都涉及到了, 比如Utilities例子的TestServer, Client里的CuratorZookeeperClient, Errors里的ConnectionStateListener等. 还有最后一个组件我们还没有介绍,那就是Curator扩展组件. Recipes组件包含了丰富的Curator应用

一、Curator使用:如何开始使用及api介绍(创建会话以及增删查改)

前言 记录下ZK客户端的使用学习,初步想法是从几个方面来记录 如何开始使用及api介绍(创建会话以及增删查改) 异步调用 事件 Master选举 分布式锁.计数器.Barrier 版本说明 zk版本: curator版本: <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes --> <dependency> <groupId>org.apache.curator</g

03.Curator深入使用

1.Apache Curator简介 Curator提供了一套Java类库,可以更容易的使用ZooKeeper.ZooKeeper本身提供了Java Client的访问类,但是API太底层,不宜使用,易出错.Curator提供了三个组件.Curator client用来替代ZOoKeeper提供的类,它封装了底层的管理并提供了一些有用的工具.Curator framework提供了高级的API来简化ZooKeeper的使用.它增加了很多基于ZooKeeper的特性,帮助管理ZooKeeper的连

zookeeper开源客户端curator

zookeeper的原生api相对来说比较繁琐,比如:对节点添加监听事件,当监听触发后,我们需要再次手动添加监听,否则监听只生效一次:再比如,断线重连也需要我们手动代码来判断处理等等.对于curator的介绍,从网上百度了一段:Curator是Netflix开源的一套zookeeper客户端框架,用它来操作zookeeper更加方便,按Curator官方所比喻的,guava to JAVA,curator to zookeeper,Curator采用了fluent风格的代码,非常简洁. ----

Spark技术内幕:Master基于ZooKeeper的High Availability(HA)源代码实现

假设Spark的部署方式选择Standalone.一个採用Master/Slaves的典型架构.那么Master是有SPOF(单点故障,Single Point of Failure).Spark能够选用ZooKeeper来实现HA. ZooKeeper提供了一个Leader Election机制,利用这个机制能够保证尽管集群存在多个Master可是唯独一个是Active的,其它的都是Standby,当Active的Master出现问题时.另外的一个Standby Master会被选举出来.因为

Spark技术内幕:Master基于ZooKeeper的High Availability(HA)源码实现

如果Spark的部署方式选择Standalone,一个采用Master/Slaves的典型架构,那么Master是有SPOF(单点故障,Single Point of Failure).Spark可以选用ZooKeeper来实现HA. ZooKeeper提供了一个Leader Election机制,利用这个机制可以保证虽然集群存在多个Master但是只有一个是Active的,其他的都是Standby,当Active的Master出现故障时,另外的一个Standby Master会被选举出来.由于

15. 使用Apache Curator管理ZooKeeper

Apache ZooKeeper是为了帮助解决复杂问题的软件工具,它可以帮助用户从复杂的实现中解救出来. 然而,ZooKeeper只暴露了原语,这取决于用户如何使用这些原语来解决应用程序中的协调问题. 社区已经在ZooKeeper数据模型及其API之上开发了高级框架. Apache Curator是一个高级的包装类库和框架,使得ZooKeeper非常简单易用. Tips Curator最初由Netflix开发,现在是一个Apache项目. 项目页面位于http://curator.apache.

Apache Curator操作zookeeper的API使用

curator简介与客户端之间的异同点 常用的zookeeper java客户端: zookeeper原生Java API zkclient Apache curator ZooKeeper原生Java API的不足之处: 在连接zk超时的时候,不支持自动重连,需要手动操作 Watch注册一次就会失效,需要反复注册 不支持递归创建节点 Apache curator: Apache 的开源项目 解决Watch注册一次就会失效的问题 提供的 API 更加简单易用 提供更多解决方案并且实现简单,例如:

Elasticsearch 集群管理工具curator 接口模式使用介绍

安装配置参考文档:http://blog.51cto.com/michaelkang/2333586 curator 接口模式使用介绍 curator的命令行语法如下: curator [--config CONFIG.YML] [--dry-run] ACTION_FILE.YML --config : 之后跟上配置文件 --dry-run :调试参数,测试脚本运行是否正常: ACTION_FILE.YML :action文件中可以包含一连串的action,curator接口集中式的confi