根据已经commit的数据,进行leader和peon之间的同步

Leader Election基本设计

  • 按照rank表示优先级解决冲突问题,为每个monitor预先分配了一个rank
  • 只会接受优先级(rank)比自己高、epoch比上次已接受的epoch大的选举请求
  • 当选的leader,不一定有最新的数据。所以在phase 1中,会根据已经commit的数据,进行leader和peon之间的同步
  • 用奇数的epoch表示选举状态,偶数表示稳定状态
  • 一旦选举成功,会形成一个quorum,在该leader当选期间,所有提议,必须quorum中全部成员同意

Leader Election主要过程和函数

  • Elector::init()
    初始化处理,从kv读取之前持久化的信息
  • Elector::shutdown()
    退出处理
  • Elector::start()
    推选自己(自荐)
  • Elector::defer()
    接受别人选举,延迟推选自己
  • Elector::handle_propose()
    处理别人的自荐消息
  • Elector::handle_ack()
    处理别人的ack
  • Elector::victory()
    宣布自己当选
  • Elector::handle_victory()
    处理别人当选消息
  • Elector::expire()
    选举超时处理
  • Elector::reset_timer()
    设置选举timer
  • Elector::cancel_timer()
    取消timer
  • Elector::dispatch()
    选举消息的分发函数

代码

初始化

/*Ceph的monitor使用leveldb作为持久化存储,下面的mon->store
就是leveldb操作的封装,由于很多数据共用同一leveldb,所以对
key的空间做了分级,Monitor::MONITOR_NAME可以认为是第一级key*/
void Elector::init()
{
  //选举的epoch,递增分配。每次修改都做了持久化,这是从kv db读取。
  epoch = mon->store->get(Monitor::MONITOR_NAME, "election_epoch");
  if (!epoch)//首次使用
    epoch = 1;
}

bump_epoch

/*将自己的epoch修改为参数 e,需要持久化到 kv 存储。*/
void Elector::bump_epoch(epoch_t e)
{
  dout(10) << "bump_epoch " << epoch << " to " << e << dendl;
  assert(epoch <= e);
  epoch = e;
  //使用一个事务,写kv存储
  MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
  t->put(Monitor::MONITOR_NAME, "election_epoch", epoch);
  mon->store->apply_transaction(t);//持久化收到的epoch。

  //join election会使 monitor 进入 STATE_ELECTING 状态
  mon->join_election();

  // clear up some state
  electing_me = false;  //因为别人的epoch比自己大,放弃选自己
  acked_me.clear(); //这个acked_m,是选自己才有意义,在此清空。
  classic_mons.clear();
}

推选自己为leader的函数(自荐)

//在启动后或者leader超时等场合,会发起自荐
void Elector::start()//推选自己
{
  if (!participating) {
    return;
  }

  //清空,表示还没人响应自己
  acked_me.clear();
  classic_mons.clear();
  //从store获取持久化的epoch.
  init();

  if (epoch % 2 == 0) //epoch是偶数表明是稳定态
    bump_epoch(epoch+1);  //odd == election cycle 选举都是用的奇数epoch
  start_stamp = ceph_clock_now(g_ceph_context);
  electing_me = true; //设置为true,前面的bump_epoch可能设置为false了
  //填写map的key是自己的rank,表明自己先同意了自己
  acked_me[mon->rank] = CEPH_FEATURES_ALL; 

  leader_acked = -1;//无效值,表明我没有ack别人

  //给monmap中每个成员发送消息。可以认为monmap成员是预先配置的,且配置了rank
  for (unsigned i=0; i<mon->monmap->size(); ++i) {
    if ((int)i == mon->rank) continue;
    //消息中带有自己的epoch
    Message *m = new MMonElection(MMonElection::OP_PROPOSE, epoch, mon->monmap);
    mon->messenger->send_message(m, mon->monmap->get_inst(i));
  }

  reset_timer();//设置选举用的timer,参见expire()函数
}

响应别人自荐

//defer就是暂时放弃推选自己
void Elector::defer(int who)
{
  if (electing_me) {//放弃选自己
    acked_me.clear();
    classic_mons.clear();
    electing_me = false;
  }

  //表明我支持了"who"当leader. leader_acked不需要持久化,因为任何一个monitor在
  //reboot后都会重新发起election。
  leader_acked = who;
  ack_stamp = ceph_clock_now(g_ceph_context);
  //返回OP_ACK消息,即赞成对方当leader
  MMonElection *m = new MMonElection(MMonElection::OP_ACK, epoch, mon->monmap);
  m->sharing_bl = mon->get_supported_commands_bl();
  mon->messenger->send_message(m, mon->monmap->get_inst(who));
  // set a timer  对方在一定时间内,应该宣布自己当选才对
  reset_timer(1.0);  // give the leader some extra time to declare victory
}

timer相关工具函数

设置timer的工具函数

void Elector::reset_timer(double plus)
{
  // set the timer
  cancel_timer();
  expire_event = new C_ElectionExpire(this);
  mon->timer.add_event_after(g_conf->mon_lease + plus,
                 expire_event);
}

取消timer的工具函数

void Elector::cancel_timer()
{
  if (expire_event) {
    mon->timer.cancel_event(expire_event);
    expire_event = 0;
  }
}

超时处理

void Elector::expire()
{
  // 如果是自荐,只要超过半数同意,就认为成功
  if (electing_me &&
      acked_me.size() > (unsigned)(mon->monmap->size() / 2)) {
    //注意,expire判断的是 > monmap->size()/2,而handle_ack里面是等待全部ack。
    // i win
    victory();
  } else {//没有推选自己
    // whoever i deferred to didn‘t declare victory quickly enough.
    if (mon->has_ever_joined)
      start();//之前我加入过quorum,直接重新发动选举。因为monmap中会包含我。
    else
      mon->bootstrap();//否则,走bootstrap
  }
}

成功当选leader处理

void Elector::victory()
{
  leader_acked = -1;
  electing_me = false;

  uint64_t features = CEPH_FEATURES_ALL;
  set<int> quorum;
  for (map<int, uint64_t>::iterator p = acked_me.begin(); p != acked_me.end();
       ++p) {//如果是从expire()调用的victory(),则不是monmap的记录的所有node,
       //但是肯定是超过半数。
    quorum.insert(p->first);//ack过我的,全部进入quorum。
    features &= p->second;
  }

  // decide what command set we‘re supporting
  bool use_classic_commands = !classic_mons.empty();
  // keep a copy to share with the monitor; we clear classic_mons in bump_epoch
  set<int> copy_classic_mons = classic_mons;

  cancel_timer();

  assert(epoch % 2 == 1);  // 选举期间用的奇数epoch
  bump_epoch(epoch+1);     // 选举完成,epoch变成偶数

  // decide my supported commands for peons to advertise
  const bufferlist *cmds_bl = NULL;
  const MonCommand *cmds;
  int cmdsize;
  if (use_classic_commands) {
    mon->get_classic_monitor_commands(&cmds, &cmdsize);
    cmds_bl = &mon->get_classic_commands_bl();
  } else {
    mon->get_locally_supported_monitor_commands(&cmds, &cmdsize);
    cmds_bl = &mon->get_supported_commands_bl();
  }

  //通知大家自己当选
  for (set<int>::iterator p = quorum.begin();
       p != quorum.end();
       ++p) {
    if (*p == mon->rank) continue;
    MMonElection *m = new MMonElection(MMonElection::OP_VICTORY, epoch, mon->monmap);
    m->quorum = quorum;
    m->quorum_features = features;
    m->sharing_bl = *cmds_bl;
    mon->messenger->send_message(m, mon->monmap->get_inst(*p));
  }

  //调用monitor的函数,它会发起paxos的propose
  mon->win_election(epoch, quorum, features, cmds, cmdsize, &copy_classic_mons);
}

处理别人的自荐消息

根据情况决定是否支持,或者决定该推荐自己

void Elector::handle_propose(MonOpRequestRef op)
{
  MMonElection *m = static_cast<MMonElection*>(op->get_req());
  dout(5) << "handle_propose from " << m->get_source() << dendl;
  int from = m->get_source().num();//获取对方的rank

  assert(m->epoch % 2 == 1); // election
  uint64_t required_features = mon->get_required_features();

  if ((required_features ^ m->get_connection()->get_features()) &
      required_features) {//要求对方的feature,覆盖required_features
    nak_old_peer(op);
    return;
  } else if (m->epoch > epoch) {//对方epoch比我大,放弃选自己,追随它。
    bump_epoch(m->epoch);
  } else if (m->epoch < epoch) {//对方epoch太小,否决
    // got an "old" propose,
    //发送消息的peer可能是刚刚加进来的,以前不在quorum里面。所以epoch比较小
    if (epoch % 2 == 0 &&    // in a non-election cycle
    mon->quorum.count(from) == 0) {  // from someone outside the quorum
      // a mon just started up, call a new election so they can rejoin!
      //为什么我要start_election? 因为其epoch太旧,不可能当选。
      mon->start_election();
    } else {//认为收到了旧消息,忽略
      dout(5) << " ignoring old propose" << dendl;
      return;
    }
  }
  //我比发送方的rank高。如果我没有响应过其他比我rank高的,就推选自己
  if (mon->rank < from) {
    // i would win over them.
    if (leader_acked >= 0) {        // we already acked someone
      assert(leader_acked < from);  // and they still win, of course 否则不可能ack它
    } else {//如果没有acked过,比我优先级低的在推选自己,那么我应该选自己才对。
      // wait, i should win!
      if (!electing_me) {
    mon->start_election();
      }
    }
  } else {//发送方rank比我高
    // they would win over me
    //之前我没有赞成谁,或者之前那个优先级没现在这个高,则赞成现在这个
    if (leader_acked < 0 ||      // haven‘t acked anyone yet, or
    leader_acked > from ||   // they would win over who you did ack, or
    leader_acked == from) {  // this is the guy we‘re already deferring to
      defer(from);//这个函数内部会发送 OP_ACK,支持发送方
    } else {//我之前响应过别人,坚持之前的选择
      // ignore them!
      dout(5) << "no, we already acked " << leader_acked << dendl;
    }
  }
}

收到 ack之后的处理

//收到别人响应后的处理
void Elector::handle_ack(MonOpRequestRef op)
{
  op->mark_event("elector:handle_ack");
  MMonElection *m = static_cast<MMonElection*>(op->get_req());
  dout(5) << "handle_ack from " << m->get_source() << dendl;
  int from = m->get_source().num();

  assert(m->epoch % 2 == 1); // election状态,必须是奇数
  //下面dout解释了出现这个现象的原因,即我在自己重启,out了。
  if (m->epoch > epoch) {
    dout(5) << "woah, that‘s a newer epoch, i must have rebooted.
      bumping and re-starting!" << dendl;
    bump_epoch(m->epoch);//必须用新的epoch才能引起有效选举,否则被忽略了
    start();
    return;
  }
  assert(m->epoch == epoch);
  uint64_t required_features = mon->get_required_features();
  if ((required_features ^ m->get_connection()->get_features()) &
      required_features) {
    dout(5) << " ignoring ack from mon" << from
        << " without required features" << dendl;
    return;
  }
  //如果正在推选自己
  if (electing_me) {
    // thanks
    //acked_me是个map
    acked_me[from] = m->get_connection()->get_features();
    if (!m->sharing_bl.length())
      classic_mons.insert(from);
    dout(5) << " so far i have " << acked_me << dendl;

    //所有人都赞成我
    if (acked_me.size() == mon->monmap->size()) {
       // if yes, shortcut to election finish
      victory();
    }
  } else {//以前我曾经推选过自己,但是现在我已经投别人了
    // ignore, i‘m deferring already.
    assert(leader_acked >= 0);
  }
}

在别人当选后的处理

//收到别人宣告选举胜利的消息后的处理
void Elector::handle_victory(MonOpRequestRef op)
{
  op->mark_event("elector:handle_victory");
  MMonElection *m = static_cast<MMonElection*>(op->get_req());
  int from = m->get_source().num();

  assert(from < mon->rank);
  assert(m->epoch % 2 == 0);

  leader_acked = -1;
  //之前我一定选举了它,所以epoch必须match,否则有问题。
  // i should have seen this election if i‘m getting the victory.
  if (m->epoch != epoch + 1) {
  //在victory()中,已经加1,所以是偶数,且比peon看到的大1
    dout(5) << "woah, that‘s a funny epoch, i must have rebooted.
    bumping and re-starting!" << dendl;
    bump_epoch(m->epoch);
    start();
    return;
  }

  bump_epoch(m->epoch);//我也变成偶数epoch

  // they win
  mon->lose_election(epoch, m->quorum, from, m->quorum_features);

  // cancel my timer
  cancel_timer();//选举timer没用了

  // stash leader‘s commands
  if (m->sharing_bl.length()) {
    MonCommand *new_cmds;
    int cmdsize;
    bufferlist::iterator bi = m->sharing_bl.begin();
    MonCommand::decode_array(&new_cmds, &cmdsize, bi);
    mon->set_leader_supported_commands(new_cmds, cmdsize);
  } else { // they are a legacy monitor; use known legacy command set
    const MonCommand *new_cmds;
    int cmdsize;
    mon->get_classic_monitor_commands(&new_cmds, &cmdsize);
    mon->set_leader_supported_commands(new_cmds, cmdsize);
  }
}

消息分发函数

消息的dispatch,里面有些关于monmap的处理。

/*monmap,是集群当前配置的所有monitor的集合。
 *monmap在bootstrp过程中会在montior间同步,这里没仔细讨论。
 *monmap中的各个monitor,只有参与选举投票的,才会进入quorum。*/
void Elector::dispatch(MonOpRequestRef op)
{
  op->mark_event("elector:dispatch");
  assert(op->is_type_election());

  switch (op->get_req()->get_type()) {

  case MSG_MON_ELECTION://elector只收election这个类别的消息
    {
      if (!participating) {
        return;
      }
      if (op->get_req()->get_source().num() >= mon->monmap->size()) {
    dout(5) << " ignoring bogus election message with bad mon rank "
        << op->get_req()->get_source() << dendl;
    return;
      }

      MMonElection *em = static_cast<MMonElection*>(op->get_req());

      // assume an old message encoding would have matched
      if (em->fsid != mon->monmap->fsid) {
    dout(0) << " ignoring election msg fsid "
        << em->fsid << " != " << mon->monmap->fsid << dendl;
    return;
      }
    //选举是根据monmap干活的。monmap在bootstrap阶段大家已经同步了。
      if (!mon->monmap->contains(em->get_source_addr())) {
    dout(1) << "discarding election message: " << em->get_source_addr()
        << " not in my monmap " << *mon->monmap << dendl;
    return;
      }

      MonMap *peermap = new MonMap;
      peermap->decode(em->monmap_bl);
      //比较二者的monmap的epoch,即二者看到的monitor配置应该相同。
      if (peermap->epoch > mon->monmap->epoch) {
    dout(0) << em->get_source_inst() << " has newer monmap epoch " << peermap->epoch
        << " > my epoch " << mon->monmap->epoch
        << ", taking it"
        << dendl;
    mon->monmap->decode(em->monmap_bl);
        MonitorDBStore::TransactionRef t(new MonitorDBStore::Transaction);
        //更新了自己的monmap,并且写盘。实际上信任了对方的monmap。
        t->put("monmap", mon->monmap->epoch, em->monmap_bl);
        t->put("monmap", "last_committed", mon->monmap->epoch);
        mon->store->apply_transaction(t);
    //mon->monmon()->paxos->stash_latest(mon->monmap->epoch, em->monmap_bl);
    cancel_timer();
    mon->bootstrap();//重新做一次自举。自举后会重新选举
    delete peermap;
    return;
      }
      if (peermap->epoch < mon->monmap->epoch) {
      //这种情况下,会用我的map去同步对方的。
    dout(0) << em->get_source_inst() << " has older monmap epoch " << peermap->epoch
        << " < my epoch " << mon->monmap->epoch
        << dendl;
      }
      delete peermap;
      switch (em->op) {
      case MMonElection::OP_PROPOSE:
    handle_propose(op);
    return;
      }

      if (em->epoch < epoch) www.rcsx.org {//为什么又比较这个epoch?
    dout(5) << "old epoch, dropping" << dendl;
    break;
      }

      switch (em->op) {
      case MMonElection::OP_ACK:
    handle_ack(op);
    return;
      case MMonElection::OP_VICTORY:
    handle_victory(op);
    return;
      case MMonElection::OP_NAK:
    handle_nak(op);
    return;
      default:
    assert(0);
      }
    }
    break;

  default:
    assert(0);
  }
}
//admin command处理,触发选举
void Elector::start_participating()
{
  if (!participating) {
    participating = true;
    call_election();
  }
}
时间: 2024-11-05 02:21:06

根据已经commit的数据,进行leader和peon之间的同步的相关文章

vuex commit保存数据技巧

使用vuex时 官方推荐使用commit才修改state数据. 优点 便于调试,当数据变化时,可以在vuetools工具中看到是哪个函数修改了state值. 缺点 采用commit修改数据,可能会写很多mutations函数. 会上丧失掉一部分性能.因为新数据需要重新配置watcher. 优化 传一个字符串的path和需要修改的值,如果path='a.b.c'就换算成 state.a.b.c = 'needsave', 这就达到了一个commit 解决所有保存的问题. save(state, {

老李分享:大数据,数据库,数据仓库之间是什么关系

老李分享:大数据,数据库,数据仓库之间是什么关系 poptest是国内唯一一家培养测试开发工程师的培训机构,以学员能胜任自动化测试,性能测试,测试工具开发等工作为目标.如果对课程感兴趣,请大家咨询qq:908821478,咨询电话010-84505200. 首先简单的看一下云计算与大数据的概念. 1)云计算:云计算本质上是一种计算资源集中分布和充分共享的效用计算模式,其中集中是为了计算资源的集约化管理,分布是便于扩展计算能力.集中分布式是针对云服务提供商的,充分共享是针对用户,在云计算中,虽然对

NSJSONSerialization-JSON数据与NSDictionary和NSArray之间的转化

    在iOS  5 中,苹果引入了一个解析JSON串的NSJSONSerialization类. 通过该类,我们可以完成JSON数据与NSDictionary和NSArray之间的转化. 以前,我记得我用的是第三方的插件.但是,苹果出了这套解析后,效率也大大了超过了,所有的解析第三方类库.所以,推荐使用NSJSONSerialization类来完成转化. 一.将NSDictionary或NSArray转化为JSON串 // 将字典或者数组转化为JSON串 - (NSData *)toJSON

阿里巴巴天池大数据竞赛黄金联赛全面开战,全球同步报名,只为寻找最聪明的你!

阿里巴巴天池大数据竞赛黄金联赛全面开战,全球同步报名,只为寻找最聪明的你!        天池大数据竞赛是由阿里巴巴集团主办,面向全球新生代力量的高端算法竞赛.通过开放海量数据和"天池"分布式计算平台,大赛让所有参与者有机会运用其设计的算法解决各类社会生活问题和商业世界中的实际问题.特别优秀的解决方案将有机会直接上线阿里巴巴旗下各电商网站(含淘宝.天猫等)或第三方合作伙伴平台,服务中国乃至世界数以亿计的用户.        2015年天池大数据竞赛将全面升级为黄金联赛,包含三个不同场景

速战速决 (6) - PHP: 获取 http 请求数据, 获取 get 数据 和 post 数据, json 字符串与对象之间的相互转换

[源码下载] 作者:webabcd 介绍速战速决 之 PHP 获取 http 请求数据 获取 get 数据 和 post 数据 json 字符串与对象之间的相互转换 示例1.获取 http 请求数据http/http1.php <?php /** * 获取 http 请求数据 */ // 通过 $_SERVER 获取相关数据 echo "PHP_SELF : " . $_SERVER['PHP_SELF'] . "<br />"; echo &qu

100万个数据,数据值在0~65535之间,请用尽可能少的内存和最快的速度从小到大排序

场景说明:100万个数据,数据值在0~65535之间,请用尽可能少的内存和最快的速度从小到大排序 voidsort(int* array, int n) { //n的值在100万左右 //你的实现 } 我们首先观察到所有的数据已经保存到了array数组中,现在我们需要做的就是将数组中的元素排序.现在我们把数组中的元素提取出来比如是3,然后我们提取出数组下标是3的元素,保存到临时空间,通过负数来计算个数: void sort(int* array, int n) {     int tmp=0;

Vue-Router路由Vue-CLI脚手架和模块化开发 之 在单文件组件项目中定义数据、方法和组件之间的相互引用

定义数据 根据上一篇博文配置项目的结构的基础上继续进行优化: 在app.vue中的导出模块/组件部分设置其属性: export default{//导出模块/组件 data(){ return{ name:'perfect', count:0 } }, 在一个template标签中进行调用: <template> <div> <h2> 欢迎来到perfect*的博客园!!!</h2> <h3>{{name}}</h3> </te

vue+vuex+axios从后台获取数据存入vuex,组件之间共享数据

在vue项目中组件间相互传值或者后台获取的数据需要供多个组件使用的情况很多的话,有必要考虑引入vuex来管理这些凌乱的状态,今天这边博文用来记录这一整个的过程,后台api接口是使用webpack-server模拟的接口,这个前面的文章中有提到,需要的可以去翻阅. 整个的流程是在组件的created中提交dispatch,然后通过action调用一个封装好的axios然后再触发mutation来提交状态改变state中的数据,然后在组件的计算属性中获取state的数据并渲染在页面上 首先新需要在项

AutoMapper(一)——实现数据契约和实体类之间的转换

以前做过Object到Object的转换方法的封装,底层主要还是靠反射取值赋值+循环来实现的.最近在公司里维护旧系统,这个系统使用了WCF,当时就很好奇他们的数据契约到实体模型的转换时怎么做的,之后查看源码,发现他们居然是一个属性一个属性的赋值过来的,当时就给我雷到了,难道就没有人举得这个地方可以封装起来吗? 据我目前了解,有三种方法可以实现这个实体类到实体类的转换过程,下面来简述下这三种方式. 第一种,也是最简单的一种,是用list自带的转换方法,但是这种方法需要自己定义转换规则,写起来也挺麻