一端不停的更新配置,另一端监听这个配置的变化。
需要注意的是:监听端不一定读取到所有的变化。在zk服务器发送通知到客户端,客户端读取数据注册监听之间可能发生了多次数据变化,这些数据变化是得不到通知的。但可以保证的是每次通知得到的数据都是比之前的数据要新的。
ZKUtils.java
package config; import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; public class ZKUtils { /** * 构建zookeeper客户端对象 * @param hosts * @return * @throws Exception */ public static ZooKeeper open(String hosts) throws Exception { final CountDownLatch singal = new CountDownLatch(1); ZooKeeper zk = new ZooKeeper(hosts, 2000, new Watcher() { @Override public void process(WatchedEvent event) { singal.countDown(); } }); singal.await(); return zk; } }
ConfigUpdater.java
package config; import java.util.UUID; import java.util.concurrent.TimeUnit; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; /** * 不停的更新/test节点上的数据, 模拟配置更新 * */ public class ConfigUpdater { public static final String HOSTS = "hadoop1:2181"; public static final String PATH = "/test"; public static void main(String[] args) throws Exception { ZooKeeper zk = ZKUtils.open(HOSTS); while(true) { String data = UUID.randomUUID().toString(); Stat stat = zk.exists(PATH, false); if(stat == null) { zk.create(PATH, data.getBytes("UTF-8"), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } else { zk.setData(PATH, data.getBytes("UTF-8"), -1); } TimeUnit.SECONDS.sleep(5); } } }
ConfigUpdateWatcher .java
package config; import java.util.concurrent.TimeUnit; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.ZooKeeper; /** * 注册监听/test节点上的数据变化 * */ public class ConfigUpdateWatcher implements Watcher { private ZooKeeper zk = null; public ConfigUpdateWatcher() { try { zk = ZKUtils.open(ConfigUpdater.HOSTS); } catch (Exception e) { e.printStackTrace(); } } @Override public void process(WatchedEvent event) { System.out.println(event); if(event.getType().equals(EventType.NodeDataChanged)) { try { //读取事件后, 再次注册数据监听事件 byte[] data = zk.getData(ConfigUpdater.PATH, this, null); System.out.printf("接收到了事件%s, 新的数据是:%s", EventType.NodeDataChanged, new String(data, "UTF-8")); System.out.println(); } catch (Exception e) { e.printStackTrace(); } } } private void run() { try { //注册数据变化监听 zk.getData(ConfigUpdater.PATH, this, null); } catch (Exception e) { e.printStackTrace(); } try { TimeUnit.SECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { new ConfigUpdateWatcher().run(); } }
完善
在操作zookeeper的时候,如果是幂等操作(多次操作不影响结果),在失败时可以多次重试以增加可靠性,比如ConfigUpdater的写操作可以进行多次重试,修改后的代码如下:
package config; import java.util.UUID; import java.util.concurrent.TimeUnit; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.data.Stat; /** * 不停的更新/test节点上的数据, 模拟配置更新 * */ public class ConfigUpdater { public static final String HOSTS = "hadoop1:2181"; public static final String PATH = "/test"; public static final int RETRIES = 3; //重试次数 public static final int RETRY_PERIOD = 500; //重试间隔 public static void main(String[] args) throws Exception { ZooKeeper zk = ZKUtils.open(HOSTS); //不停的模拟数据更新操作 while(true) { String data = UUID.randomUUID().toString(); /* * 多次重试,增加可靠性 */ int retied = 0; while(true) { try { Stat stat = zk.exists(PATH, false); if(stat == null) { zk.create(PATH, data.getBytes("UTF-8"), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); } else { zk.setData(PATH, data.getBytes("UTF-8"), -1); } //执行成功则跳出循环,否则继续(重试) break; } catch (KeeperException e) { retied++; //如果会话过期了则重新创建一个ZooKeeper客户端对象 if(e.code().equals(KeeperException.Code.SESSIONEXPIRED)) { zk = ZKUtils.open(HOSTS); } else { //其他KeeperException Code的处理 //KeeperException.Code.CONNECTIONLOSS异常可以不用处理:ZooKeeper客户端对象会自动进行重新连接 } //可以重试3次,每次间隔500毫秒 if(retied == RETRIES) { throw e; } else { TimeUnit.MICROSECONDS.sleep(RETRY_PERIOD); } } } //模拟其他操作占用的时间 TimeUnit.SECONDS.sleep(5); } } }
时间: 2024-10-21 08:04:09