Zookeeper 源码分析-启动

Zookeeper 源码分析-启动

博客分类:

本文主要介绍了zookeeper启动的过程

运行zkServer.sh start命令可以启动zookeeper。入口的main函数在类中QuorumPeerMain。

main函数主要调用了runFromConfig函数,创建了QuorumPeer对象,并且调用了start函数,从而启动了zookeeper。

Java代码  

  1. public class QuorumPeerMain {
  2. protected QuorumPeer quorumPeer;
  3. /**
  4. * To start the replicated server specify the configuration file name on
  5. * the command line.
  6. * @param args path to the configfile
  7. */
  8. public static void main(String[] args) {
  9. QuorumPeerMain main = new QuorumPeerMain();
  10. main.initializeAndRun(args);
  11. }
  12. protected void initializeAndRun(String[] args)
  13. throws ConfigException, IOException
  14. {
  15. runFromConfig(config);
  16. }
  17. public void runFromConfig(QuorumPeerConfig config) throws IOException {
  18. LOG.info("Starting quorum peer");
  19. try {
  20. NIOServerCnxn.Factory cnxnFactory =
  21. new NIOServerCnxn.Factory(config.getClientPortAddress(),
  22. config.getMaxClientCnxns());
  23. quorumPeer = new QuorumPeer();
  24. quorumPeer.setClientPortAddress(config.getClientPortAddress());
  25. quorumPeer.setTxnFactory(new FileTxnSnapLog(
  26. new File(config.getDataLogDir()),
  27. new File(config.getDataDir())));
  28. quorumPeer.setQuorumPeers(config.getServers());
  29. quorumPeer.setElectionType(config.getElectionAlg());
  30. quorumPeer.setMyid(config.getServerId());
  31. quorumPeer.setTickTime(config.getTickTime());
  32. quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
  33. quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
  34. quorumPeer.setInitLimit(config.getInitLimit());
  35. quorumPeer.setSyncLimit(config.getSyncLimit());
  36. quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
  37. quorumPeer.setCnxnFactory(cnxnFactory);
  38. quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
  39. quorumPeer.setLearnerType(config.getPeerType());
  40. quorumPeer.start();
  41. quorumPeer.join();
  42. } catch (InterruptedException e) {
  43. // warn, but generally this is ok
  44. LOG.warn("Quorum Peer interrupted", e);
  45. }
  46. }
  47. }

在QuorumPeer的start函数中,先调用了loadDataBase方法用于恢复数据。启动与client交互的线程,并

Java代码  

  1. @Override
  2. public synchronized void start() {
  3. try {
  4. zkDb.loadDataBase();
  5. } catch(IOException ie) {
  6. LOG.fatal("Unable to load database on disk", ie);
  7. throw new RuntimeException("Unable to run quorum server ", ie);
  8. }
  9. cnxnFactory.start(); //用于处理与client的交互
  10. startLeaderElection();//开始选举算法
  11. super.start();
  12. }

调用loadDatabase从磁盘加载数据到内存

Java代码  

  1. public long loadDataBase() throws IOException {
  2. PlayBackListener listener=new PlayBackListener(){
  3. public void onTxnLoaded(TxnHeader hdr,Record txn){
  4. Request r = new Request(null, 0, hdr.getCxid(),hdr.getType(),
  5. null, null);
  6. r.txn = txn;
  7. r.hdr = hdr;
  8. r.zxid = hdr.getZxid();
  9. addCommittedProposal(r);
  10. }
  11. };
  12. long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
  13. initialized = true;
  14. return zxid;
  15. }

调用QuorumPeer的run函数,按照peer的state做不同的处理

Java代码  

  1. @Override
  2. public void run() {
  3. setName("QuorumPeer:" + cnxnFactory.getLocalAddress());
  4. try {
  5. /*
  6. * Main loop
  7. */
  8. while (running) {
  9. switch (getPeerState()) {
  10. case LOOKING:
  11. try {
  12. LOG.info("LOOKING");
  13. setCurrentVote(makeLEStrategy().lookForLeader());
  14. } catch (Exception e) {
  15. LOG.warn("Unexpected exception",e);
  16. setPeerState(ServerState.LOOKING);
  17. }
  18. break;
  19. case OBSERVING:
  20. try {
  21. LOG.info("OBSERVING");
  22. setObserver(makeObserver(logFactory));
  23. observer.observeLeader();
  24. } catch (Exception e) {
  25. LOG.warn("Unexpected exception",e );
  26. } finally {
  27. observer.shutdown();
  28. setObserver(null);
  29. setPeerState(ServerState.LOOKING);
  30. }
  31. break;
  32. case FOLLOWING:
  33. try {
  34. LOG.info("FOLLOWING");
  35. setFollower(makeFollower(logFactory));
  36. follower.followLeader();
  37. } catch (Exception e) {
  38. LOG.warn("Unexpected exception",e);
  39. } finally {
  40. follower.shutdown();
  41. setFollower(null);
  42. setPeerState(ServerState.LOOKING);
  43. }
  44. break;
  45. case LEADING:
  46. LOG.info("LEADING");
  47. try {
  48. setLeader(makeLeader(logFactory));
  49. leader.lead();
  50. setLeader(null);
  51. } catch (Exception e) {
  52. LOG.warn("Unexpected exception",e);
  53. } finally {
  54. if (leader != null) {
  55. leader.shutdown("Forcing shutdown");
  56. setLeader(null);
  57. }
  58. setPeerState(ServerState.LOOKING);
  59. }
  60. break;
  61. }
  62. }
  63. }
  64. }
  65. http://blog.csdn.net/xhh198781/article/details/10949697
时间: 2024-10-25 09:27:20

Zookeeper 源码分析-启动的相关文章

zookeeper源码分析之五服务端(集群leader)处理请求流程

leader的实现类为LeaderZooKeeperServer,它间接继承自标准ZookeeperServer.它规定了请求到达leader时需要经历的路径: PrepRequestProcessor -> ProposalRequestProcessor ->CommitProcessor -> Leader.ToBeAppliedRequestProcessor ->FinalRequestProcessor 具体情况可以参看代码: @Override protected v

zookeeper源码分析之一服务端处理请求流程

上文: zookeeper源码分析之一服务端启动过程 中,我们介绍了zookeeper服务器的启动过程,其中单机是ZookeeperServer启动,集群使用QuorumPeer启动,那么这次我们分析各自一下消息处理过程: 前文可以看到在 1.在单机情况下NettyServerCnxnFactory中启动ZookeeperServer来处理消息: public synchronized void startup() { if (sessionTracker == null) { createSe

zookeeper源码分析之一客户端发送请求流程

znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个功能是zookeeper对于应用最重要的特性,通过这个特性可以实现的功能包括配置的集中管理,集群管理,分布式锁等等. 知识准备: zookeeper定义的状态有: Unknown (-1),Disconnected (0),NoSyncConnected (1),SyncConnected (3),AuthFailed (4),ConnectedReadOnly (5),Sasl

TOMCAT源码分析(启动框架)

建议: 毕竟TOMCAT的框架还是比较复杂的, 单是从文字上理解, 是不那么容易掌握TOMCAT的框架的. 所以得实践.实践.再实践. 建议下载一份TOMCAT的源码, 调试通过, 然后单步跟踪其启动过程. 如果有不明白的地方, 再来查阅本文, 看是否能得到帮助. 我相信这样效果以及学习速度都会好很多! 1. Tomcat的整体框架结构 Tomcat的基本框架, 分为4个层次. Top Level Elements: Server Service Connector HTTP AJP Conta

基于TCP网络通信的自动升级程序源码分析-启动升级文件下载程序

升级程序启动后,首先会连接服务器 private void Connect() { try { int port = int.Parse(System.Configuration.ConfigurationManager.AppSettings["Port"]); connnectionInfo = new ConnectionInfo(IPAddress, port); connection = TCPConnection.GetConnection(connnectionInfo)

Symfoy2源码分析——启动过程2

上一篇分析Symfony2框架源码,探究Symfony2如何完成一个请求的前半部分,前半部分可以理解为Symfony2框架为处理请求做准备工作,包括container生成.缓存.bundls初始化等一些列准备工作(Symfoy2源码分析——启动过程1).而这一篇讲的是Symfony2如何根据请求的数据生成Response对象,向客户端返回响应数据. 在分析前需要了解Symfony2的事件驱动机制:Symfony2事件驱动. 言归正传,Symfony2请求的工作流程其实是Symfony2内核的事件

storm操作zookeeper源码分析-cluster.clj

storm操作zookeeper的主要函数都定义在命名空间backtype.storm.cluster中(即cluster.clj文件中).backtype.storm.cluster定义了两个重要protocol:ClusterState和StormClusterState.clojure中的protocol可以看成java中的接口,封装了一组方法.ClusterState协议中封装了一组与zookeeper进行交互的基础函数,如获取子节点函数,获取子节点数据函数等,ClusterState协

Tomcat7.0源码分析——启动与停止服务

前言 熟悉Tomcat的工程师们,肯定都知道Tomcat是如何启动与停止的.对于startup.sh.startup.bat.shutdown.sh.shutdown.bat等脚本或者批处理命令,大家一定知道改如何使用它,但是它们究竟是如何实现的,尤其是shutdown.sh脚本(或者shutdown.bat)究竟是如何和Tomcat进程通信的呢?本文将通过对Tomcat7.0的源码阅读,深入剖析这一过程. 由于在生产环境中,Tomcat一般部署在Linux系统下,所以本文将以startup.s

Tomcat7.0源码分析——启动与停止服务原理

前言 熟悉Tomcat的工程师们,肯定都知道Tomcat是如何启动与停止的.对于startup.sh.startup.bat.shutdown.sh.shutdown.bat等脚本或者批处理命令,大家一定知道改如何使用它,但是它们究竟是如何实现的,尤其是shutdown.sh脚本(或者shutdown.bat)究竟是如何和Tomcat进程通信的呢?本文将通过对Tomcat7.0的源码阅读,深入剖析这一过程. 由于在生产环境中,Tomcat一般部署在Linux系统下,所以本文将以startup.s