模拟leader选举:
1、zookeeper服务器上有一个/leader节点
2、在/leader节点下创建短暂顺序节点/leader/lock-xxxxxxx
3、获取/leader的所有子节点并注册监听
4、拿自己的顺序号跟其他子节点的顺序号比较,如果自己的是最小的则获得leader
5、监听到/leader子节点发生变化则执行步骤3、 4尝试获取leader
Client .java
package leader; import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.Watcher.Event.EventType; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; /** * 模拟leader选举 * * 1、zookeeper服务器上有一个/leader节点 * 2、在/leader节点下创建短暂顺序节点/leader/lock-xxxxxxx * 3、获取/leader的所有子节点并注册监听 * 4、拿自己的顺序号跟其他子节点的顺序号比较,如果自己的是最小的则获得leader * 5、监听到/leader子节点发生变化则执行步骤3、 4尝试获取leader * */ public class Client { public static final String HOSTS = "hadoop1:2181"; public static final String LEADER_PATH = "/leader"; private String path = null; public void run() throws Exception { ZooKeeper zk = ZKUtils.open(HOSTS); path = zk.create(LEADER_PATH + "/lock-", null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); getLeader(zk); TimeUnit.DAYS.sleep(1); } private void getLeader(final ZooKeeper zk) throws KeeperException, InterruptedException { //获取/leader下的所有子节点 //注册/leader子节点观察 List<String> children = zk.getChildren(LEADER_PATH, new Watcher() { @Override public void process(WatchedEvent event) { //如果/leader子节点发生变化,则再进行一次getLeader if(event.getType().equals(EventType.NodeChildrenChanged)) { try { getLeader(zk); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } } }); //拿自己的顺序号跟/leader所有子节点的顺序号比较,如果是最小的则拿到leader long seq = getSeq(path); boolean isMin = true; for (String child : children) { long childSeq = getSeq(child); if(childSeq < seq) { isMin = false; break; } } if(isMin) { System.out.printf("我拿到锁了, path: %s, thread: %s", path, Thread.currentThread().getName() ); } } public long getSeq(String path) { return Long.parseLong(path.split("-")[1]); } }
Test .java
package leader; public class Test { //多次执行这个类,模拟多个客户端竞选leader public static void main(String[] args) throws Exception { new Client().run(); } }
其实在上面的实现中存在几个问题:
1、羊群效应
在上面的实现中,监听的是/leader的子节点变化,每当一个子节点添加或删除的时候都会通知所有客户端(客户端监听了子节点变化),客户端开始竞争leader,如果客户端成百上千个,这样形成了瞬间峰值流量,对zookeeper服务器造成压力。
而其实,并不是所有的客户端都需要对某个子节点变化进行处理,服务器只需要通知被删除节点的下一个节点即可,而添加新节点不需要通知客户端。
2、重试问题
客户端在创建暂时顺序节点时不能处理因连接丢失导致的失败,因为客户端没法知道create操作是成功还是失败,create操作不是幂等操作(多次create会创建多个节点),不能进行重试。
解决方法是给将要创建的节点一个标识符,以表明是我这个客户端创建的,通常使用sessionid来标识,最终创建的节点名称形如lock-<sessionid>-<sequenceNumber>,重试时,查询/leader是否存在包含<sessionid>的子节点,没有再创建。
由此可见,创建高效可靠的分布式锁是多么的困难,zookeeper的recipes目录下有个WriteLock锁实现,在生产环境下也可使用。
参考资料:《Hadoop权威指南(第二版)》
时间: 2024-08-05 11:15:00