Zookeeper原生客户端

1.1.1.1. 客户端基本操作

package cn.enjoy.javaapi;

import org.apache.zookeeper.*;

import java.io.IOException;

import java.util.concurrent.CountDownLatch;

public class TestJavaApi implements Watcher {

private static final int SESSION_TIMEOUT = 10000;

private static final String CONNECTION_STRING = "192.168.30.10:2181";

private static final String ZK_PATH = "/leader";

private ZooKeeper zk = null;

private CountDownLatch connectedSemaphore = new CountDownLatch(1);

/**

* 创建ZK连接

*

* @param connectString  ZK服务器地址列表

* @param sessionTimeout Session超时时间

*/

public void createConnection(String connectString, int sessionTimeout) {

this.releaseConnection();

try {

zk = new ZooKeeper(connectString, sessionTimeout, this);

connectedSemaphore.await();

} catch (InterruptedException e) {

System.out.println("连接创建失败,发生 InterruptedException");

e.printStackTrace();

} catch (IOException e) {

System.out.println("连接创建失败,发生 IOException");

e.printStackTrace();

}

}

/**

* 关闭ZK连接

*/

public void releaseConnection() {

if (null != this.zk) {

try {

this.zk.close();

} catch (InterruptedException e) {

// ignore

e.printStackTrace();

}

}

}

/**

* 创建节点

*

* @param path 节点path

* @param data 初始数据内容

* @return

*/

public boolean createPath(String path, String data) {

try {

System.out.println("节点创建成功, Path: "

+ this.zk.create(path, // 节点路径

data.getBytes(), // 节点内容

ZooDefs.Ids.OPEN_ACL_UNSAFE, //节点权限

CreateMode.EPHEMERAL) //节点类型

+ ", content: " + data);

} catch (KeeperException e) {

System.out.println("节点创建失败,发生KeeperException");

e.printStackTrace();

} catch (InterruptedException e) {

System.out.println("节点创建失败,发生 InterruptedException");

e.printStackTrace();

}

return true;

}

/**

* 读取指定节点数据内容

*

* @param path 节点path

* @return

*/

public String readData(String path) {

try {

System.out.println("获取数据成功,path:" + path);

return new String(this.zk.getData(path, false, null));

} catch (KeeperException e) {

System.out.println("读取数据失败,发生KeeperException,path: " + path);

e.printStackTrace();

return "";

} catch (InterruptedException e) {

System.out.println("读取数据失败,发生 InterruptedException,path: " + path);

e.printStackTrace();

return "";

}

}

/**

* 更新指定节点数据内容

*

* @param path 节点path

* @param data 数据内容

* @return

*/

public boolean writeData(String path, String data) {

try {

System.out.println("更新数据成功,path:" + path + ", stat: " +

this.zk.setData(path, data.getBytes(), -1));

} catch (KeeperException e) {

System.out.println("更新数据失败,发生KeeperException,path: " + path);

e.printStackTrace();

} catch (InterruptedException e) {

System.out.println("更新数据失败,发生 InterruptedException,path: " + path);

e.printStackTrace();

}

return false;

}

/**

* 删除指定节点

*

* @param path 节点path

*/

public void deleteNode(String path) {

try {

this.zk.delete(path, -1);

System.out.println("删除节点成功,path:" + path);

} catch (KeeperException e) {

System.out.println("删除节点失败,发生KeeperException,path: " + path);

e.printStackTrace();

} catch (InterruptedException e) {

System.out.println("删除节点失败,发生 InterruptedException,path: " + path);

e.printStackTrace();

}

}

public static void main(String[] args) {

TestJavaApi sample = new TestJavaApi();

sample.createConnection(CONNECTION_STRING, SESSION_TIMEOUT);

if (sample.createPath(ZK_PATH, "我是节点初始内容")) {

System.out.println();

System.out.println("数据内容: " + sample.readData(ZK_PATH) + "\n");

sample.writeData(ZK_PATH, "更新后的数据");

System.out.println("数据内容: " + sample.readData(ZK_PATH) + "\n");

sample.deleteNode(ZK_PATH);

}

sample.releaseConnection();

}

/**

* 收到来自Server的Watcher通知后的处理。

*/

@Override

public void process(WatchedEvent event) {

System.out.println("收到事件通知:" + event.getState() + "\n");

if (Event.KeeperState.SyncConnected == event.getState()) {

connectedSemaphore.countDown();

}

}

}

1.1.1.2. Watch机制

package cn.enjoy.javaapi;

import org.apache.zookeeper.*;

import org.apache.zookeeper.data.Stat;

import java.util.List;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.atomic.AtomicInteger;

public class ZooKeeperWatcher implements Watcher  {

/** 定义原子变量 */

AtomicInteger seq = new AtomicInteger();

/** 定义session失效时间 */

private static final int SESSION_TIMEOUT = 10000;

/** zookeeper服务器地址 */

private static final String CONNECTION_ADDR = "192.168.30.10:2181";

/** zk父路径设置 */

private static final String PARENT_PATH = "/testWatch";

/** zk子路径设置 */

private static final String CHILDREN_PATH = "/testWatch/children";

/** 进入标识 */

private static final String LOG_PREFIX_OF_MAIN = "【Main】";

/** zk变量 */

private ZooKeeper zk = null;

/** 信号量设置,用于等待zookeeper连接建立之后 通知阻塞程序继续向下执行 */

private CountDownLatch connectedSemaphore = new CountDownLatch(1);

/**

* 创建ZK连接

* @param connectAddr ZK服务器地址列表

* @param sessionTimeout Session超时时间

*/

public void createConnection(String connectAddr, int sessionTimeout) {

this.releaseConnection();

try {

zk = new ZooKeeper(connectAddr, sessionTimeout, this);

System.out.println(LOG_PREFIX_OF_MAIN + "开始连接ZK服务器");

connectedSemaphore.await();

} catch (Exception e) {

e.printStackTrace();

}

}

/**

* 关闭ZK连接

*/

public void releaseConnection() {

if (this.zk != null) {

try {

this.zk.close();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

}

/**

* 创建节点

* @param path 节点路径

* @param data 数据内容

* @return

*/

public boolean createPath(String path, String data) {

try {

//设置监控(由于zookeeper的监控都是一次性的所以 每次必须设置监控)

this.zk.exists(path, true);

System.out.println(LOG_PREFIX_OF_MAIN + "节点创建成功, Path: " +

this.zk.create( /**路径*/

path,

/**数据*/

data.getBytes(),

/**所有可见*/

ZooDefs.Ids.OPEN_ACL_UNSAFE,

/**永久存储*/

CreateMode.PERSISTENT ) +

", content: " + data);

} catch (Exception e) {

e.printStackTrace();

return false;

}

return true;

}

/**

* 读取指定节点数据内容

* @param path 节点路径

* @return

*/

public String readData(String path, boolean needWatch) {

try {

return new String(this.zk.getData(path, needWatch, null));

} catch (Exception e) {

e.printStackTrace();

return "";

}

}

/**

* 更新指定节点数据内容

* @param path 节点路径

* @param data 数据内容

* @return

*/

public boolean writeData(String path, String data) {

try {

System.out.println(LOG_PREFIX_OF_MAIN + "更新数据成功,path:" + path + ", stat: " +

this.zk.setData(path, data.getBytes(), -1));

} catch (Exception e) {

e.printStackTrace();

}

return false;

}

/**

* 删除指定节点

*

* @param path

*            节点path

*/

public void deleteNode(String path) {

try {

this.zk.delete(path, -1);

System.out.println(LOG_PREFIX_OF_MAIN + "删除节点成功,path:" + path);

} catch (Exception e) {

e.printStackTrace();

}

}

/**

* 判断指定节点是否存在

* @param path 节点路径

*/

public Stat exists(String path, boolean needWatch) {

try {

return this.zk.exists(path, needWatch);

} catch (Exception e) {

e.printStackTrace();

return null;

}

}

/**

* 获取子节点

* @param path 节点路径

*/

private List<String> getChildren(String path, boolean needWatch) {

try {

return this.zk.getChildren(path, needWatch);

} catch (Exception e) {

e.printStackTrace();

return null;

}

}

/**

* 删除所有节点

*/

public void deleteAllTestPath() {

if(this.exists(CHILDREN_PATH, false) != null){

this.deleteNode(CHILDREN_PATH);

}

if(this.exists(PARENT_PATH, false) != null){

this.deleteNode(PARENT_PATH);

}

}

/**

* 收到来自Server的Watcher通知后的处理。

*/

@Override

public void process(WatchedEvent event) {

System.out.println("进入 process 。。。。。event = " + event);

try {

Thread.sleep(200);

} catch (InterruptedException e) {

e.printStackTrace();

}

if (event == null) {

return;

}

// 连接状态

Watcher.Event.KeeperState keeperState = event.getState();

// 事件类型

Watcher.Event.EventType eventType = event.getType();

// 受影响的path

String path = event.getPath();

String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】";

System.out.println(logPrefix + "收到Watcher通知");

System.out.println(logPrefix + "连接状态:\t" + keeperState.toString());

System.out.println(logPrefix + "事件类型:\t" + eventType.toString());

if (Event.KeeperState.SyncConnected == keeperState) {

// 成功连接上ZK服务器

if (Event.EventType.None == eventType) {

System.out.println(logPrefix + "成功连接上ZK服务器");

connectedSemaphore.countDown();

}

//创建节点

else if (Event.EventType.NodeCreated == eventType) {

System.out.println(logPrefix + "节点创建");

try {

Thread.sleep(100);

} catch (InterruptedException e) {

e.printStackTrace();

}

this.exists(path, true);

}

//更新节点

else if (Event.EventType.NodeDataChanged == eventType) {

System.out.println(logPrefix + "节点数据更新");

try {

Thread.sleep(100);

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println(logPrefix + "数据内容: " + this.readData(PARENT_PATH, true));

}

//更新子节点

else if (Event.EventType.NodeChildrenChanged == eventType) {

System.out.println(logPrefix + "子节点变更");

try {

Thread.sleep(3000);

} catch (InterruptedException e) {

e.printStackTrace();

}

System.out.println(logPrefix + "子节点列表:" + this.getChildren(PARENT_PATH, true));

}

//删除节点

else if (Event.EventType.NodeDeleted == eventType) {

System.out.println(logPrefix + "节点 " + path + " 被删除");

}

}

else if (Watcher.Event.KeeperState.Disconnected == keeperState) {

System.out.println(logPrefix + "与ZK服务器断开连接");

}

else if (Watcher.Event.KeeperState.AuthFailed == keeperState) {

System.out.println(logPrefix + "权限检查失败");

}

else if (Watcher.Event.KeeperState.Expired == keeperState) {

System.out.println(logPrefix + "会话失效");

}

System.out.println("--------------------------------------------");

}

public static void main(String[] args) throws Exception {

//建立watcher

ZooKeeperWatcher zkWatch = new ZooKeeperWatcher();

//创建连接

zkWatch.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT);

//System.out.println(zkWatch.zk.toString());

Thread.sleep(1000);

// 清理节点

zkWatch.deleteAllTestPath();

if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "")) {

// 读取数据,在操作节点数据之前先调用zookeeper的getData()方法是为了可以watch到对节点的操作。watch是一次性的,

// 也就是说,如果第二次又重新调用了setData()方法,在此之前需要重新调用一次。

System.out.println("---------------------- read parent ----------------------------");

zkWatch.readData(PARENT_PATH, true);

// 更新数据

zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + "");

/** 读取子节点,设置对子节点变化的watch,如果不写该方法,则在创建子节点是只会输出NodeCreated,而不会输出NodeChildrenChanged,

也就是说创建子节点时没有watch。

如果是递归的创建子节点,如path="/p/c1/c2"的话,getChildren(PARENT_PATH, ture)只会在创建c1时watch,输出c1的NodeChildrenChanged,

而不会输出创建c2时的NodeChildrenChanged,如果watch到c2的NodeChildrenChanged,则需要再调用一次getChildren(String path, true)方法,

其中path="/p/c1"

*/

System.out.println("---------------------- read children path ----------------------------");

zkWatch.getChildren(PARENT_PATH, true);

Thread.sleep(1000);

// 创建子节点,同理如果想要watch到NodeChildrenChanged状态,需要调用getChildren(CHILDREN_PATH, true)

zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "");

Thread.sleep(1000);

zkWatch.readData(CHILDREN_PATH, true);

zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + "");

}

Thread.sleep(50000);

// 清理节点

zkWatch.deleteAllTestPath();

Thread.sleep(1000);

zkWatch.releaseConnection();

}

}

1.1.1.3. ZK认证机制

package cn.enjoy.javaapi;

import org.apache.zookeeper.Watcher;

import org.apache.zookeeper.*;

import org.apache.zookeeper.data.ACL;

import org.apache.zookeeper.data.Stat;

import java.util.ArrayList;

import java.util.List;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.atomic.AtomicInteger;

public class TestZookeeperAuth implements Watcher {

/** 连接地址 */

final static String CONNECT_ADDR = "192.168.30.10:2181";

/** 测试路径 */

final static String PATH = "/testAuth";

final static String PATH_DEL = "/testAuth/delNode";

/** 认证类型 */

final static String authentication_type = "digest";

/** 认证正确方法 */

final static String correctAuthentication = "123456";

/** 认证错误方法 */

final static String badAuthentication = "654321";

static ZooKeeper zk = null;

/** 计时器 */

AtomicInteger seq = new AtomicInteger();

/** 标识 */

private static final String LOG_PREFIX_OF_MAIN = "【Main】";

private CountDownLatch connectedSemaphore = new CountDownLatch(1);

@Override

public void process(WatchedEvent event) {

try {

Thread.sleep(200);

} catch (InterruptedException e) {

e.printStackTrace();

}

if (event==null) {

return;

}

// 连接状态

Event.KeeperState keeperState = event.getState();

// 事件类型

Event.EventType eventType = event.getType();

// 受影响的path

String path = event.getPath();

String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】";

System.out.println(logPrefix + "收到Watcher通知");

System.out.println(logPrefix + "连接状态:\t" + keeperState.toString());

System.out.println(logPrefix + "事件类型:\t" + eventType.toString());

if (Event.KeeperState.SyncConnected == keeperState) {

// 成功连接上ZK服务器

if (Event.EventType.None == eventType) {

System.out.println(logPrefix + "成功连接上ZK服务器");

connectedSemaphore.countDown();

}

} else if (Event.KeeperState.Disconnected == keeperState) {

System.out.println(logPrefix + "与ZK服务器断开连接");

} else if (Event.KeeperState.AuthFailed == keeperState) {

System.out.println(logPrefix + "权限检查失败");

} else if (Event.KeeperState.Expired == keeperState) {

System.out.println(logPrefix + "会话失效");

}

System.out.println("--------------------------------------------");

}

/**

* 创建ZK连接

*

* @param connectString

*            ZK服务器地址列表

* @param sessionTimeout

*            Session超时时间

*/

public void createConnection(String connectString, int sessionTimeout) {

this.releaseConnection();

try {

zk = new ZooKeeper(connectString, sessionTimeout, this);

//添加节点授权

zk.addAuthInfo(authentication_type,correctAuthentication.getBytes());

System.out.println(LOG_PREFIX_OF_MAIN + "开始连接ZK服务器");

//倒数等待

connectedSemaphore.await();

} catch (Exception e) {

e.printStackTrace();

}

}

/**

* 关闭ZK连接

*/

public void releaseConnection() {

if (this.zk!=null) {

try {

this.zk.close();

} catch (InterruptedException e) {

}

}

}

/**

*

* @param args

* @throws Exception

*/

public static void main(String[] args) throws Exception {

TestZookeeperAuth testAuth = new TestZookeeperAuth();

testAuth.createConnection(CONNECT_ADDR,2000);

List<ACL> acls = new ArrayList<ACL>(1);

for (ACL ids_acl : ZooDefs.Ids.CREATOR_ALL_ACL) {

acls.add(ids_acl);

}

try {

zk.create(PATH, "init content".getBytes(), acls, CreateMode.PERSISTENT);

System.out.println("使用授权key:" + correctAuthentication + "创建节点:"+ PATH + ", 初始内容是: init content");

} catch (Exception e) {

e.printStackTrace();

}

try {

zk.create(PATH_DEL, "will be deleted! ".getBytes(), acls, CreateMode.PERSISTENT);

System.out.println("使用授权key:" + correctAuthentication + "创建节点:"+ PATH_DEL + ", 初始内容是: init content");

} catch (Exception e) {

e.printStackTrace();

}

// 获取数据

getDataByNoAuthentication();

getDataByBadAuthentication();

getDataByCorrectAuthentication();

// 更新数据

updateDataByNoAuthentication();

updateDataByBadAuthentication();

updateDataByCorrectAuthentication();

// 删除数据

deleteNodeByBadAuthentication();

deleteNodeByNoAuthentication();

deleteNodeByCorrectAuthentication();

//

Thread.sleep(1000);

deleteParent();

//释放连接

testAuth.releaseConnection();

}

/** 获取数据:采用错误的密码 */

static void getDataByBadAuthentication() {

String prefix = "[使用错误的授权信息]";

try {

ZooKeeper badzk = new ZooKeeper(CONNECT_ADDR, 2000, null);

//授权

badzk.addAuthInfo(authentication_type,badAuthentication.getBytes());

Thread.sleep(2000);

System.out.println(prefix + "获取数据:" + PATH);

System.out.println(prefix + "成功获取数据:" + badzk.getData(PATH, false, null));

} catch (Exception e) {

System.err.println(prefix + "获取数据失败,原因:" + e.getMessage());

}

}

/** 获取数据:不采用密码 */

static void getDataByNoAuthentication() {

String prefix = "[不使用任何授权信息]";

try {

System.out.println(prefix + "获取数据:" + PATH);

ZooKeeper nozk = new ZooKeeper(CONNECT_ADDR, 2000, null);

Thread.sleep(2000);

System.out.println(prefix + "成功获取数据:" + nozk.getData(PATH, false, null));

} catch (Exception e) {

System.err.println(prefix + "获取数据失败,原因:" + e.getMessage());

}

}

/** 采用正确的密码 */

static void getDataByCorrectAuthentication() {

String prefix = "[使用正确的授权信息]";

try {

System.out.println(prefix + "获取数据:" + PATH);

System.out.println(prefix + "成功获取数据:" + zk.getData(PATH, false, null));

} catch (Exception e) {

System.out.println(prefix + "获取数据失败,原因:" + e.getMessage());

}

}

/**

* 更新数据:不采用密码

*/

static void updateDataByNoAuthentication() {

String prefix = "[不使用任何授权信息]";

System.out.println(prefix + "更新数据: " + PATH);

try {

ZooKeeper nozk = new ZooKeeper(CONNECT_ADDR, 2000, null);

Thread.sleep(2000);

Stat stat = nozk.exists(PATH, false);

if (stat!=null) {

nozk.setData(PATH, prefix.getBytes(), -1);

System.out.println(prefix + "更新成功");

}

} catch (Exception e) {

System.err.println(prefix + "更新失败,原因是:" + e.getMessage());

}

}

/**

* 更新数据:采用错误的密码

*/

static void updateDataByBadAuthentication() {

String prefix = "[使用错误的授权信息]";

System.out.println(prefix + "更新数据:" + PATH);

try {

ZooKeeper badzk = new ZooKeeper(CONNECT_ADDR, 2000, null);

//授权

badzk.addAuthInfo(authentication_type,badAuthentication.getBytes());

Thread.sleep(2000);

Stat stat = badzk.exists(PATH, false);

if (stat!=null) {

badzk.setData(PATH, prefix.getBytes(), -1);

System.out.println(prefix + "更新成功");

}

} catch (Exception e) {

System.err.println(prefix + "更新失败,原因是:" + e.getMessage());

}

}

/**

* 更新数据:采用正确的密码

*/

static void updateDataByCorrectAuthentication() {

String prefix = "[使用正确的授权信息]";

System.out.println(prefix + "更新数据:" + PATH);

try {

Stat stat = zk.exists(PATH, false);

if (stat!=null) {

zk.setData(PATH, prefix.getBytes(), -1);

System.out.println(prefix + "更新成功");

}

} catch (Exception e) {

System.err.println(prefix + "更新失败,原因是:" + e.getMessage());

}

}

/**

* 不使用密码 删除节点

*/

static void deleteNodeByNoAuthentication() throws Exception {

String prefix = "[不使用任何授权信息]";

try {

System.out.println(prefix + "删除节点:" + PATH_DEL);

ZooKeeper nozk = new ZooKeeper(CONNECT_ADDR, 2000, null);

Thread.sleep(2000);

Stat stat = nozk.exists(PATH_DEL, false);

if (stat!=null) {

nozk.delete(PATH_DEL,-1);

System.out.println(prefix + "删除成功");

}

} catch (Exception e) {

System.err.println(prefix + "删除失败,原因是:" + e.getMessage());

}

}

/**

* 采用错误的密码删除节点

*/

static void deleteNodeByBadAuthentication() throws Exception {

String prefix = "[使用错误的授权信息]";

try {

System.out.println(prefix + "删除节点:" + PATH_DEL);

ZooKeeper badzk = new ZooKeeper(CONNECT_ADDR, 2000, null);

//授权

badzk.addAuthInfo(authentication_type,badAuthentication.getBytes());

Thread.sleep(2000);

Stat stat = badzk.exists(PATH_DEL, false);

if (stat!=null) {

badzk.delete(PATH_DEL, -1);

System.out.println(prefix + "删除成功");

}

} catch (Exception e) {

System.err.println(prefix + "删除失败,原因是:" + e.getMessage());

}

}

/**

* 使用正确的密码删除节点

*/

static void deleteNodeByCorrectAuthentication() throws Exception {

String prefix = "[使用正确的授权信息]";

try {

System.out.println(prefix + "删除节点:" + PATH_DEL);

Stat stat = zk.exists(PATH_DEL, false);

if (stat!=null) {

zk.delete(PATH_DEL, -1);

System.out.println(prefix + "删除成功");

}

} catch (Exception e) {

System.out.println(prefix + "删除失败,原因是:" + e.getMessage());

}

}

/**

* 使用正确的密码删除节点

*/

static void deleteParent() throws Exception {

try {

Stat stat = zk.exists(PATH_DEL, false);

if (stat == null) {

zk.delete(PATH, -1);

}

} catch (Exception e) {

e.printStackTrace();

}

}

}

1.1.1. ZkClient

1.1.1.1. 基本操作

package cn.enjoy.zkclient;

import org.I0Itec.zkclient.ZkClient;

import org.I0Itec.zkclient.ZkConnection;

import java.util.List;

/**

* Created by VULCAN on 2018/11/7.

*/

public class ZkClientOperator {

/** zookeeper地址 */

static final String CONNECT_ADDR = "192.168.30.10:2181";

/** session超时时间 */

static final int SESSION_OUTTIME = 10000;//ms

public static void main(String[] args) throws Exception {

// ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), SESSION_OUTTIME);

ZkClient zkc = new ZkClient(CONNECT_ADDR, SESSION_OUTTIME);

//1. create and delete方法

zkc.createEphemeral("/temp");

zkc.createPersistent("/super/c1", true);

Thread.sleep(10000);

zkc.delete("/temp");

zkc.deleteRecursive("/super");

//2. 设置path和data 并且读取子节点和每个节点的内容

zkc.createPersistent("/super", "1234");

zkc.createPersistent("/super/c1", "c1内容");

zkc.createPersistent("/super/c2", "c2内容");

List<String> list = zkc.getChildren("/super");

for(String p : list){

System.out.println(p);

String rp = "/super/" + p;

String data = zkc.readData(rp);

System.out.println("节点为:" + rp + ",内容为: " + data);

}

//3. 更新和判断节点是否存在

zkc.writeData("/super/c1", "新内容");

System.out.println(zkc.readData("/super/c1").toString());

System.out.println(zkc.exists("/super/c1"));

// 4.递归删除/super内容

zkc.deleteRecursive("/super");

}

}

1.1.1.2. 监听机制

package cn.enjoy.zkclient;

import org.I0Itec.zkclient.IZkChildListener;

import org.I0Itec.zkclient.IZkDataListener;

import org.I0Itec.zkclient.ZkClient;

import org.I0Itec.zkclient.ZkConnection;

import org.junit.Test;

import java.util.List;

public class TestZkClientWatcher {

/** zookeeper地址 */

static final String CONNECT_ADDR = "192.168.30.10:2181";

/** session超时时间 */

static final int SESSION_OUTTIME = 10000;//ms

@Test

/**

* subscribeChildChanges方法 订阅子节点变化

*/

public  void testZkClientWatcher1() throws Exception {

ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), SESSION_OUTTIME);

//对父节点添加监听子节点变化。

zkc.subscribeChildChanges("/super", new IZkChildListener() {

@Override

public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {

System.out.println("parentPath: " + parentPath);

System.out.println("currentChilds: " + currentChilds);

}

});

Thread.sleep(3000);

zkc.createPersistent("/super");

Thread.sleep(1000);

zkc.createPersistent("/super" + "/" + "c1", "c1内容");

Thread.sleep(1000);

zkc.createPersistent("/super" + "/" + "c2", "c2内容");

Thread.sleep(1000);

zkc.delete("/super/c2");

Thread.sleep(1000);

zkc.deleteRecursive("/super");

Thread.sleep(Integer.MAX_VALUE);

}

@Test

/**

* subscribeDataChanges 订阅内容变化

*/

public void testZkClientWatcher2() throws Exception {

ZkClient zkc = new ZkClient(new ZkConnection(CONNECT_ADDR), SESSION_OUTTIME);

zkc.createPersistent("/super", "1234");

//对父节点添加监听子节点变化。

zkc.subscribeDataChanges("/super", new IZkDataListener() {

@Override

public void handleDataDeleted(String path) throws Exception {

System.out.println("删除的节点为:" + path);

}

@Override

public void handleDataChange(String path, Object data) throws Exception {

System.out.println("变更的节点为:" + path + ", 变更内容为:" + data);

}

});

Thread.sleep(3000);

zkc.writeData("/super", "456", -1);

Thread.sleep(1000);

zkc.delete("/super");

Thread.sleep(Integer.MAX_VALUE);

}

}

1.1.2. Curator

1.1.2.1.  基本操作

package cn.enjoy.curator;

import org.apache.curator.RetryPolicy;

import org.apache.curator.framework.CuratorFramework;

import org.apache.curator.framework.CuratorFrameworkFactory;

import org.apache.curator.framework.api.BackgroundCallback;

import org.apache.curator.framework.api.CuratorEvent;

import org.apache.curator.framework.api.CuratorListener;

import org.apache.curator.framework.api.transaction.CuratorOp;

import org.apache.curator.framework.api.transaction.CuratorTransactionResult;

import org.apache.curator.retry.ExponentialBackoffRetry;

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.data.Stat;

import org.junit.Before;

import org.junit.Test;

import java.util.List;

import static com.sun.xml.internal.ws.dump.LoggingDumpTube.Position.Before;

/**

*  测试Apache Curator框架的基本用法

*/

public class OperatorTest {

//ZooKeeper服务地址

private static final String SERVER = "192.168.30.10:2181";

//会话超时时间

private final int SESSION_TIMEOUT = 30000;

//连接超时时间

private final int CONNECTION_TIMEOUT = 5000;

//创建连接实例

private CuratorFramework client = null;

/**

* baseSleepTimeMs:初始的重试等待时间

* maxRetries:最多重试次数

*

*

* ExponentialBackoffRetry:重试一定次数,每次重试时间依次递增

* RetryNTimes:重试N次

* RetryOneTime:重试一次

* RetryUntilElapsed:重试一定时间

*/

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

@org.junit.Before

public void init(){

//创建 CuratorFrameworkImpl实例

client = CuratorFrameworkFactory.newClient(SERVER, SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy);

//启动

client.start();

}

/**

* 测试创建节点

* @throws Exception

*/

@Test

public void testCreate() throws Exception{

//创建永久节点

client.create().forPath("/curator","/curator data".getBytes());

//创建永久有序节点

client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath("/curator_sequential","/curator_sequential data".getBytes());

//创建临时节点

client.create().withMode(CreateMode.EPHEMERAL)

.forPath("/curator/ephemeral","/curator/ephemeral data".getBytes());

//创建临时有序节点

client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)

.forPath("/curator/ephemeral_path1","/curator/ephemeral_path1 data".getBytes());

}

/**

* 测试检查某个节点是否存在

* @throws Exception

*/

@Test

public void testCheck() throws Exception{

Stat stat1 = client.checkExists().forPath("/curator");

Stat stat2 = client.checkExists().forPath("/curator2");

System.out.println("‘/curator‘是否存在: " + (stat1 != null ? true : false));

System.out.println("‘/curator2‘是否存在: " + (stat2 != null ? true : false));

}

/**

* 测试异步设置节点数据

* @throws Exception

*/

@Test

public void testSetDataAsync() throws Exception{

//创建监听器

CuratorListener listener = new CuratorListener() {

@Override

public void eventReceived(CuratorFramework client, CuratorEvent event)

throws Exception {

System.out.println(event.getPath());

}

};

//添加监听器

client.getCuratorListenable().addListener(listener);

//异步设置某个节点数据

client.setData().inBackground().forPath("/curator","sync".getBytes());

//为了防止单元测试结束从而看不到异步执行结果,因此暂停10秒

Thread.sleep(10000);

}

/**

* 测试另一种异步执行获取通知的方式

* @throws Exception

*/

@Test

public void testSetDataAsyncWithCallback() throws Exception{

BackgroundCallback callback = new BackgroundCallback() {

@Override

public void processResult(CuratorFramework client, CuratorEvent event)

throws Exception {

System.out.println(event.getPath());

}

};

//异步设置某个节点数据

client.setData().inBackground(callback).forPath("/curator","/curator modified data with Callback".getBytes());

//为了防止单元测试结束从而看不到异步执行结果,因此暂停10秒

Thread.sleep(10000);

}

/**

* 测试删除节点

* @throws Exception

*/

@Test

public void testDelete() throws Exception{

//创建测试节点

client.create().orSetData().creatingParentsIfNeeded()

.forPath("/curator/del_key1","/curator/del_key1 data".getBytes());

client.create().orSetData().creatingParentsIfNeeded()

.forPath("/curator/del_key2","/curator/del_key2 data".getBytes());

client.create().forPath("/curator/del_key2/test_key","test_key data".getBytes());

//删除该节点

client.delete().forPath("/curator/del_key1");

//级联删除子节点

client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/curator/del_key2");

}

/*

* 测试事务管理:碰到异常,事务会回滚

* @throws Exception

*/

@Test

public void testTransaction() throws Exception{

//定义几个基本操作

CuratorOp createOp = client.transactionOp().create()

.forPath("/curator/one_path","some data".getBytes());

CuratorOp setDataOp = client.transactionOp().setData()

.forPath("/curator","other data".getBytes());

CuratorOp deleteOp = client.transactionOp().delete()

.forPath("/curator");

//事务执行结果

List<CuratorTransactionResult> results = client.transaction()

.forOperations(createOp,setDataOp,deleteOp);

//遍历输出结果

for(CuratorTransactionResult result : results){

System.out.println("执行结果是: " + result.getForPath() + "--" + result.getType());

}

}

}

1.1.2.2. 监听机制

package cn.enjoy.curator;

import org.apache.curator.RetryPolicy;

import org.apache.curator.framework.CuratorFramework;

import org.apache.curator.framework.CuratorFrameworkFactory;

import org.apache.curator.framework.api.CuratorEvent;

import org.apache.curator.framework.api.CuratorListener;

import org.apache.curator.framework.recipes.cache.*;

import org.apache.curator.retry.ExponentialBackoffRetry;

import org.apache.zookeeper.CreateMode;

import org.apache.zookeeper.WatchedEvent;

import org.apache.zookeeper.Watcher;

import org.junit.Test;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

public class EventTest {

//ZooKeeper服务地址

private static final String SERVER = "192.168.30.10:2181";

//会话超时时间

private final int SESSION_TIMEOUT = 30000;

//连接超时时间

private final int CONNECTION_TIMEOUT = 5000;

//创建连接实例

private CuratorFramework client = null;

/**

* baseSleepTimeMs:初始的重试等待时间

* maxRetries:最多重试次数

*

*

* ExponentialBackoffRetry:重试一定次数,每次重试时间依次递增

* RetryNTimes:重试N次

* RetryOneTime:重试一次

* RetryUntilElapsed:重试一定时间

*/

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);

@org.junit.Before

public void init(){

//创建 CuratorFrameworkImpl实例

client = CuratorFrameworkFactory.newClient(SERVER, SESSION_TIMEOUT, CONNECTION_TIMEOUT, retryPolicy);

//启动

client.start();

}

/**

*

* @描述:第一种监听器的添加方式: 对指定的节点进行添加操作

* 仅仅能监控指定的本节点的数据修改,删除 操作 并且只能监听一次 --->不好

*/

@Test

public  void TestListenterOne() throws Exception{

client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());

// 注册观察者,当节点变动时触发

byte[] data = client.getData().usingWatcher(new Watcher() {

@Override

public void process(WatchedEvent event) {

System.out.println("获取 test 节点 监听器 : " + event);

}

}).forPath("/test");

client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());

Thread.sleep(1000);

client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());

Thread.sleep(1000);

System.out.println("节点数据: "+ new String(data));

Thread.sleep(10000);

}

/**

*

* @描述:第二种监听器的添加方式: Cache 的三种实现

*   Path Cache:监视一个路径下1)孩子结点的创建、2)删除,3)以及结点数据的更新。

*                  产生的事件会传递给注册的PathChildrenCacheListener。

*  Node Cache:监视一个结点的创建、更新、删除,并将结点的数据缓存在本地。

*  Tree Cache:Path Cache和Node Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。

*/

//1.path Cache  连接  路径  是否获取数据

//能监听所有的字节点 且是无限监听的模式 但是 指定目录下节点的子节点不再监听

@Test

public void setListenterTwoOne() throws Exception{

ExecutorService pool = Executors.newCachedThreadPool();

PathChildrenCache childrenCache = new PathChildrenCache(client, "/test", true);

PathChildrenCacheListener childrenCacheListener = new PathChildrenCacheListener() {

@Override

public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {

System.out.println("开始进行事件分析:-----");

ChildData data = event.getData();

switch (event.getType()) {

case CHILD_ADDED:

System.out.println("CHILD_ADDED : "+ data.getPath() +"  数据:"+ data.getData());

break;

case CHILD_REMOVED:

System.out.println("CHILD_REMOVED : "+ data.getPath() +"  数据:"+ data.getData());

break;

case CHILD_UPDATED:

System.out.println("CHILD_UPDATED : "+ data.getPath() +"  数据:"+ data.getData());

break;

case INITIALIZED:

System.out.println("INITIALIZED : "+ data.getPath() +"  数据:"+ data.getData());

break;

default:

break;

}

}

};

childrenCache.getListenable().addListener(childrenCacheListener);

System.out.println("Register zk watcher successfully!");

childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);

//创建一个节点

client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());

client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node01","enjoy".getBytes());

Thread.sleep(1000);

client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node02","deer".getBytes());

Thread.sleep(1000);

client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node02","demo".getBytes());

Thread.sleep(1000);

client.delete().forPath("/test/node02");

Thread.sleep(10000);

}

//2.Node Cache  监控本节点的变化情况   连接 目录 是否压缩

//监听本节点的变化  节点可以进行修改操作  删除节点后会再次创建(空节点)

@Test

public void setListenterTwoTwo() throws Exception{

ExecutorService pool = Executors.newCachedThreadPool();

//设置节点的cache

final NodeCache nodeCache = new NodeCache(client, "/test", false);

nodeCache.getListenable().addListener(new NodeCacheListener() {

@Override

public void nodeChanged() throws Exception {

System.out.println("the test node is change and result is :");

System.out.println("path : "+nodeCache.getCurrentData().getPath());

System.out.println("data : "+new String(nodeCache.getCurrentData().getData()));

System.out.println("stat : "+nodeCache.getCurrentData().getStat());

}

});

nodeCache.start();

client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());

Thread.sleep(1000);

client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","enjoy".getBytes());

Thread.sleep(10000);

}

//3.Tree Cache

// 监控 指定节点和节点下的所有的节点的变化--无限监听  可以进行本节点的删除(不在创建)

@Test

public void TestListenterTwoThree() throws Exception{

ExecutorService pool = Executors.newCachedThreadPool();

//设置节点的cache

TreeCache treeCache = new TreeCache(client, "/test");

//设置监听器和处理过程

treeCache.getListenable().addListener(new TreeCacheListener() {

@Override

public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {

ChildData data = event.getData();

if(data !=null){

switch (event.getType()) {

case NODE_ADDED:

System.out.println("NODE_ADDED : "+ data.getPath() +"  数据:"+ new String(data.getData()));

break;

case NODE_REMOVED:

System.out.println("NODE_REMOVED : "+ data.getPath() +"  数据:"+ new String(data.getData()));

break;

case NODE_UPDATED:

System.out.println("NODE_UPDATED : "+ data.getPath() +"  数据:"+ new String(data.getData()));

break;

default:

break;

}

}else{

System.out.println( "data is null : "+ event.getType());

}

}

});

//开始监听

treeCache.start();

//创建一个节点

client.create().orSetData().withMode(CreateMode.PERSISTENT).forPath("/test","test".getBytes());

Thread.sleep(1000);

client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node01","enjoy".getBytes());

Thread.sleep(1000);

client.create().orSetData().withMode(CreateMode.EPHEMERAL).forPath("/test/node01","deer".getBytes());

Thread.sleep(1000);

client.create().orSetData().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath("/test/node02/node02_2","deer".getBytes());

Thread.sleep(10000);

}

}

原文地址:https://www.cnblogs.com/Soy-technology/p/11391701.html

时间: 2024-08-08 09:25:46

Zookeeper原生客户端的相关文章

Zookeeper系列三:Zookeeper客户端的使用(Zookeeper原生API如何进行调用、ZKClient、Curator)

一.Zookeeper原生API如何进行调用 准备工作: 首先在新建一个maven项目ZK-Demo,然后在pom.xml里面引入zk的依赖 <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.10</version> </dependency> 1. 连接zk并

(原) 2.1 Zookeeper原生API使用

本文为原创文章,未经允许不得转载 Zookeeper原生API使用 1.jar包引入,演示版本为3.4.6,非maven项目,可以下载jar包导入到项目中 <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency> 2.

Zookeeper开源客户端框架Curator简介

Curator是Netflix开源的一套ZooKeeper客户端框架. Netflix在使用ZooKeeper的过程中发现ZooKeeper自带的客户端太底层, 应用方在使用的时候需要自己处理很多事情, 于是在它的基础上包装了一下, 提供了一套更好用的客户端框架. Netflix在用ZooKeeper的过程中遇到的问题, 我们也遇到了, 所以开始研究一下, 首先从他在github上的源码, wiki文档以及Netflix的技术blog入手. 看完官方的文档之后, 发现Curator主要解决了三类

Zookeeper【客户端启动】

今天我们来说说 Zookeeper 客户端启动,整个文章分三个部分:第一部分是 Zookeeper 原生 API 客户端,第二部分是开源客户端 ZkClient,第三部分是开源客户端 Curator. [Zookeeper API 客户端]

Zookeeper开源客户端Curator之基本功能讲解

简介 Curator是Netflix公司开源的一套Zookeeper客户端框架.了解过Zookeeper原生API都会清楚其复杂度.Curator帮助我们在其基础上进行封装.实现一些开发细节,包括接连重连.反复注册Watcher和NodeExistsException等.目前已经作为Apache的顶级项目出现,是最流行的Zookeeper客户端之一.从编码风格上来讲,它提供了基于Fluent的编程风格支持. 除此之外,Curator还提供了Zookeeper的各种应用场景:Recipe.共享锁服

.NET Core)的ZooKeeper异步客户端

支持断线重连.永久watcher.递归操作并且能跨平台(.NET Core)的ZooKeeper异步客户端 阅读目录 什么是ZooKeeper? 项目介绍 提供的功能 使用说明 FAQ 在公司内部的微服务架构中有使用到了"ZooKeeper",虽然官方有提供了.NET的SDK,但易用性非常的差,且搜遍github.nuget,没有发现一个可以跨平台且易用的组件,所以我又"美化"了一个轮子. 回到目录 什么是ZooKeeper? ZooKeeper是一个分布式的,开放

为什么dubbo使用ZkClient作为zookeeper的客户端

本文内容并非原创,使用资料均来自互联网. dubbo使用了zkClient而不是使用zookeeper本身的客户端与zookeeper进行交互,为什么呢? 先看看zookeeper本身自带的客户端的问题. 1)ZooKeeper的Watcher是一次性的,用过了需要再注册: 2) session的超时后没有自动重连,生产环境中如果网络出现不稳定情况,那么这种情况出现的更加明显:3) 没有领导选举机制,集群情况下可能需要实现stand by,一个服务挂了,另一个需要接替的效果:4) 客户端只提供了

zookeeper开源客户端curator

zookeeper的原生api相对来说比较繁琐,比如:对节点添加监听事件,当监听触发后,我们需要再次手动添加监听,否则监听只生效一次:再比如,断线重连也需要我们手动代码来判断处理等等.对于curator的介绍,从网上百度了一段:Curator是Netflix开源的一套zookeeper客户端框架,用它来操作zookeeper更加方便,按Curator官方所比喻的,guava to JAVA,curator to zookeeper,Curator采用了fluent风格的代码,非常简洁. ----

八:Zookeeper开源客户端Curator的api测试

curator是Netflix公司开源的一套ZooKeeper客户端,Curator解决了很多ZooKeeper客户端非常底层的细节开发工作.包括连接重连,反复注册Watcher等.实现了Fluent风格的API接口,目前已经为Apache的顶级项目,是全世界使用最广泛的ZooKeeper客户端之一 第一:maven依赖 1 <dependency> 2 <groupId>org.apache.curator</groupId> 3 <artifactId>