akka cluster sharding source code 学习 (1/5) handle off

一旦 shard coordinator(相当于分布式系统的 zookeeper) 启动,它就会启动一个定时器,每隔一定的时间尝试平衡一下集群中各个节点的负载,平衡的办法是把那些负载较重的 actor 移动到负载较轻的节点上。在这一点上,我以前的理解有误,我以为 shardRegion 是移动的最小单位。

val rebalanceTask = context.system.scheduler.schedule(rebalanceInterval, rebalanceInterval, self, RebalanceTick)

当 coordinator 收到 ReblanceTick 后,就开始尝试平衡系统负载

case RebalanceTick ⇒
      if (persistentState.regions.nonEmpty) {
        val shardsFuture = allocationStrategy.rebalance(persistentState.regions, rebalanceInProgress)
        shardsFuture.value match {
          case Some(Success(shards)) ⇒
            continueRebalance(shards)
          case _ ⇒
            // continue when future is completed
            shardsFuture.map { shards ⇒ RebalanceResult(shards)
            }.recover {
              case _ ⇒ RebalanceResult(Set.empty)
            }.pipeTo(self)
        }
      }

上面的逻辑我看懂了,但是 Future 的用法没看明白。按照一般的写法,当 shardsFuture 返回 Failure 以后,应该直接执行 RebalanceResut(Set.empty).pipeTo(self),不知道为什么失败以后还要尝试等待 Future

allocationStrategy 提供了默认的实现,也可以自定义负载均衡策略。rebalance 函数返回的是 Set(ShardId),即那些要被移动的 shards

当 coordinator 收到 RebalanceResult 后,开始 启动 balance 逻辑

def continueRebalance(shards: Set[ShardId]): Unit =
    shards.foreach { shard ⇒
      if (!rebalanceInProgress(shard)) {
        persistentState.shards.get(shard) match {
          case Some(rebalanceFromRegion) ⇒
            rebalanceInProgress += shard
            log.debug("Rebalance shard [{}] from [{}]", shard, rebalanceFromRegion)
            context.actorOf(rebalanceWorkerProps(shard, rebalanceFromRegion, handOffTimeout,
              persistentState.regions.keySet ++ persistentState.regionProxies)
              .withDispatcher(context.props.dispatcher))
          case None ⇒
            log.debug("Rebalance of non-existing shard [{}] is ignored", shard)
        }

      }
    }

rebalanceInProcess 是一个 Set,记录正在被移动的 shard,我想,在新一轮 balance 开始时, rebalanceInProcess 为空的情况只会发生在上次 balance 还没有做完。不知道这个时候,是应该报错还是继续 balance 更好,因为 balanceStrategy 应该不会考虑吧到 上一轮 balance 还没做完这种可能性。

然后, coordinator 启动 rebalanceWorker,也就是上篇提到的替身 actor。

private[akka] class RebalanceWorker(shard: String, from: ActorRef, handOffTimeout: FiniteDuration,
                                      regions: Set[ActorRef]) extends Actor {
    import Internal._
    regions.foreach(_ ! BeginHandOff(shard))
    var remaining = regions

    import context.dispatcher
    context.system.scheduler.scheduleOnce(handOffTimeout, self, ReceiveTimeout)

    def receive = {
      case BeginHandOffAck(`shard`) ⇒
        remaining -= sender()
        if (remaining.isEmpty) {
          from ! HandOff(shard)
          context.become(stoppingShard, discardOld = true)
        }
      case ReceiveTimeout ⇒ done(ok = false)
    }

    def stoppingShard: Receive = {
      case ShardStopped(shard) ⇒ done(ok = true)
      case ReceiveTimeout      ⇒ done(ok = false)
    }

    def done(ok: Boolean): Unit = {
      context.parent ! RebalanceDone(shard, ok)
      context.stop(self)
    }
  }

akka 的逻辑是基于消息传递的,这种代码其实是很难去读的。在 rebalanceWorker 运行时,牵扯到很多个 actor。首先是,coordinator,其次是 shardRegion,也就是 host 待迁移 shard actor 的那个 region,然后是 shard actor 本身,最后是系统里所有的 shardRegion,他们也要参与进来。写到这里,我不禁把电脑屏幕竖了起来。

1. RebalanceWorker 首先给所有的 ShardRegion BeginHandOff 消息,告诉大家,hand off 开始,然后等待大家的回复

2. ShardRegion 收到 BeginHandOff 后,开始更新自己的知识库,将 HostShardRegion 和 shardActor 的记忆从自己的知识库中抹去

case BeginHandOff(shard) ⇒
      log.debug("BeginHandOff shard [{}]", shard)
      if (regionByShard.contains(shard)) {
        val regionRef = regionByShard(shard)
        val updatedShards = regions(regionRef) - shard
        if (updatedShards.isEmpty) regions -= regionRef
        else regions = regions.updated(regionRef, updatedShards)
        regionByShard -= shard
      }
      sender() ! BeginHandOffAck(shard)

最后,发送 BeginHandOffAck 消息,告诉 rebalanceWorker 自己准备完毕(这些 shardRegion 以后也没事干了)

3. 继续回到 rebalanceWorker,它发送 HandOff 告诉 Host shard actor 的 ShardRegion,你可以做自己的清理工作了。然后将自己的状态设置成 stoppingShard,等待 ShardStopped 消息,这个消息的来源有两个,一个是 HostShardRegion,另外一个是 shard actor

4. HostShardRegion 收到 HandOff 消息后

case msg @ HandOff(shard) ⇒
      log.debug("HandOff shard [{}]", shard)

      // must drop requests that came in between the BeginHandOff and now,
      // because they might be forwarded from other regions and there
      // is a risk or message re-ordering otherwise
      if (shardBuffers.contains(shard)) {
        shardBuffers -= shard
        loggedFullBufferWarning = false
      }

      if (shards.contains(shard)) {
        handingOff += shards(shard)
        shards(shard) forward msg
      } else
        sender() ! ShardStopped(shard)

如果 HostShardRegion 已经不再含有 shard actor,那么直接返回 ShardStopped,否则 HandOff 这个 Set 加入 shard actor,并将 HandOff 传给 shard actor

5. 又看了一遍代码,发现 shard actor 和 entity actor 又是两种东西,shard actor 存在于 entity actor 和 shard region 之间

目前还不知道 entity actor 和 shard region 之间的关系

def getEntity(id: EntityId): ActorRef = {
    val name = URLEncoder.encode(id, "utf-8")
    context.child(name).getOrElse {
      log.debug("Starting entity [{}] in shard [{}]", id, shardId)

      val a = context.watch(context.actorOf(entityProps, name))
      idByRef = idByRef.updated(a, id)
      refById = refById.updated(id, a)
      state = state.copy(state.entities + id)
      a
    }
  }

从这段代码来看, shard actor 与 entity actor 是一对多的关系。

def receiveCoordinatorMessage(msg: CoordinatorMessage): Unit = msg match {
    case HandOff(`shardId`) ⇒ handOff(sender())
    case HandOff(shard)     ⇒ log.warning("Shard [{}] can not hand off for another Shard [{}]", shardId, shard)
    case _                  ⇒ unhandled(msg)
  }

  def handOff(replyTo: ActorRef): Unit = handOffStopper match {
    case Some(_) ⇒ log.warning("HandOff shard [{}] received during existing handOff", shardId)
    case None ⇒
      log.debug("HandOff shard [{}]", shardId)

      if (state.entities.nonEmpty) {
        handOffStopper = Some(context.watch(context.actorOf(
          handOffStopperProps(shardId, replyTo, idByRef.keySet, handOffStopMessage))))

        //During hand off we only care about watching for termination of the hand off stopper
        context become {
          case Terminated(ref) ⇒ receiveTerminated(ref)
        }
      } else {
        replyTo ! ShardStopped(shardId)
        context stop self
      }
  }
def receiveTerminated(ref: ActorRef): Unit = {  if (handOffStopper.exists(_ == ref))    context stop self  else if (idByRef.contains(ref) && handOffStopper.isEmpty)    entityTerminated(ref)}

从这段代码看, shard actor 与 entity actor 的关系是一对一,因为当 entity stop self 了以后, shard actor 也会 stop self。这让我想到 coursera reactive programming 的最后一道作业题,为什么也是类似于 一个 entity 有一个 shard actor 对应。

时间: 2024-11-09 07:40:55

akka cluster sharding source code 学习 (1/5) handle off的相关文章

akka cluster sharding source code 学习 (1/5) 替身模式

为了使一个项目支持集群,自己学习使用了 akka cluster 并在项目中实施了,从此,生活就变得有些痛苦.再配上 apache 做反向代理和负载均衡,debug 起来不要太酸爽.直到现在,我还对 akka cluster 输出的 log 不是很熟悉,目前网络上 akka cluster 的信息还比较少,想深入了解这东西的话,还是要自己读 source code.前几天,雪球那帮人说 akka 不推荐使用,有很多坑,这给我提了个醒,目前我对 akka 的理解是远远不够的,需要深入学习. akk

akka cluster sharding

cluster sharding 的目的在于提供一个框架,方便实现 DDD,虽然我至今也没搞明白 DDD 到底适用于是什么场合,但是 cluster sharding 却是我目前在做的一个 project 扩展到集群上非常需要的工具. sharding 要做这么几件事 1. 对于每一个 entity,创建一个 actor.该 entity 有 Id 作为唯一标示.该 entity 的所有消息都由此 actor 来处理 2. 该 actor 在一段时间内不工作时,会超时并 kill self 3.

android activity 启动过程分析(source code 4.4)

说实话,android source code从2.3到4.4变化是蛮多的,尤其是media部分,虽然总的框架是没有多大变化,但是找起代码来看还是挺麻烦的.在android里面最受伤的是使用了java,jni,jvm,Nativity c++等等,各种设计模式横行,当然在学习源码过程中也意识了编程语言基础,数据结构,设计模式的重要性. android source code 经典的地方: 1. 大量使用了各种设计模式如单例模式,装饰模式,工程工厂模式,适配器模式等等. 2. 使用了binder驱

something beyond the source code

不知不觉中大学已经过半惹_(:з」∠)_是时候再来总结一下了. 关于过去过去的两年,走了些弯路,也有些小收获.也记录一下吧1.不要总是为自己的懒惰找借口.有时候只是自己不够努力罢了2.总有些被称作规则的东西是永远无法改变的.先去适应它才能去改变它.大人都在努力刷分,小孩子才犯中二病.3.有些目标是主要的有些目标是次要的.为了达到主要目标要果断抛弃次要目标.4.发现了一种很高效的学习方法w,也摸索出一些CS的人生经验.5.很讨厌心灵鸡汤,因为它把做成一件事简单归结为努力再努力,却忽略了正确的方法和

Learning English From Android Source Code:1

英语在软件行业的重要作用不言自明,尤其是做国际项目和写国际软件,好的英语表达是项目顺利进行的必要条件.纵观眼下的IT行业.可以流利的与国外客户英文口语交流的程序猿占比并非非常高.要想去国际接轨,语言这一关一定要过. 本人做刚入行的时候非常想找一本专门写给程序猿的英文教材,但并没有找到特别合适的.通过这几年的欧美项目经理,我发现与国外同行交流重在表明交流的意图而轻语法规定.一件事情的表述,仅仅要可以用几个Key Words来表述清楚.两方可以理解就可以.并没有使用我们上学期间艰深晦涩的语法知识.

Classic Source Code Collected

收藏一些经典的源码,持续更新!!! 1.深度学习框架(Deep Learning Framework). A:Caffe (Convolutional Architecture for Fast Feature Embedding)Convolutional 由伯克利大学Yangqing Jia Ph.D开发的开源深度学习的代码. Homepage:http://caffe.berkeleyvision.org/ Paper:Caffe: Convolutional Architecture f

Steps of source code change to executable application

程序运行的整个过程,学习一下 源代码 (source code) → 预处理器 (preprocessor) → 编译器 (compiler) → 汇编程序 (assembler) → 目标代码 (object code) → 连接器 (Linker) → 可执行程序 (executables) 1. 词法分析 词法分析器根据词法规则识别出源程序中的各个记号(token),每个记号代表一类单词(lexeme).源程序中常见的记号可以归为几大类:关键字.标识符.字面量和特殊符号.词法分析器的输入是

CEPH CRUSH 算法源码分析 原文CEPH CRUSH algorithm source code analysis

原文地址 CEPH CRUSH algorithm source code analysis http://www.shalandis.com/original/2016/05/19/CEPH-CRUSH-algorithm-source-code-analysis/ 文章比较深入的写了CRUSH算法的原理和过程.通过调试深入的介绍了CRUSH计算的过程.文章中添加了些内容. 写在前面 读本文前,你需要对ceph的基本操作,pool和CRUSH map非常熟悉.并且较深入的读过源码. 分析的方法

[code] Transformer For Summarization Source Code Reading

Basic Information 作者:李丕绩(腾讯AI Lab) 模型:Transformer + copy mechanism for abstractive summarization 数据集:CNN/Daily Mail Parameters WARNING: IN DEBUGGING MODE USE COPY MECHANISM USE COVERAGE MECHANISM USE AVG NLL as LOSS USE LEARNABLE W2V EMBEDDING RNN TY