Zookeeper 源码分析-启动
博客分类:
本文主要介绍了zookeeper启动的过程
运行zkServer.sh start命令可以启动zookeeper。入口的main函数在QuorumPeerMain类中。
main函数通过initializeAndRun调用了runFromConfig函数;runFromConfig根据配置创建了QuorumPeer对象,并且调用了它的start函数,从而启动了zookeeper。
Java代码
- public class QuorumPeerMain {
- protected QuorumPeer quorumPeer;
- /**
- * To start the replicated server specify the configuration file name on
- * the command line.
- * @param args path to the configfile
- */
- public static void main(String[] args) {
- QuorumPeerMain main = new QuorumPeerMain();
- main.initializeAndRun(args);
- }
- protected void initializeAndRun(String[] args)
- throws ConfigException, IOException
- {
- runFromConfig(config);
- }
- public void runFromConfig(QuorumPeerConfig config) throws IOException {
- LOG.info("Starting quorum peer");
- try {
- NIOServerCnxn.Factory cnxnFactory =
- new NIOServerCnxn.Factory(config.getClientPortAddress(),
- config.getMaxClientCnxns());
- quorumPeer = new QuorumPeer();
- quorumPeer.setClientPortAddress(config.getClientPortAddress());
- quorumPeer.setTxnFactory(new FileTxnSnapLog(
- new File(config.getDataLogDir()),
- new File(config.getDataDir())));
- quorumPeer.setQuorumPeers(config.getServers());
- quorumPeer.setElectionType(config.getElectionAlg());
- quorumPeer.setMyid(config.getServerId());
- quorumPeer.setTickTime(config.getTickTime());
- quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
- quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
- quorumPeer.setInitLimit(config.getInitLimit());
- quorumPeer.setSyncLimit(config.getSyncLimit());
- quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
- quorumPeer.setCnxnFactory(cnxnFactory);
- quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
- quorumPeer.setLearnerType(config.getPeerType());
- quorumPeer.start();
- quorumPeer.join();
- } catch (InterruptedException e) {
- // warn, but generally this is ok
- LOG.warn("Quorum Peer interrupted", e);
- }
- }
- }
在QuorumPeer的start函数中,先调用了loadDataBase方法用于恢复数据,然后启动与client交互的线程,并开始leader选举,最后调用super.start()启动QuorumPeer线程(其run函数见下文)。
Java代码
- @Override
- public synchronized void start() {
- try {
- zkDb.loadDataBase();
- } catch(IOException ie) {
- LOG.fatal("Unable to load database on disk", ie);
- throw new RuntimeException("Unable to run quorum server ", ie);
- }
- cnxnFactory.start(); //用于处理与client的交互
- startLeaderElection();//开始选举算法
- super.start();
- }
调用ZKDatabase的loadDataBase方法,从磁盘加载数据到内存
Java代码
- public long loadDataBase() throws IOException {
- PlayBackListener listener=new PlayBackListener(){
- public void onTxnLoaded(TxnHeader hdr,Record txn){
- Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(),
- null, null);
- r.txn = txn;
- r.hdr = hdr;
- r.zxid = hdr.getZxid();
- addCommittedProposal(r);
- }
- };
- long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
- initialized = true;
- return zxid;
- }
super.start()启动线程后会执行QuorumPeer的run函数,在主循环中按照peer当前的state(LOOKING、OBSERVING、FOLLOWING、LEADING)做不同的处理
Java代码
- @Override
- public void run() {
- setName("QuorumPeer:" + cnxnFactory.getLocalAddress());
- try {
- /*
- * Main loop
- */
- while (running) {
- switch (getPeerState()) {
- case LOOKING:
- try {
- LOG.info("LOOKING");
- setCurrentVote(makeLEStrategy().lookForLeader());
- } catch (Exception e) {
- LOG.warn("Unexpected exception",e);
- setPeerState(ServerState.LOOKING);
- }
- break;
- case OBSERVING:
- try {
- LOG.info("OBSERVING");
- setObserver(makeObserver(logFactory));
- observer.observeLeader();
- } catch (Exception e) {
- LOG.warn("Unexpected exception",e );
- } finally {
- observer.shutdown();
- setObserver(null);
- setPeerState(ServerState.LOOKING);
- }
- break;
- case FOLLOWING:
- try {
- LOG.info("FOLLOWING");
- setFollower(makeFollower(logFactory));
- follower.followLeader();
- } catch (Exception e) {
- LOG.warn("Unexpected exception",e);
- } finally {
- follower.shutdown();
- setFollower(null);
- setPeerState(ServerState.LOOKING);
- }
- break;
- case LEADING:
- LOG.info("LEADING");
- try {
- setLeader(makeLeader(logFactory));
- leader.lead();
- setLeader(null);
- } catch (Exception e) {
- LOG.warn("Unexpected exception",e);
- } finally {
- if (leader != null) {
- leader.shutdown("Forcing shutdown");
- setLeader(null);
- }
- setPeerState(ServerState.LOOKING);
- }
- break;
- }
- }
- }
- }
- http://blog.csdn.net/xhh198781/article/details/10949697
时间: 2024-10-25 09:27:20