ZooKeeper源码分析:Log和Snapshot持久化(SyncRequestProcessor类)

事务日志的持久化是在SyncRequestProcessor类中实现,并会按照一定的规则滚动日志(关闭当前文件,创建一个新文件),以及生成新的Snapshot。在持久化过程中,使用组提交(Group Commits)来优化磁盘io 操作。组提交是指将多个Request对象的事务作为一次写附加到磁盘上。使用这种方式可以在持久化多个事务的时候,只使用一次磁盘寻道(Disk Seek)的开销。Request对象只有在其中事务同步到磁盘后,才会传递到下一个处理器。

SyncRequestProcessor被用于下面三种不同的情景中:

  • Leader - 同步请求到磁盘,并且转发这个请求到AckRequestProcessor。该处理器发送Ack消息给Leader自己。
  • Follower - 同步请求到磁盘,并转发请求到SendAckRequestProcessor。该处理器发送确认数据包给Leader。SendAckRequestProcessor是flushable, 允许我们强制将数据包推送到Leader。
  • Observer - 同步已经提交的请求到磁盘(从INFORM数据包中接收)。它不会发送确认数据包给Leader。所以nextProcessor是null。在observer中, 和一般的txnlog语义不同,因为它仅包含已经提交的txn。

在SyncRequestProcessor中有两个关键队列:

  • queuedRequest队列:存放从传入该处理器的Request对象。当调用该处理器的processRequest方法,会将Request对象放入到queuedRequest队列;
  • toFlush队列:存放已经附加到日志文件,但还没有Flush的Request对象。

SyncRequestProcessor的run方法循环读取queuedRequests队列中的Request对象并进行持久化。

流程图如下:

如果toFlush队列为空,则调用queuedRequest队列的阻塞方法take();如果toFlush队列不为空,则调用queuedRequest队列的非阻塞方法poll()。如果poll()方法返回null,则会立即将toFlush队列中所有Request对象中事务Flush到磁盘,并将Request对象传入到下一个处理器。这样可以避免增加请求处理的延时。如果queuedRequest.poll()方法返回不为Null或者queuedRequest.take()方法返回, 则将返回的Request对象si中的事务附加到事务日志文件中,并放入toFlush队列中。如果toFlush队列大小大于1000,则将队列中所有Request对象中事务Flush到磁盘,并将Request对象传入下一个处理器。这是可以避免在有大量请求的时候增加请求处理的延时。

Request对象附加到事务日志之后,会检查日志记录数logCount是否大于(snapCount / 2 + randRoll)。如果大于则滚动日志,并启动生成新Snapshot的线程。其中randRoll是一个随机数。这个随机数的使用可以避免Zookeeper集群里的所有机器同时构建Snapshot。

SyncRequestProcessor.run方法如下:

public void run() {
    try {
        int logCount = 0;

       //这个随机数randRoll的使用可以避免Zookeeper集群里的所有机器同时构建Snapshot
        setRandRoll(r.nextInt( snapCount/2));
        while (true ) {
            Request si = null;
            //如果toFlush为空,则调用队列queuedRequests的阻塞方法take()
            if (toFlush .isEmpty()) {
                si = queuedRequests.take();
            }
                            //如果toFlush不为空,则调用队列queuedRequests的非阻塞方法poll()
            else {
                si = queuedRequests.poll();
                //如果si为null, 说明queuedRequests为空,则调用flush()方法
                if (si == null) {
                    flush( toFlush);
                    continue;
                }
            }
            //如果si是一个poison pill, 则退出循环
            if (si == requestOfDeath ) {
                break;
            }
            if (si != null) {
                // track the number of records written to the log
                //将record的操作记到日志中
                if (zks .getZKDatabase().append(si)) {
                    logCount++;
                    if (logCount > (snapCount / 2 + randRoll)) {
                        randRoll = r .nextInt(snapCount/2);
                        //滚动事务日志
                        zks.getZKDatabase().rollLog();
                        //构建snapshot
                        if (snapInProcess != null && snapInProcess.isAlive()) {
                            LOG.warn("Too busy to snap, skipping" );
                        } else {
                            //生成snapshot线程
                            snapInProcess = new Thread("Snapshot Thread") {
                                    public void run() {
                                        try {
                                            zks.takeSnapshot();
                                        } catch(Exception e) {
                                            LOG.warn("Unexpected exception", e);
                                        }
                                    }
                                };
                            //启动snapInProcess
                            snapInProcess.start();
                        }
                        logCount = 0;
                    }
                }
                                     
                else if (toFlush .isEmpty()) {
                    // optimization for read heavy workloads
                    //如果这是一个read, 并且没有pending的flushes(writes), 那么直接传递到下一个处理器
                    if (nextProcessor != null) {
                        nextProcessor.processRequest(si);
                        if (nextProcessor instanceof Flushable) {
                            ((Flushable) nextProcessor).flush();
                        }
                    }
                    continue;
                }
                toFlush.add(si);
                //如果toFlush的大小大于1000, 则flush
                if (toFlush .size() > 1000) {
                    flush( toFlush);
                }
            }
        }
    } catch (Throwable t) {
        LOG.error("Severe unrecoverable error, exiting" , t);
        running = false ;
        System. exit(11);
    }
    LOG.info("SyncRequestProcessor exited!" );
}

转载请附上原博客地址:http://blog.csdn.net/jeff_fangji/article/details/44046997

时间: 2024-10-07 20:03:17

ZooKeeper源码分析:Log和Snapshot持久化(SyncRequestProcessor类)的相关文章

zookeeper源码分析之一服务端处理请求流程

上文: zookeeper源码分析之一服务端启动过程 中,我们介绍了zookeeper服务器的启动过程,其中单机是ZookeeperServer启动,集群使用QuorumPeer启动,那么这次我们分析各自一下消息处理过程: 前文可以看到在 1.在单机情况下NettyServerCnxnFactory中启动ZookeeperServer来处理消息: public synchronized void startup() { if (sessionTracker == null) { createSe

Zookeeper 源码分析-启动

Zookeeper 源码分析-启动 博客分类: Zookeeper 本文主要介绍了zookeeper启动的过程 运行zkServer.sh start命令可以启动zookeeper.入口的main函数在类中QuorumPeerMain. main函数主要调用了runFromConfig函数,创建了QuorumPeer对象,并且调用了start函数,从而启动了zookeeper. Java代码   public class QuorumPeerMain { protected QuorumPeer

zookeeper源码分析之五服务端(集群leader)处理请求流程

leader的实现类为LeaderZooKeeperServer,它间接继承自标准ZookeeperServer.它规定了请求到达leader时需要经历的路径: PrepRequestProcessor -> ProposalRequestProcessor ->CommitProcessor -> Leader.ToBeAppliedRequestProcessor ->FinalRequestProcessor 具体情况可以参看代码: @Override protected v

zookeeper源码分析之一客户端发送请求流程

znode 可以被监控,包括这个目录节点中存储的数据的修改,子节点目录的变化等,一旦变化可以通知设置监控的客户端,这个功能是zookeeper对于应用最重要的特性,通过这个特性可以实现的功能包括配置的集中管理,集群管理,分布式锁等等. 知识准备: zookeeper定义的状态有: Unknown (-1),Disconnected (0),NoSyncConnected (1),SyncConnected (3),AuthFailed (4),ConnectedReadOnly (5),Sasl

Spring源码分析——BeanFactory体系之抽象类、类分析(二)

上一篇分析了BeanFactory体系的2个类,SimpleAliasRegistry和DefaultSingletonBeanRegistry——Spring源码分析——BeanFactory体系之抽象类.类分析(一),今天继续分析. 一.工厂Bean注册支持——FactoryBeanRegistrySupport 废话不多说,直接看我注释的源码: /* * Copyright 2002-2012 the original author or authors. * * Licensed und

Cordova Android源码分析系列二(CordovaWebView相关类分析)

本篇文章是Cordova Android源码分析系列文章的第二篇,主要分析CordovaWebView和CordovaWebViewClient类,通过分析代码可以知道Web网页加载的过程,错误出来,多线程处理等. CordovaWebView类分析 CordovaWebView类继承了Android WebView类,这是一个很自然的实现,共1000多行代码.包含了PluginManager pluginManager,BroadcastReceiver receiver,CordovaInt

storm操作zookeeper源码分析-cluster.clj

storm操作zookeeper的主要函数都定义在命名空间backtype.storm.cluster中(即cluster.clj文件中).backtype.storm.cluster定义了两个重要protocol:ClusterState和StormClusterState.clojure中的protocol可以看成java中的接口,封装了一组方法.ClusterState协议中封装了一组与zookeeper进行交互的基础函数,如获取子节点函数,获取子节点数据函数等,ClusterState协

Spring源码分析——BeanFactory体系之抽象类、类分析(一)

上一篇介绍了BeanFactory体系的所有接口——Spring源码分析——BeanFactory体系之接口详细分析,本篇就接着介绍BeanFactory体系的抽象类和接口. 一.BeanFactory的基本类体系结构(类为主): 上图可与 Spring源码分析——BeanFactory体系之接口详细分析 的图结合分析,一个以接口为主,一个以类为主(PS:Spring的体系结构要分析清楚,不得不曲线救国啊!不然27寸屏幕给我画估计都装不下.). 具体: 1.7层的类体系继承. 2.Abstrac

Netty源码分析第8章(高性能工具类FastThreadLocal和Recycler)---->第5节: 同线程回收对象

Netty源码分析第八章: 高性能工具类FastThreadLocal和Recycler 第五节: 同线程回收对象 上一小节剖析了从recycler中获取一个对象, 这一小节分析在创建和回收是同线程的前提下, recycler是如何进行回收的 回顾第三小节的demo中的main方法: public static void main(String[] args){ User user1 = RECYCLER.get(); user1.recycle(); User user2 = RECYCLER

Netty源码分析第8章(高性能工具类FastThreadLocal和Recycler)---->第6节: 异线程回收对象

Netty源码分析第八章: 高性能工具类FastThreadLocal和Recycler 第六节: 异线程回收对象 异线程回收对象, 就是创建对象和回收对象不在同一条线程的情况下, 对象回收的逻辑 我们之前小节简单介绍过, 异线程回收对象, 是不会放在当前线程的stack中的, 而是放在一个WeakOrderQueue的数据结构中, 回顾我们之前的一个图: 8-6-1 相关的逻辑, 我们跟到源码中: 首先从回收对象的入口方法开始, DefualtHandle的recycle方法: public