zookeeper主节点竞争类

import com.alibaba.fastjson.JSON;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.leader.HIKLeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/**
 * 主节点竞争工具类
 *
 * @author
 * @version V1.0
 * @modificationHistory=========================逻辑或功能性重大变更记录
 * @modify by user: $author$ $date$
 * @modify by reason:{方法名}:{原因}
 */
public class LeaderLatchClient implements Closeable {
    private static final Logger LOGGER = LoggerFactory.getLogger(LeaderLatchClient.class);

    private static Executor executor = Executors.newCachedThreadPool();
    public static final String LATCH_PATH_PARENT = "/leaderlatch";//默认
    private static final String LEADER_INFO_PATH = "/LEADER_INFO";//节点信息路径

    private final HIKLeaderLatch leaderLatch;
    private NodeInfo leaderInfo;//主节点信息
    private final NodeInfo localInfo;//本地节点信息
    private final CuratorFramework client;//zookeeper连接客户端
    private NodeCache nodeCache;//节点缓存
    private final String leaderInfoPath;//主节点信息缓存路径
    private String latchPath;//主节点竞争路径
    private boolean cacheLeaderInd = false;//是否开启缓存主节点信息功能,默认不开启

    public LeaderLatchClient(CuratorFramework client, NodeInfo localInfo, String latchNode, LeaderLatchListener... latchListeners) {
        String errMsg;
        if (client == null || !client.getState().equals(CuratorFrameworkState.STARTED)) {
            errMsg = "Zookeeper Client is null or not start.";
            LOGGER.error(errMsg);
            throw new IllegalArgumentException(errMsg);
        }
        latchPath = genLeaderLatchPath(latchNode);
        if (localInfo == null) {
            LOGGER.warn("Local node info is null,cache leader is off.");
            this.localInfo = null;
            this.leaderInfoPath = null;
        } else {
            LOGGER.info("Local node info is not null,cache leader is on.");
            this.localInfo = localInfo;
            leaderInfoPath = genLeaderInfoPath(latchPath);
            cacheLeaderInd = true;
        }
        this.client = client;
        leaderLatch = new HIKLeaderLatch(client, latchPath);

        for (LeaderLatchListener latchListener : latchListeners) {
            leaderLatch.addListener(latchListener, executor);
        }

        //初始化主节点信息缓存
        initLeaderInfoCache();
    }

    /**
     * 开始竞争
     *
     * @throws Exception
     */
    public void start() throws Exception {
        leaderLatch.start();
    }

    /*
     * (non-Javadoc)
     *
     * @see java.io.Closeable#close()
     */
    @Override
    public void close() throws IOException {
        leaderLatch.close();
    }

    /**
     * 判断本节点是否为主节点
     *
     * @return
     */
    public boolean isLeader() {
        return leaderLatch.hasLeadership();
    }

    /**
     * 获取主节点信息
     *
     * @return
     */
    public NodeInfo getLeaderInfo() {
        if (cacheLeaderInd) {
            return leaderInfo;
        } else {
            String errMsg = "This client cache leader info is off can not get leader info.";
            LOGGER.error(errMsg);
            throw new UnsupportedOperationException(errMsg);
        }
    }

    /**
     * 初始化主节点信息缓存
     */
    private void initLeaderInfoCache() {
        if (cacheLeaderInd) {
            //添加主节点信息更新监听器
            leaderLatch.addListener(new LeaderLatchListener() {
                @Override
                public void isLeader() {
                    LOGGER.info("node [{}] now is leader node,update node info to zookeeper path [{}]", localInfo.toString(), leaderInfoPath);
                    //创建主节点信息缓存路径
                    try {
                        if (client.checkExists().forPath(leaderInfoPath) == null) {
                            client.create().forPath(leaderInfoPath, JSON.toJSONString(localInfo).getBytes());
                        } else {
                            client.setData().forPath(leaderInfoPath, JSON.toJSONString(localInfo).getBytes());
                        }
                    } catch (Exception e) {
                        LOGGER.error("Update Leader info error.", e);
                    }
                }

                @Override
                public void notLeader() {
                }
            }, executor);

            //设置主节点信息缓存
            nodeCache = new NodeCache(client, leaderInfoPath);
            nodeCache.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    String data = new String(nodeCache.getCurrentData().getData());
                    LOGGER.info("watch leader info change, now data is [{}]", data);
                    leaderInfo = JSON.parseObject(data, NodeInfo.class);
                }
            }, executor);
            try {
                nodeCache.start();
            } catch (Exception e) {
                LOGGER.error("Leader info change cache start error.");
            }
        }
    }

    /**
     * 生成主节点信息zk缓存路径
     *
     * @param latchPath
     * @return
     */
    private String genLeaderInfoPath(String latchPath) {
        if (latchPath.endsWith("/")) {
            latchPath = latchPath.substring(0, latchPath.length() - 1);
        }
        return latchPath + LEADER_INFO_PATH;
    }

    /**
     * 生成主节点竞争路径
     *
     * @param latchNode
     * @return
     */
    private String genLeaderLatchPath(String latchNode) {
        if (!latchNode.startsWith("/")) {
            latchNode = "/" + latchNode;
        }
        return LATCH_PATH_PARENT + latchNode;
    }

    /**
     * 主节点信息
     */
    public static class NodeInfo implements Serializable {
        private String nodeHost;
        private int nodePort;

        public NodeInfo() {
        }

        public NodeInfo(String nodeHost, int nodePort) {
            this.nodeHost = nodeHost;
            this.nodePort = nodePort;
        }

        public String getNodeHost() {
            return nodeHost;
        }

        public void setNodeHost(String nodeHost) {
            this.nodeHost = nodeHost;
        }

        public int getNodePort() {
            return nodePort;
        }

        public void setNodePort(int nodePort) {
            this.nodePort = nodePort;
        }

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }
}
时间: 2024-10-10 16:16:32

zookeeper主节点竞争类的相关文章

Zookeeper开源客户端框架Curator简介

Curator是Netflix开源的一套ZooKeeper客户端框架. Netflix在使用ZooKeeper的过程中发现ZooKeeper自带的客户端太底层, 应用方在使用的时候需要自己处理很多事情, 于是在它的基础上包装了一下, 提供了一套更好用的客户端框架. Netflix在用ZooKeeper的过程中遇到的问题, 我们也遇到了, 所以开始研究一下, 首先从他在github上的源码, wiki文档以及Netflix的技术blog入手. 看完官方的文档之后, 发现Curator主要解决了三类

Zookeeper 扫盲

Zookeeper 扫盲 :disappointed_relieved: 配置文件详解: tickTime:基本事件单元,以毫秒为单位,这个时间作为 Zookeeper 服务器之间或客户端之间维持心跳的时间间隔 dataDir:存储内存中数据库快照的位置,顾名思义就是 Zookeeper 保存数据的目录,默认情况下,Zookeeper 将写数据的日志文件也保存到这个目录里 clientPort:这个端口就是客户端连接 Zookeeper 服务器的端口,Zookeeper 会监听这个端口,接受客户

客户端操作zookeeper服务代码示例

本文主要贴出通过zookeeper的客户端类访问zookeeper的示例,以及其它第三方更高层次的封装的客户端使用. 1.通过org.apache.zookeeper.ZooKeeper来操作zookeeper服务 有关zookeeper服务的部署参见文:http://aiilive.blog.51cto.com/1925756/1684145 下文将有代码示例展示通过编码方式在应用中启动zookeeper服务. ZooKeeper类对zookeeper服务的简单操作示例代码如下: packag

Paxos算法之旅(四)zookeeper代码解析

ZooKeeper是近期比较热门的一个类Paxos实现.也是一个逐渐得到广泛应用的开源的分布式锁服务实现.被认为是Chubby的开源版,虽然具体实现有很多差异.ZooKeeper概要的介绍可以看官方文档:http://hadoop.apache.org/zookeeper 这里我们重点来看下它的内部实现. ZooKeeper集群中的每个server都要知道其他成员,通过在配置文件zoo.cfg中作如下配置实现: tickTime=2000 dataDir=/var/zookeeper/ clie

.Net客户端监听ZooKeeper节点数据变化

一个很简单的例子,用途是监听zookeeper中某个节点数据的变化,具体请参见代码中的注释 using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using ZooKeeperNet; namespace ZooKeeperDemo { /// <summary> ///

【HBase】zookeeper在HBase中的应用

转自:http://support.huawei.com/ecommunity/bbs/10242721.html Zookeeper在HBase中的应用 HBase部署相对是一个较大的动作,其依赖于zookeeper cluster,hadoop HDFS. Zookeeper作用在于: 1.hbase regionserver 向zookeeper注册,提供hbase regionserver状态信息(是否在线). 2.hmaster启动时候会将hbase系统表-ROOT- 加载到 zook

zookeeper 大量连接断开重连原因排查

问题现象 最后发现线上的zookeeper的日志zookeeper.out 文件居然有6G,后来设置下日志为滚动输出,参考: http://blog.csdn.net/hengyunabc/article/details/19006911 但是改了之后,发现一天的日志量就是100多M,滚动日志一天就被冲掉了,这个不科学. 再仔细查看下日志里的内容,发现有很多连接建立好,马上又断开: 2014-11-24 15:38:33,348 [myid:3] - INFO [NIOServerCxn.Fac

02.ZooKeeper的Java客户端使用

1.ZooKeeper常用客户端比较 1.ZooKeeper常用客户端 zookeeper的常用客户端有3种,分别是:zookeeper原生的.Apache Curator.开源的zkclient,下面分别对介绍它们: zookeeper自带的客户端是官方提供的,比较底层.使用起来写代码麻烦.不够直接. Apache Curator是Apache的开源项目,封装了zookeeper自带的客户端,使用相对简便,易于使用. zkclient是另一个开源的ZooKeeper客户端,其地址:https:

基于zookeeper的MySQL主主负载均衡的简单实现

1.先上原理图 2.说明 两个mysql采用主主同步的方式进行部署. 在安装mysql的服务器上安装客户端(目前是这么做,以后想在zookeeper扩展集成),客户端实时监控mysql应用的可用性,可用时想zookeepercreateNode,当网络不可用或者mysql应用不可用时,建立的znode消失. 在客户端,通过改造proxool数据库连接池的方式,在建立连接之前,从zookeeper中去取真实的数据库URL,如果有多个URL,即有多个服务时,采用随机算法去拿连接(以后准备扩展权重).