之前只理解zk可以做命名,配置服务,现在学习下他怎么用作构建master-slave模式的分布式系统。
为什么叫Zoo?“因为要协调的分布式系统是一个动物园”。
ZooKeeper是一个中性化的Service,用于管理配置信息、命名、提供分布式同步,还能组合Service。所有这些种类的Service都会在分布式应用程序中使用到。每次编写这些Service都会涉及大量的修bug和竞争情况。正因为这种编写这些Service有一定难度,所以通常都会忽视它们,这就使得在应用程序有变化时变得难以管理应用程序。即使处理得当,实现这些服务的不同方法也会使得部署应用程序变得难以管理。
下边代码是参考文献的java版本,通过service来协调各个独立的PHP脚本,并让它们同意某个成为Leader(所以称作Leader选举)。当Leader退出(或崩溃)时,worker可检测到并再选出新的leader。通过这种方式即可理解一般的master-slave结构分布式系统是如何实现如何调度的,zk是个好东西。
首先需要了解创建节点的模式:
PERSISTENT:持久化目录节点,这个目录节点存储的数据不会丢失;
PERSISTENT_SEQUENTIAL:顺序自动编号的目录节点,这种目 录节点会根据当前已近存在的节点数自动加 1,然后返回给客户端已经成功创建的目录节点名;
EPHEMERAL:临时目录节点,一旦创建这个节点的客户端与服务器端口也就是 session 超时,这种节点会被自动删除;
EPHEMERAL_SEQUENTIAL:临时自动编号节点。
程序逻辑:
1)首先创建根节点/cluster,并创建自身子节点,以 /cluster/w- 为前缀,使用临时自动编号节点模式创建节点
2)获取/cluster的所有子节点并排序,当发现自身是第一个节点时,则自我选举为leader,否则认定为follower
3)注册监听事件,当/cluster里前一个节点有变动时,回到2)
这样,便实现了自动选举,当有节点在timeout时段后不可用时,自动产生新的leader,也可根据当前节点数进行预警。
package zookeeper; import org.apache.zookeeper.*; import java.io.IOException; import java.util.Collections; import java.util.List; /** * Created with IntelliJ IDEA. * * @author guanpu * Date: 14-10-22 * Time: 下午5:11 * To change this template use File | Settings | File Templates. */ public class Worker extends ZooKeeper implements Runnable, Watcher { public static final String NODE_NAME = "/cluster"; public String znode; private boolean leader; public Worker(String connectString, int sessionTimeout, Watcher watcher) throws IOException { super(connectString, sessionTimeout, watcher); } public boolean register() throws InterruptedException, KeeperException { if (this.exists(NODE_NAME, null) == null) { this.create(NODE_NAME, "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } znode = this.create(NODE_NAME + "/w-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); znode = znode.replace(NODE_NAME + "/", ""); String node = watchPrevious(); if (node.equals(znode)) { System.out.println("nobody here ,i am leader"); leader = true; } else { System.out.println("i am watching"); } return true; } private String watchPrevious() throws InterruptedException, KeeperException { List<String> works = this.getChildren(NODE_NAME, this); Collections.sort(works); System.out.println(works); int i = 0; for (String work : works) { if (znode.equals(work)) { if (i > 0) { //this.getData(NODE_NAME + "/" + works.get(i - 1), this, null); return works.get(i - 1); } return works.get(0); } } return ""; } @Override public void run() { try { this.register(); } catch (InterruptedException e) { } catch (KeeperException e) { } while (true) { try { if (leader) { System.out.println("leading"); } else { System.out.println("following"); } Thread.sleep(1000); } catch (InterruptedException e) { } } } public static void main(String[] args) { try { String hostPort = "10.16.73.22,10.16.73.12,10.16.73.13"; new Thread(new Worker(hostPort, 3000, null)).start(); } catch (IOException e) { } } @Override public void process(WatchedEvent event) { String t = String.format("hello event! type=%s, stat=%s, path=%s", event.getType(), event.getState(), event.getPath()); System.out.println(t); System.out.println("hello ,my cluster id is :"+znode); String node = ""; try { node = this.watchPrevious(); } catch (InterruptedException e) { } catch (KeeperException e) { } if (node.equals(znode)) { System.out.println("process: nobody here ,i am leader"); leader = true; } else { System.out.println("process: i am watching"); } } }
启动至少三个终端,模拟Leader崩溃的情形。使用Ctrl+c或其他方法退出第一个脚本。刚开始不会有任何变化,worker可以继续工作。后来,ZooKeeper会发现超时,并选举出新的leader。
php移植到java有两个问题,第一个是watcher注册,第一次父类初始化未完成时不能调用自身作为watcher,会报一次watcher调用空指针。
第二个问题:
this.getData(NODE_NAME + "/" + works.get(i - 1), this, null);
这个不生效,看方法注释是当改动和移除节点时,触发watcher的process,但实验中并未触发,在java里系统的自动删除并不归类在这两个操作之内?
php版本的是正常的,作为遗留问题。为了程序正常运行,更改为 List<String> works = this.getChildren(NODE_NAME, this); 当子节点有变动时执行process方法。
参考文献:
http://anykoro.sinaapp.com/2013/04/05/使用apache-zookeeper分布式部署php应用程序/