最近正在研究zookeeper,一些心得记录一下,如有错误,还请大神指正。
zookeeper下载地址:http://zookeeper.apache.org/releases.html,百度一下就能找到,不过还是在这里列一下。
我认为学习一个东西,首先要理出一个头绪,否则感觉无从下手,这里我从启动开始研究,即从zkSever.sh入手。
if [ "x$JMXDISABLE" = "x" ] then echo "JMX enabled by default" >&2 # for some reason these two options are necessary on jdk6 on Ubuntu # accord to the docs they are not necessary, but otw jconsole cannot # do a local attach ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain" else echo "JMX disabled by user request" >&2 ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain" fi
从zkSever.sh可以看出,启动入口在QuorumPeerMain中,源码如下:
// 入口函数 public static void main(String[] args) { QuorumPeerMain main = new QuorumPeerMain(); //...1、启动初始化 main.initializeAndRun(args); // ... } protected void initializeAndRun(String[] args) throws QuorumPeerConfig.ConfigException, IOException { // 2、加载配置文件 QuorumPeerConfig config = new QuorumPeerConfig(); if (args.length == 1) { // 解析配置文件 config.parse(args[0]); } if ((args.length == 1) && (config.servers.size() > 0)) { // 配置文件的信息加载至QuorumPeer runFromConfig(config); } else { LOG.warn("Either no config or no quorum defined in config, running in standalone mode"); ZooKeeperServerMain.main(args); } } public void runFromConfig(QuorumPeerConfig config) throws IOException { try { ManagedUtil.registerLog4jMBeans(); } catch (JMException e) { LOG.warn("Unable to register log4j JMX control", e); } LOG.info("Starting quorum peer"); try { NIOServerCnxn.Factory cnxnFactory = new NIOServerCnxn.Factory(config.getClientPortAddress(), config.getMaxClientCnxns()); // 3、启动QuorumPeer this.quorumPeer = new QuorumPeer(); this.quorumPeer.setClientPortAddress(config.getClientPortAddress()); //...加载各种配置信息 this.quorumPeer.start(); this.quorumPeer.join(); } catch (InterruptedException e) { LOG.warn("Quorum Peer interrupted", e); } }
可以看出,配置文件的解析由QuorumPeerConfig 类完成,其部分源码如下:
public void parse(String path) throws QuorumPeerConfig.ConfigException { File configFile = new File(path); LOG.info("Reading configuration from: " + configFile); try { if (!configFile.exists()) { throw new IllegalArgumentException(configFile.toString() + " file is missing"); } // 将配置信息加载如property文件 Properties cfg = new Properties(); FileInputStream in = new FileInputStream(configFile); try { cfg.load(in); } finally { in.close(); } parseProperties(cfg); } catch (IOException e) { throw new ConfigException("Error processing " + path, e); } catch (IllegalArgumentException e) { throw new ConfigException("Error processing " + path, e); } } public void parseProperties(Properties zkProp) throws IOException, QuorumPeerConfig.ConfigException { int clientPort = 0; String clientPortAddress = null;// 循环解析配置文件 for (Map.Entry entry : zkProp.entrySet()) { String key = entry.getKey().toString().trim(); String value = entry.getValue().toString().trim(); if (key.equals("dataDir")) { this.dataDir = value; } else if (key.equals("dataLogDir")) { this.dataLogDir = value; } else if (key.equals("clientPort")) { // 客户端连接的端口号 clientPort = Integer.parseInt(value); } else if (key.equals("clientPortAddress")) { clientPortAddress = value.trim(); } else if (key.equals("tickTime")) { // 心跳时间 this.tickTime = Integer.parseInt(value); } else if (key.equals("maxClientCnxns")) { this.maxClientCnxns = Integer.parseInt(value); } else if (key.equals("minSessionTimeout")) { this.minSessionTimeout = Integer.parseInt(value); } else if (key.equals("maxSessionTimeout")) { this.maxSessionTimeout = Integer.parseInt(value); } else if (key.equals("initLimit")) { this.initLimit = Integer.parseInt(value); } else if (key.equals("syncLimit")) { this.syncLimit = Integer.parseInt(value); } else if (key.equals("electionAlg")) { // 选举算法的类型,默认算法为FastLeaderElection this.electionAlg = Integer.parseInt(value); } else if (key.equals("peerType")) { if (value.toLowerCase().equals("observer")) this.peerType = QuorumPeer.LearnerType.OBSERVER; else if (value.toLowerCase().equals("participant")) { this.peerType = QuorumPeer.LearnerType.PARTICIPANT; } else throw new ConfigException("Unrecognised peertype: " + value); } //...
回到QuorumPeerMain类的runFromConfig方法。此方法中,会将配置信息加载至QuorumPeer,并调用其start方法:
public synchronized void start() { try { this.zkDb.loadDataBase(); } catch (IOException ie) { LOG.fatal("Unable to load database on disk", ie); throw new RuntimeException("Unable to run quorum server ", ie); } this.cnxnFactory.start(); startLeaderElection(); super.start(); }
在start方法中,会现价在硬盘中的数据,
this.zkDb.loadDataBase();即ZKDatabase中
public long loadDataBase() throws IOException { FileTxnSnapLog.PlayBackListener listener = new FileTxnSnapLog.PlayBackListener() { public void onTxnLoaded(TxnHeader hdr, Record txn) { Request r = new Request(null, 0L, hdr.getCxid(), hdr.getType(), null, null); r.txn = txn; r.hdr = hdr; r.zxid = hdr.getZxid(); ZKDatabase.this.addCommittedProposal(r); } }; long zxid = this.snapLog.restore(this.dataTree, this.sessionsWithTimeouts, listener); this.initialized = true; return zxid; }
然后开确定选类型,startLeaderElection
public synchronized void startLeaderElection() { this.currentVote = new Vote(this.myid, getLastLoggedZxid()); for (QuorumServer p : getView().values()) { if (p.id == this.myid) { this.myQuorumAddr = p.addr; break; } } if (this.myQuorumAddr == null) { throw new RuntimeException("My id " + this.myid + " not in the peer list"); } if (this.electionType == 0) { try { this.udpSocket = new DatagramSocket(this.myQuorumAddr.getPort()); this.responder = new ResponderThread(); this.responder.start(); } catch (SocketException e) { throw new RuntimeException(e); } } this.electionAlg = createElectionAlgorithm(this.electionType);//加载选举类型}
然后启动run方法
时间: 2024-10-16 19:06:28