akka cluster sharding

cluster sharding 的目的在于提供一个框架,方便实现 DDD,虽然我至今也没搞明白 DDD 到底适用于是什么场合,但是 cluster sharding 却是我目前在做的一个 project 扩展到集群上非常需要的工具。

sharding 要做这么几件事

1. 对于每一个 entity,创建一个 actor。该 entity 有 Id 作为唯一标示。该 entity 的所有消息都由此 actor 来处理

2. 该 actor 在一段时间内不工作时,会超时并 kill self

3. 当一个集群中加入新的节点时,新的 actor 会被自动创建到新 node 上,或者老的actor 会负载均衡,迁移到新 node 上。同样的,当节点挂掉时,挂掉的 actor 会迁移到新的

在 sharding 被提出之前,google group 和 stackoverflow 上有很多人希望有这么一个东西,那时候还是 2011 年,现在已经 2015 年了。

sharding 的原理和用法 doc 都有比较详细的说明,下面做个小测试:

首先是 entity actor 的定义:

object ActionWorker {
  def props(): Props = Props(new ActionWorker)

  //must return s: Command
  val idExtractor: ShardRegion.IdExtractor = {
    case s: Command => (s.id, s)
  }

  val shardResolver: ShardRegion.ShardResolver = msg => msg match {
    case s: Command   => (math.abs(s.id.hashCode) % 100).toString
  }

  val shardName: String = "ActionWorker"
}

  

idExtractor 会从 message 中抽取 id,作为路由的依据,而返回值的第二项是 actor 收到的消息

shardResolver 根据 id 确定 shard 所在的位置,哈希函数的设定方式我也没有自习研究,doc 说是 id 的十倍就可以。

class ActionWorker extends Actor {
  val log = Logging(context.system, this)

  println("action worker is created")

  context.setReceiveTimeout(30 seconds)

  override def receive: Receive = {
    case Command(id, payload) =>
      val selfDesc = self.path.parent.name + "-" + self.path.name
      println("here i am, working: " + selfDesc)

      log.info("here i am, working: " + selfDesc)

    case ReceiveTimeout =>
      log.info("nothing to do, better kill myself")

      val selfDesc = self.path.parent.name + "-" + self.path.name
      println("here i am, working: " + selfDesc)

      println("nothing to do, better kill myself")
      context.stop(self)
  }

  

actor 的定义没有特别之处,需要注意的是

1. actor 收到的消息是 idExtract 的第二项,而不是 s: Command 那个东西

2. actor 没有一个正常的途径得到自己的id,一个 workaroud 的办法是通过 self.path.name 来得到自己的id,再据此完成某些初始化操作

worker 处理数据,生成数据的 actor 叫做 Bot

class Bot extends Actor {

  val log = Logging(context.system, this)

  val tickTask = context.system.scheduler.schedule(3.seconds, 10.seconds, self, Command(""))

  def receive = create

  val postRegion = ClusterSharding(context.system).shardRegion(ActionWorker.shardName)

  val create: Receive = {
    case Command(id, payload) =>
      val postId = Random.nextInt(5).toString
      log.info("bot create new command and received: " + postId)

      println("new command postID = " + postId)

      postRegion ! Command(postId, postId)

  }
}

  

Bot 生成的数据要传到 worker actor,注意 postRegion 的获得方式,它首先从 actorSystem 中得到 ClusterSharding 集合(一个 actorSystem 可能会有多个 shard),然后根据 shardName 定位到唯一的 shard。最后把需要发送的消息传给 postRegin,postRegin 会完成转发。

Main 方法的 startUp 函数

def startup(ports: Seq[String]): Unit = {
    ports foreach { port =>
      // Override the configuration of the port
      val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
        withFallback(ConfigFactory.load())

      // Create an Akka system
      val system = ActorSystem("ClusterSystem", config)

      startupSharedJournal(system, startStore = (port == "2551"), path =
        ActorPath.fromString("akka.tcp://[email protected]:2551/user/store"))

      ClusterSharding(system).start(
        typeName = ActionWorker.shardName,
        entryProps = Some(ActionWorker.props()),
        idExtractor = ActionWorker.idExtractor,
        shardResolver = ActionWorker.shardResolver)

      if (port != "2551" && port != "2552")
        system.actorOf(Props[Bot], "bot")
    }

  }

  def startupSharedJournal(system: ActorSystem, startStore: Boolean, path: ActorPath): Unit = {
    // Start the shared journal on one node (don‘t crash this SPOF)
    // This will not be needed with a distributed journal
    if (startStore)
      system.actorOf(Props[SharedLeveldbStore], "store")

    // register the shared journal
    import system.dispatcher
    implicit val timeout = Timeout(15.seconds)
    val f = (system.actorSelection(path) ? Identify(None))
    f.onSuccess {
      case ActorIdentity(_, Some(ref)) => SharedLeveldbJournal.setStore(ref, system)
      case _ =>
        system.log.error("Shared journal not started at {}", path)
        system.shutdown()
    }
    f.onFailure {
      case _ =>
        system.log.error("Lookup of shared journal at {} timed out", path)
        system.shutdown()
    }
  }

  

startupSharedJournal 是必须要执行的,不然 cluster 跑不起来。

ClusterSharding 在本 actorSystem 启动,参数比较直观。

需要注意的是:

1. sharedJournal 是在测试情况下才用得到的,在 prod 环境下应该使用 journal

2. cluster sharding 借助 cluster singleton 实现

时间: 2024-10-10 04:11:37

akka cluster sharding的相关文章

akka cluster sharding source code 学习 (1/5) 替身模式

为了使一个项目支持集群,自己学习使用了 akka cluster 并在项目中实施了,从此,生活就变得有些痛苦.再配上 apache 做反向代理和负载均衡,debug 起来不要太酸爽.直到现在,我还对 akka cluster 输出的 log 不是很熟悉,目前网络上 akka cluster 的信息还比较少,想深入了解这东西的话,还是要自己读 source code.前几天,雪球那帮人说 akka 不推荐使用,有很多坑,这给我提了个醒,目前我对 akka 的理解是远远不够的,需要深入学习. akk

akka cluster sharding source code 学习 (1/5) handle off

一旦 shard coordinator(相当于分布式系统的 zookeeper) 启动,它就会启动一个定时器,每隔一定的时间尝试平衡一下集群中各个节点的负载,平衡的办法是把那些负载较重的 actor 移动到负载较轻的节点上.在这一点上,我以前的理解有误,我以为 shardRegion 是移动的最小单位. val rebalanceTask = context.system.scheduler.schedule(rebalanceInterval, rebalanceInterval, self

akka cluster make node as unreachable 问题

如果确认cluster 端没有问题,就应该检查一下akka的 application.conf. 在akka cluster中,主要通过故障检测机制(Failure Detector)来看集群中各个节点是否处理可用状态. 集群中每个节点都会监控(最大5个)集群中其余的节点是否处于正常状态,当其中一个节点A检测到另一个节点B处理不可到达(unreachable)状态,它会通过gossip协议通知其余节点,其余节点将B节点标记为unreachable.如果你在application.conf中配置了

akka cluster 初体验

cluster 配置 akka { actor { provider = "akka.cluster.ClusterActorRefProvider" } remote { log-remote-lifecycle-events = off enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "127.0.0.1" port = 0 } } cluster {

Akka Cluster简介与基本环境搭建

??akka集群是高容错.去中心化.不存在单点故障以及不存在单点瓶颈的集群.它使用gossip协议通信以及具备故障自动检测功能. Gossip收敛 ??集群中每一个节点被其他节点监督(默认的最大数量为5).集群中的节点互相监督着,某节点所监督的状态也正在被其他监督着.通过gossip协议,节点向其他节点传递自己所见节点的最新状态(Up.Joining等等),同时节点也在接收来自其他节点的信息,这些信息包括哪些节点以及这些节点对应的状态,并这些节点加入到自己的seen表里去,表示自己已经看见了这些

Akka cluster gossip protocol

General background: Type of gossip: Push gossip: A node that has new information initiates gossip message to some other random node: Message usually contains full state.Propagation fast when less than half of the nodes are infected Pull gossip:A node

Scalable, Distributed Systems Using Akka, Spring Boot, DDD, and Java--转

原文地址:https://dzone.com/articles/scalable-distributed-systems-using-akka-spring-boot-ddd-and-java When data that needs to be processed grows large and can’t be contained within a single JVM, AKKA clusters provides features to build such highly scalabl

Akka(10): 分布式运算:集群-Cluster

Akka-Cluster可以在一部物理机或一组网络连接的服务器上搭建部署.用Akka开发同一版本的分布式程序可以在任何硬件环境中运行,这样我们就可以确定以Akka分布式程序作为标准的编程方式了. 在上面两篇讨论里我们介绍了Akka-Remoting.Akka-Remoting其实是一种ActorSystem之间Actor对Actor点对点的沟通协议.通过Akka-Remoting来实现一个ActorSystem中的一个Actor与另一个Actorsystem中的另一个Actor之间的沟通.在Re

【MangoDB分片】配置mongodb分片群集(sharding cluster)

配置mongodb分片群集(sharding cluster) Sharding cluster介绍 这是一种可以水平扩展的模式,在数据量很大时特给力,实际大规模应用一般会采用这种架构去构建monodb系统. 要构建一个 MongoDB Sharding Cluster,需要三种角色: Shard Server: mongod 实例,用于存储实际的数据块,实际生产环境中一个shard server角色可由几台机器组个一个relica set承担,防止主机单点故障 Config Server: m