使用ZooKeeper的Java API来实现简单的使用zookeeper,并完成一个简单的经典小需求。
示例:假设有一组服务器用于为客户端提供某种服务。我们希望每个客户端都能找到其中一台服务器,这样它们就可以使用这项服务。
有一个挑战是如何维护这组服务器的列表。这组服务器的成员列表显然不能存储在网络中的单个节点上,否则该节点的故障将意味着整个系统的故障,我们希望这个成员列表是高可用的。
我们还希望当其中一台服务器出现故障时,能够从这组服务器成员列表中删除该节点,避免提供不可用的服务。
相应的实现代码对象以下几个类:
ConnectionWatcher.java 维护客户端和ZooKeeper服务之间的连接;
CreateGroup.java 创建指定的服务组在ZooKeeper上;
DeleteGroup.java 删除指定的服务组在ZooKeeper上;
JoinGroup.java 添加成员在指定的服务组;
ListGroup.java 列出指定服务组下的所有成员;
源代码如下:
public class ConnectionWatcher implements Watcher{ private static final int SESSION_TIMEOUT = 5000; protected ZooKeeper zk; private CountDownLatch connectedSignal = new CountDownLatch(1); public void connect(String hosts) throws Exception{ //创建zookeeper实例的时候会启动一个线程连接到zookeeper服务。 zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this); connectedSignal.await(); } //当客户端已经与zookeeper建立连接后,Watcher的process方法会被调用。 public void process(WatchedEvent event) { if(event.getState() == KeeperState.SyncConnected){ connectedSignal.countDown(); } } public void close() throws Exception{ zk.close(); } }
public class CreateGroup extends ConnectionWatcher{ public void create(String groupName) throws Exception{ String path = "/"+groupName; //创建一个完全开发的ACL(访问控制列表)、持久的znode String createdPath = zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); System.out.println("Created "+createdPath); } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { // TODO Auto-generated method stub CreateGroup createGroup = new CreateGroup(); createGroup.connect("192.168.44.231"); createGroup.create("myzoo"); createGroup.close(); } }
public class DeleteGroup extends ConnectionWatcher{ public void delete(String groupName) throws Exception{ String path = "/" + groupName; try { //ZooKeeper不支持递归删除操作,因此在删除父节点之前必须先删除子节点。 List<String> children = zk.getChildren(path, false); for(String child : children){ zk.delete(path+"/"+child, -1); } zk.delete(path, -1); System.out.printf("Delete Group %s\n", path); } catch (KeeperException.NoNodeException e) { System.out.printf("Group %s does not exist\n", groupName); System.exit(1); } } public static void main(String[] args) throws Exception { DeleteGroup deleteGroup = new DeleteGroup(); deleteGroup.connect("192.168.44.231"); deleteGroup.delete("myzoo"); deleteGroup.close(); } }
public class JoinGroup extends ConnectionWatcher{ public void join(String groupName, String memberName) throws Exception{ String path = "/" + groupName + "/" + memberName; String createdPath = zk.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("Created "+ createdPath); } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { // TODO Auto-generated method stub JoinGroup joinGroup = new JoinGroup(); joinGroup.connect("192.168.44.231"); joinGroup.join("myzoo", "myservice2"); //stay alive until process is killed or thread is interrupted Thread.sleep(Long.MAX_VALUE); } }
public class ListGroup extends ConnectionWatcher{ public void list(String groupName) throws Exception{ String path = "/" + groupName; try { List<String> children = zk.getChildren(path, false); if(children.isEmpty()){ System.out.printf("No members in group %s\n", groupName); System.exit(1); } for(String child : children){ System.out.println(child); } } catch (KeeperException.NoNodeException e) { System.out.printf("Group %s does not exist\n", groupName); System.exit(1); } } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { // TODO Auto-generated method stub ListGroup listGroup = new ListGroup(); listGroup.connect("192.168.44.231"); listGroup.list("myzoo"); listGroup.close(); } }
时间: 2024-09-29 03:29:53