ZK集群中通过Processor保证数据一致性源码阅读

入口

书接上篇博客中的ZK集群启动后完成数据的统一性恢复后,来到启动ZkServer的逻辑,接下来的重点工作就是启动不同角色的对应的不同的处理器Processor

如上图查看ZooKeeperServer的继承图,三种不同的角色有不同的ZooKeeperServer的实现逻辑类

三者启动时,都将会来到ZooKeeperServer.java中的startUp()方法中,源码如下,但是,不同的角色针对setupRequestProcessors();进行了不同的重写,所以本篇博客的重点即使看一下他们是如何重写的

public synchronized void startup() {
    if (sessionTracker == null) {
        // todo 创建session计时器
        createSessionTracker();
    }
    // todo 开启计时器
    startSessionTracker();

    // todo 设置请求处理器, zookeeper中存在不同的请求处理器, 就在下面
    setupRequestProcessors();

    //todo 是一个为应用程序、设备、系统等植入管理功能的框架。
    //todo JMX可以跨越一系列异构操作系统平台、系统体系结构和网络传输协议,灵活的开发无缝集成的系统、网络和服务管理应用
    registerJMX();

    // todo 修改状态  --> running
    setState(State.RUNNING);
    // todo 唤醒所有线程, 因为前面有一个线程等待处理器 睡了一秒
    notifyAll();
}

Leader重写setupRequestProcessors

源码如下: 可以看到它初始化的处理器的个数

  1. PrepRequestProcessor (checkAcl 构造tnx)
  2. ProposalRequestProcessor (发起提议)
  3. CommitProcessor (提交提议)
  4. ToBeAppliedRequestProcessor
  5. FinalRequestProcessor (响应客户端,更新内存)

    SyncRequestProcessor(单独开启的,他是一个线程) 作用: 持久化txn

  RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
                finalProcessor, getLeader().toBeApplied);
        commitProcessor = new CommitProcessor(toBeAppliedProcessor,
                Long.toString(getServerId()), false,
                getZooKeeperServerListener());
        commitProcessor.start();
        ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
                commitProcessor);
        proposalProcessor.initialize();
        firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
        ((PrepRequestProcessor)firstProcessor).start();

Follower重写setupRequestProcessors

  1. FollowerRequestProcessor
  2. CommitProcessor
  3. SendAckRequestProcessor
  4. FinalRequestProcessor

    SyncRequestProcessor(单独开启的,他是一个线程) 作用: 持久化txn

   RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(finalProcessor,
                Long.toString(getServerId()), true,
                getZooKeeperServerListener());
        commitProcessor.start();
        firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
        ((FollowerRequestProcessor) firstProcessor).start();
        syncProcessor = new SyncRequestProcessor(this,
                new SendAckRequestProcessor((Learner) getFollower()));
        syncProcessor.start();

Observer重写setupRequestProcessors

  1. ObserverRequestProcessor
  2. CommitProcessor
  3. FinalRequestProcessor

通过配置判断是否添加SyncRequestProcessor来持久化它的事务

        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        commitProcessor = new CommitProcessor(finalProcessor,
                Long.toString(getServerId()), true,
                getZooKeeperServerListener());
        commitProcessor.start();
        firstProcessor = new ObserverRequestProcessor(this, commitProcessor);
        ((ObserverRequestProcessor) firstProcessor).start();

        // todo 通过这个判断控制需不需要Observer 对事务进行持久化
        if (syncRequestProcessorEnabled) {
            syncProcessor = new SyncRequestProcessor(this, null);
            syncProcessor.start();
        }

实验1: Leader接受到写请求

直接给出当Leader接收到请求时,request在集群中各个处理器中的运行流程图

通过上图看,当leader接收到请求后,request肯定会依次流经它的处理器,PrepRequestProcessor-->ProposalRequestProcessor

在ProposalRequestProcessor处理器中,同样是直接将request提交给CommitProcessor,但是同样会被阻塞住

接着在request被Leader通过原子广播,告诉所有的Follower这个request

原子广播之后自己会立即使用SyncRequestProcessor完成持久化

同时Follower接受到request之后,也会使用他们自己的SyncRequestProcess进行持久化,完成持久化后就给Leader的LearnerHandler发送ACK确认消息,在这个LearnerHandler会存在过半检查机制,没当一个Follower发送一个ACK,就触发检查一次,直到达到一半以上,就会触发notify(),然后leader刚刚在commitProcessor中wait(),等待提交的函数就会被唤醒,由leader广播commit,全体learner进行commit,达成数据的一致性

实验2: Follower或Observer接收到写请求

直接给出当Follower或者Observer接收到请求时,request在集群中各个处理器中的运行流程图

通过上面图可以看到,当Follower或者Observer接收到请求后会首先会提交给本地的commitProcessor处理器,但是不会立刻commit事务,而是将request转发给Leader的第一个处理器,再之后就和上面的图同样的处理流程

原文地址:https://www.cnblogs.com/ZhuChangwu/p/11619978.html

时间: 2024-10-30 16:37:09

ZK集群中通过Processor保证数据一致性源码阅读的相关文章

ZK集群如何保证数据一致性源码阅读

什么是数据一致性? 只有当服务端的ZK存在多台时,才会出现数据一致性的问题, 服务端存在多台服务器,他们被划分成了不同的角色,只有一台Leader,多台Follower和多台Observer, 他们中的任意一台都能响应客户端的读请求,任意一台也都能接收写请求, 不同的是,Follower和Observer接收到客户端的写请求后不能直接处理这个请求而是将这个请求转发给Leader,由Leader发起原子广播完成数据一致性 理论上ZK集群中的每一个节点的作用都是相同的,他们应该和单机时一样,各个节点

zookeeper集群搭建及Leader选举算法源码解析

一.zookeeper概述 1.zookeeper 简介 zookeeper 是一个开源的分布式应用程序协调服务器,是 Hadoop 的重要组件. zooKeeper 是一个分布式的,开放源码的分布式应用程序协调服务器,是 Google 的Chubby 一个开源的实现,是 Hadoop 和 Hbase 的重要组件.它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护.域名服务.分布式同步.集群管理等.ZooKeeper的目标就是封装复杂易出错的关键服务,将简单易用的接口和性能高效.

Tomcat中session详解(源码阅读)

Tomcat中的session是通过一个manager来管理的,其中Session接口默认实现类有StandardSession,管理器的默认实现是StandardManager. 我们平时在servlet中使用的session也就是HashMap中的一个session对象,同时session除了在内存存储,同时还提供了持久化方法,tomcat中持久化有两种,一种是保存为文件,另一种则是保存到数据库. 这一节首先来看一下默认的StandardSession和StandardManager. pu

运维之我的docker-swarm集群中删除节点和服务

删除swam节点 如果有的确实想要从swarm集群中删除,你应该先把这个节点容器排空,然后再把节点从集群中去掉. 排空节点(其实就是把这个节点上的容器先从其它节点启动,再停掉排空节点上的容器,保证你定义服务的预先状态不受影响) docker node update --availability drain g36lvv23ypjd8v7ovlst2n3yt 删除指定节点 docker node rm  node9 docker node rm --force node9 删除服务 删除服务以后容

LVS集群中的IP负载均衡技术

章文嵩 ([email protected]) 转自LVS官方参考资料 2002 年 4 月 本文在分析服务器集群实现虚拟网络服务的相关技术上,详细描述了LVS集群中实现的三种IP负载均衡技术(VS/NAT.VS/TUN和VS/DR)的工作原理,以及它们的优缺点. 1.前言在 前面文章中,讲述了可伸缩网络服务的几种结构,它们都需要一个前端的负载调度器(或者多个进行主从备份).我们先分析实现虚拟网络服务的主要技术,指出 IP负载均衡技术是在负载调度器的实现技术中效率最高的.在已有的IP负载均衡技术

Storm官方文档翻译之在生产环境集群中运行Topology

在进群生产环境下运行Topology和在本地模式下运行非常相似.下面是步骤: 1.定义Topology(如果使用Java开发语言,则使用TopologyBuilder来创建) 2.使用StormSubmitter向集群提交Topology.StormSubmitter有三个参数,Topology的名字,Topology的配置,和Topology本身.下面是例子: ? 1 2 3 4 Config conf = new Config(); conf.setNumWorkers(20); conf.

MySql集群FAQ----mysql主从配置与集群区别、集群中需要多少台计算机呢?为什么? 等

抽取一部分显示在这里,如下, What's the difference in using Clustervs using replication? 在复制系统中,一个MySQL主服务器会更新一个或多个从服务器.事务是顺序地提交的,因此一个慢事务就可能导致从服务器比主服务器落后一段时间.这也意 味着,如果主服务器出错失败了,那么从服务器可能会缺少记录最后的那一小部分事务日志.如果使用的是事务安全存储引擎的话,例如InnoDB, 那么事务日志则会完全记录到从服务器上去或者完全不记录,但是复制不能保

基于Oracle数据库锁机制,解决集群中的并发访问问题

1.需求 应用场景是这样的: 使用Oracle数据保存待办任务,使用状态字段区分任务是否已经被执行.多个Worker线程同时执行任务,执行成功或失败后,修改状态字段的值. 假设数据库表结构如下所示. create table Task( id varchar2(32), name varchar2(32), flag varchar2(1), worker varchar2(32) ); flag 可取的值包括:0-待办,1-已办,-1-失败待重试. 需要避免的问题: 多个Worker同时工作时

如何在tomcat集群中实现Session共享

转自:http://www.toutiao.com/i6388049068718817794/ Apache集群实现Tomcat的Session共享配置其实很简单,在Tomcat自带的文档中有详细的说明( /docs/cluster-howto.html ),只不过是英语的,所以联合 下面根据说下怎么配置吧: 1.既然是集群肯定要多准备几个Tomcat来模拟,比如分别为Tomcat01.Tomcat02.Tomcat03. 如果各Tomcat程序放在不同的机器上,那么就不会有端口的冲突.如果是放