zk抢主

package com.autonavi.tinfo.t1.traffic.pub.openlr.util;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.zkclient.IZkChildListener;
import com.github.zkclient.IZkStateListener;
import com.github.zkclient.ZkClient;

public class StatusMonitor {

private Lock lock = new ReentrantLock();// 锁对象
    private boolean usingHA = true;
    private volatile boolean isLeader = false;

private static final Logger logger = LoggerFactory.getLogger(StatusMonitor.class);
    // 超时时间
    private int zkSessionTimeOut=5000;
    private int zkConnectionTimeOut=5000;
    //private int SESSION_TIMEOUT = 5000;
    //private int CONNECTION_TIMEOUT = 5000;

// zookeeper server列表
    private String zkServerList = "10.17.132.71:2181";

private String zkServerDir = "fast-update";
    private String subNode = "openlr";

// 当前client创建的子节点
    private String curPath;
    private ZkClient zkClient;
    ScheduledExecutorService intervalMonitorExecutor = Executors.newSingleThreadScheduledExecutor();

/**
     * 连接zookeeper
     */
    public void init() {
        logger.info("StatusMonitor.init zkSessionTimeOut:{},zkConnectionTimeOut:{}",zkSessionTimeOut,zkConnectionTimeOut);
        try {
            connect();
        } catch (Exception e) {
            this.isLeader = false;
            logger.error(e.getMessage(), e);
            try {
                connect();
            } catch (Exception e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
                logger.error("error occurs during sync data from zk");
                System.exit(0);
            } finally {
                ;
            }
        }

intervalMonitorExecutor.scheduleAtFixedRate(new Runnable() {

@Override
            public void run() {
                lock.lock();
                try {
                    if (zkClient == null) {
                        isLeader = false;
                        return;
                    }
                    if (zkClient != null && zkClient.getZooKeeper() == null) {
                        isLeader = false;
                        return;
                    }
                    if (zkClient != null && (!zkClient.getZooKeeper().getState().isAlive()
                            || !zkClient.getZooKeeper().getState().isConnected())) {
                        isLeader = false;
                        return;
                    }
                } finally {
                    lock.unlock();
                }
            }
        }, 0, 2, TimeUnit.SECONDS);

}

public void connect() throws Exception {
        if (!usingHA) {
            return;
        }

if (this.zkClient != null) {
            this.zkClient.close();
        }
        this.zkClient = new ZkClient(zkServerList, zkSessionTimeOut, zkConnectionTimeOut);

if (!zkClient.exists("/" + zkServerDir)) {
            zkClient.createPersistent("/" + zkServerDir, null);
        }
        if (curPath == null) {
            curPath = zkClient.createEphemeralSequential("/" + zkServerDir + "/" + subNode, "monitor".getBytes());
        }

try {
            startWatchingTopicStatus();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            logger.error(e.getMessage(), e);
            logger.error("error occurs during sync data from zk");
            System.exit(0);
        }
        Thread.sleep(2000);// */
        handleMonitorNodeChange();
    }

public void startWatchingTopicStatus() {
        ZkTopicStatusListener topicEventListener = new ZkTopicStatusListener();
        ZkConnectedStatusListener connectedStatusListener = new ZkConnectedStatusListener();
        try {
            zkClient.subscribeChildChanges("/" + zkServerDir, topicEventListener);
            zkClient.subscribeStateChanges(connectedStatusListener);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
            startWatchingTopicStatus();
        }

}

public void stop() {
        if (zkClient == null) {
            logger.warn("shutdown topic event watcher");
            return;
        }
        // stopWatchingTopicEvents();
        zkClient.close();
        zkClient = null;
    }

private void beLeader() {
        logger.info("this node gains lock {} and becomes leader ", curPath);
        System.out.println("this node gains lock " + curPath + " and becomes leader");
        this.isLeader = true;
    }

public void setUsingHA(boolean isUsingHA) {
        this.usingHA = isUsingHA;
    }

public void setZkServerDir(String zkServerDir) {
        this.zkServerDir = zkServerDir;
    }

public boolean isUsingHA() {
        return usingHA;
    }

public boolean isLeader() {
        return isLeader;
    }

public void setZkServerList(String zkServerList) {
        this.zkServerList = zkServerList;
    }

/*public int getSESSION_TIMEOUT() {
        return SESSION_TIMEOUT;
    }

public void setSESSION_TIMEOUT(int sESSION_TIMEOUT) {
        SESSION_TIMEOUT = sESSION_TIMEOUT;
    }

public int getCONNECTION_TIMEOUT() {
        return CONNECTION_TIMEOUT;
    }

public void setCONNECTION_TIMEOUT(int cONNECTION_TIMEOUT) {
        CONNECTION_TIMEOUT = cONNECTION_TIMEOUT;
    }*/

public int getZkSessionTimeOut() {
        return zkSessionTimeOut;
    }

public void setZkSessionTimeOut(int zkSessionTimeOut) {
        this.zkSessionTimeOut = zkSessionTimeOut;
    }

public int getZkConnectionTimeOut() {
        return zkConnectionTimeOut;
    }

public void setZkConnectionTimeOut(int zkConnectionTimeOut) {
        this.zkConnectionTimeOut = zkConnectionTimeOut;
    }

public void handleMonitorNodeChange() throws Exception {
        this.lock.lock();
        try {
            if (zkClient == null)
                return;
            if (!zkClient.exists("/" + zkServerDir)) {
                zkClient.createPersistent("/" + zkServerDir, null);
            }

// 确认curPath是否真的是列表中的最小节点
            List<String> childs = zkClient.getChildren("/" + zkServerDir);
            if (childs == null || childs.size() == 0) {
                // 创建子节点
                curPath = zkClient.createEphemeralSequential("/" + zkServerDir + "/" + subNode, "monitor".getBytes());
                childs = zkClient.getChildren("/" + zkServerDir);
            }
            Collections.sort(childs);

String thisNode = curPath.substring(("/" + zkServerDir + "/").length());
            int index = childs.indexOf(thisNode);
            if (index < 0) {
                curPath = zkClient.createEphemeralSequential("/" + zkServerDir + "/" + subNode, "monitor".getBytes());
                childs = zkClient.getChildren("/" + zkServerDir);
                Collections.sort(childs);
                thisNode = curPath.substring(("/" + zkServerDir + "/").length());
                index = childs.indexOf(thisNode);
            }

if (index == 0) {
                // 确实是最小节点
                beLeader();
            } else {
                this.isLeader = false;
            }
        } finally {
            this.lock.unlock();
        }
    }

class ZkTopicStatusListener implements IZkChildListener {

@Override
        public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
            handleMonitorNodeChange();
        }
    }

class ZkConnectedStatusListener implements IZkStateListener {

@Override
        public void handleStateChanged(KeeperState state) throws Exception {
            // TODO Auto-generated method stub
            if (state.equals(state.SyncConnected) || state.equals(state.ConnectedReadOnly)) {
                System.out.println("zookeeper start to be connected");
                handleMonitorNodeChange();
            }
        }

@Override
        public void handleNewSession() throws Exception {
            // TODO Auto-generated method stub
        }

}

public static void main(String[] args) throws Exception {
        StatusMonitor statusMonitor = new StatusMonitor();
        statusMonitor.setZkServerList("10.61.97.23:2181");
        statusMonitor.setUsingHA(true);

statusMonitor.init();

Thread.sleep(100000000);
    }

}

时间: 2024-12-28 02:08:41

zk抢主的相关文章

Storm深度分析及其正式版本思考

Storm发展到现在已经有了5个年头,从刚开始惊艳四方,到现在逐渐被新兴框架(Flink.Spark Streaming)挑战.Storm本身也在不断的发展,Twitter对其不断的探索,且深一步的开发了Heron框架.社区也在憋了5年后发布了第一个正式版本. Storm内部机制及探索 内部机制 Storm写了一层调度系统,Nimbus作为调度的Master(类似ResourceManager),Supervisor作为工作机器上的监控进程(类似NodeMonitor),Worker作为真正的工

zookeeper典型应用场景之一:master选举

对于zookeeper这种东西,仅仅知道怎么安装是远远不够的,至少要对其几个典型的应用场景进行了解,才能比较全面的知道zk究竟能干啥,怎么玩儿,以后的日子里才能知道这货如何能为我所用.于是,有了如下的学习: 我们知道zookeeper可以用于搭建高可用服务框架,主要先看以下几个应用场景:1. master的选举基本思路和编码实现2. 数据的发布和订阅3. 软负载均衡4. 分布式队列5. 分布式锁6. 命名服务 目前zookeeper常用的开发包有zkclient跟curator,后者更为方便,日

zookeeper选举机制

在上一篇文章中我们大致浏览了zookeeper的启动过程,并且提到在Zookeeper的启动过程中leader选举是非常重要而且最复杂的一个环节.那么什么是leader选举呢?zookeeper为什么需要leader选举呢?zookeeper的leader选举的过程又是什么样子的?本文的目的就是解决这三个问题. 首先我们来看看什么是leader选举.其实这个很好理解,leader选举就像总统选举一样,每人一票,获得多数票的人就当选为总统了.在zookeeper集群中也是一样,每个节点都会投票,如

zookeeper master 选举

原文地址: http://www.cnblogs.com/nevermorewang/p/5611807.html 选主原理介绍:zookeeper的节点有两种类型,持久节点跟临时节点.临时节点有个特性,就是如果注册这个节点的机器失去连接(通常是宕机),那么这个节点会被zookeeper删除.选主过程就是利用这个特性,在服务器启动的时候,去zookeeper特定的一个目录下注册一个临时节点(这个节点作为master,谁注册了这个节点谁就是master),注册的时候,如果发现该节点已经存在,则说明

视频直播的发展趋势分析

视频直播的分析与发展 在讲视频直播之前,先讲一讲直播.直播是怎么来的呢?从传播消息的角度上来说,视频和文字.图片.音乐一样都是传播消息的手段,古时以文字传播消息,之后出现了图片和音乐,再之后视频开始流行.出现这种演变的原因是什么呢?我想主要是由于读者的需求日益提高和传播技术的不断发展.读者不满足于当前的文字阅读,由此出现了图片与音乐,到后来图片与音乐也无法满足日益增长的需求,则出现了视频.视频具有文字.图片.音乐不具有的优势:传递的信息多,更让人有代入感,给观众更综合的体验.虽然视频有着无可比拟

Namenode HA原理详解

社区hadoop2.2.0 release版本开始支持NameNode的HA,本文将详细描述NameNode HA内部的设计与实现. 原文见 http://xiguada.org/namenode-ha-principle/ 为什么要Namenode HA? 1.NameNode High Availability即高可用. 2.NameNode 很重要,挂掉会导致存储停止服务,无法进行数据的读写,基于此NameNode的计算(MR,Hive等)也无法完成. Namenode HA 如何实现,关

zookeeper搭建和脚本编写

hadoop: hdfs:分布式存储 MR: 分布式计算 hdfs: ========================= 1.namenode(元数据).datanode(真实数据).2nn(检查点) 2.hadoop-daemon.sh start namenode //启动本机进程 hadoop-daemons.sh start datanode //启动slave机器进程 3.namenode:编辑日志(hdfs oev)和镜像文件(oiv) 编辑日志:hdfs对文件的写操作,读取文件不需

直播在没落!“偏科”的映客能借IPO突围吗?

据<中国经营报>报道,由于即将在港股上市,直播平台映客把今年"樱花女生"的结束晚会放在靠近香港的广州举办,并首次采用了对外售票的方式,来的歌星不少,现场几乎座无虚席. 对于映客来说,上市或许是它的一个执念.由于注入A股公司宣亚国际失败后3个月,映客在今年重启上市计划. 3月26日晚间,映客正式向香港交易所递交招股书,根据招股书显示,较2016年,映客2017年收入.月活数量.付费用户数量均出现下滑. 一个业界反复在谈论的槽点是,映客这种"锲而不舍",是要

Hadoop NameNode HA模式的搭建以及原理

搭建HA(高可用)模式的集群参见(http://blog.cheyo.net/92.html) 转自:http://www.it165.net/admin/html/201407/3465.html 社区hadoop2.2.0 release版本开始支持NameNode的HA,本文将详细描述NameNode HA内部的设计与实现. 为什么要Namenode HA? 1. NameNode High Availability即高可用. 2. NameNode 很重要,挂掉会导致存储停止服务,无法进