package com.billstudy.zookeeper; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.Stat; /** * 注册服务,并且自动监听可用服务列表 * @author Bill * @since V1.0 2015年6月24日 - 上午10:03:24 */ public class AppServer { private ZooKeeper zk = null; // 树前缀 private static final String zkParentPrefix = "/appserver"; private static final String zkChildPrefix = "/app"; // 维护可用列表 private final ArrayList<String> availableServerList = new ArrayList<String>(); public void connectZk(String address){ try { zk = new ZooKeeper("hadoop-server05:2181,hadoop-server06:2181,hadoop-server07:2181", 5000, new Watcher() { @Override public void process(WatchedEvent event) { System.out.println(event.toString()); if (event.getType() == EventType.NodeChildrenChanged && event.getPath().startsWith(zkParentPrefix) ) { flushServerList(); } } }); // 如果根节点没有,则先创建 if (zk.exists(zkParentPrefix, true) == null) { zk.create(zkParentPrefix, "AppServer root dir ".getBytes("UTF-8"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("znode :" + zkParentPrefix + "is not exists , create successful !"); } // 根据当前address创建临时连续子节点,这样多个不同的app child节点不会重复。 zk会自己维护序列 String childPath = zk.create(zkParentPrefix + zkChildPrefix, address.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("create " + childPath + " successful, address is :" + address); flushServerList(); } catch (Exception e) { e.printStackTrace(); } } /** * 收到子节点更新后,刷新当前可用服务列表 * @author Bill * @since V1.0 2015年6月24日 - 上午10:08:19 */ protected void flushServerList() { availableServerList.clear(); try { List<String> children = zk.getChildren(zkParentPrefix, true); for (String child : children) { byte[] data = 
zk.getData(zkParentPrefix + "/" + child, true,new Stat()); availableServerList.add(new String(data,"UTF-8")); } System.out.println("current available server list:" + availableServerList); } catch (Exception e) { e.printStackTrace(); } } /** * 此处可以用来处理业务逻辑,目前让主线程挂起 * @author Bill * @since V1.0 2015年6月24日 - 上午10:24:00 */ public void handle(){ try { // System.out.println("handle ..."); TimeUnit.HOURS.sleep(Long.MAX_VALUE); } catch (InterruptedException e) { e.printStackTrace(); } } public static void main(String[] args) { if (args.length != 1) { System.err.println("The program first argument must be address !"); System.exit(1); } AppServer appServer = new AppServer(); appServer.connectZk(args[0]); appServer.handle(); } }
// Time: 2024-10-28 16:20:55