akka cluster 初体验

cluster 配置

akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://[email protected]:2551",
      "akka.tcp://[email protected]:2552"]

    auto-down-unreachable-after = 10s
  }

  persistence {
    journal.plugin = "akka.persistence.journal.leveldb-shared"
    journal.leveldb-shared.store {
      # DO NOT USE ‘native = off‘ IN PRODUCTION !!!
      native = off
      dir = "target/shared-journal"
    }
    snapshot-store.local.dir = "target/snapshots"
  }

  log-dead-letters = off
}

  

actor.provider 设定选取 clusterActorRefProvider,在 IDE 中该 String 可以跳转到 ClusterActorRefProvider,从程序的注释来看,actorRef provider 其实并不是说 actor 是怎么提供的,它是为了引入 cluster extension,并自动启动 cluster

i.e. the cluster will automatically be started when the ‘ClusterActorRefProvider‘ is used.

创建三个 actorSystem 组成 cluster

  def main(args: Array[String]): Unit = {
    if (args.isEmpty)
      startup(Seq("2551", "2552", "0"))
    else
      startup(args)
  }

  def startup(ports: Seq[String]): Unit = {
    ports foreach { port =>
      // Override the configuration of the port
      val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
        withFallback(ConfigFactory.load())

      // Create an Akka system
      val system = ActorSystem("ClusterSystem", config)

      system.actorOf(ClusterSingletonManager.props(Master.props(Duration(99, "second")), "active",
        PoisonPill, None), "master")

    }
  }

  

创建 actorSystem 时的 config 重写了 akka.remote.netty.tcp.port,因为默认的配置只有 port = 0 这个选项。因为三个 cluster 都在本机启动,所以 hostname 不需要额外声明,重用 application.conf 中的 127.0.0.1

另外,ActorSystem 的名字必须统一,都是 ClusterSystem,这是在 application.conf 中的 seed-nodes 中声明的,它是 cluster 的 id。

我猜,当 seed-nodes 都挂掉了,新的 actorSystem 应该就无法加入 cluster 了,因为光靠 cluster id 已经无法找到组织了

object Master {
  val ResultsTopic = "results"

  def props(workTimeout: FiniteDuration): Props =
    Props(classOf[Master], workTimeout)

  case object Job

  case object ParentGreetings

}

class Master(workTimeout: FiniteDuration) extends Actor with ActorLogging {

  context.system.scheduler.schedule(Duration(10, "second"), Duration(5, "second"), self, Job)

  val timeout: Timeout = 10 second

  override def receive: Receive = {
    case Job =>

      val info = ClusterProtocol.selfInfo(self)

      log.info(info._1 + ": " + info._2 + ": " + info._3)

      context.system.actorSelection("user/master/active").resolveOne(4 second).map(actor => actor ! ParentGreetings)

    case ParentGreetings =>
      log.info("greetings from parent")

  }
}

  

上面是 cluster singleton actor 的实现,它有两件事要做,一是打印出自己的路径,而是找到自己。需要注意的是,resolveOne 不能 await(阻塞式的等),否者会报异常,actor not found

上面的代码实现了一个简单的 cluster singleton,具体的表现是,当一个 cluster 有多个 actorSystem 时,当一个 actorSystem 挂掉时,master actor 会继续提供服务,且此 actor 的 instance 有且只有一个。

sbt "run-main packageName.mainName portNum" 可以启动 actorSystem 在端口 portNum 上,不加 端口号,会一下启动三个 actorSystem,但是不方便模拟一个 actorSystem 挂掉的情况。

时间: 2024-10-11 16:33:49

akka cluster 初体验的相关文章

python3之redis cluster初体验

一.Redis 介绍 Redis 是一个开源内存的数据存储系统,行业中用作高效数据库缓存较多.它支持多种类型的数据结构:strings:hashes,lists,sets,sorted sets, bitmaps,hyperloglogs ,geospatial.并且支持对这些类型执行 原子操作 , 列如: int的增减,strings 的append,hashes hincrby,lists lpush,sets的交集sinter,并集union和差集sdiff命令. redis局限:在clu

基于Scala的Actor之上的分布式并发消息驱动框架Akka初体验

学习了基于Scala的Actor之上的分布式并发消息驱动框架Akka初体验,应用actor模型,位置透明,做到高并发.可伸缩.容错.单机也可以用,水平扩展.垂直扩展.容错都有很好的表现,spark中的例子如下: private def initializeEventProcessActor(){ implicat val timeout=Timeout( 30 seconds) val initEventActorReply= dagSchedulerActorSupervisor ? Prop

MySQL数据库初体验(含MySQL数据库5.7.17手工编译安装)

MySQL数据库初体验 Ram:随机性访问存储器,断电丢失数据 内存Rom:只读访问存储器,不会丢失数据 管理存储的数据,数据的增删改查,数据的迁移,保证数据的私密性 1.数据库的基本概念2.数据库的发展3.主流的数据库介绍4.编译安装mysql5.操作mysql 数据库的基本概念 数据: 1.描述事物的符号记录称为数据(Data)2.包括数字,文字.图形.图像.声音.档案记录等3.以"记录"形式按统一-的格式进行存储 表: 1.将不同的记录组织在一-起,就形成了"表&quo

erlang 初体验

最近测试了一下 erlang的坑... 如不出意外.... 大家第一眼看到这语法... 心里第一句一定是"我擦.这TM都是啥!!!!!" 没有变量!!! 没有结构体!!! 没有循环!!! 好吧,至少我是这样想的. 找了半天..连个if也不知道怎么写.. 这记录一些基本常识.. -module(module_name)  %%定义模块 括号内的要和文件名相同. -export([fun1/1 fun2/2]) %%这里是导出2个函数对外使用  函数名/参数名. 一个简单的函数定义如下 f

linux初体验

第一次听到linux这个'词语'是在一次偶然的朋友聊天中朋友提到的,之前压根没听到过'这个东西',所以我可以说是个linux的新新手,菜鸟都不算. 截至到目前,我已经开始linux系统运维学习有差不多10天时间了.在没接触linux之前,我对它的认识仅仅是:它是个计算机系统.决定学习linux系统运维之前,自我以为运维应该是对系统的一些日常维护之类的,不会很难的东西,我更希望运维是个不难的东西,我个人很笨,对难的东西可能接受的很慢,所以我愿意认为运维是很简单的,这样我就可以轻轻松松的掌握运维相关

【Spark深入学习 -15】Spark Streaming前奏-Kafka初体验

----本节内容------- 1.Kafka基础概念 1.1 出世背景 1.2 基本原理 1.2.1.前置知识 1.2.2.架构和原理 1.2.3.基本概念 1.2.4.kafka特点 2.Kafka初体验 2.1 环境准备 2.2 Kafka小试牛刀 2.2.1单个broker初体验 2.2.2 多个broker初体验 2.3 Kafka分布式集群构建 2.3.1 Kafka分布式集群构建 2.3.2 Kafka主题创建 2.3.3 生产者生产数据 2.3.4消费者消费数据 2.3.5消息的

Java8初体验(二)Stream语法详解

原文链接:http://ifeve.com/stream/ 1. Stream初体验 我们先来看看Java里面是怎么定义Stream的: A sequence of elements supporting sequential and parallel aggregate operations. 我们来解读一下上面的那句话: Stream是元素的集合,这点让Stream看起来用些类似Iterator: 可以支持顺序和并行的对原Stream进行汇聚的操作: 大家可以把Stream当成一个高级版本的

hibernate--CRUD初体验

hibernate的crud操作初体验. 看具体实例 package com.fuwh.model; import javax.persistence.Column; import javax.persistence.Entity; import javax.persistence.GeneratedValue; import javax.persistence.Id; import org.hibernate.annotations.GenericGenerator; @Entity publ

Oracle SQL篇(一)null值之初体验

    从我第一次正式的写sql语句到现在,已经超过10年的时间了.我写报表,做统计分析和财务对账,我一点点的接触oracle数据库,并尝试深入了解.这条路,一走就是10年,从充满热情,到开始厌倦,我不知道我还能坚持多久,未来的路,谁知道呢? 也许是该抓紧时间,做一点什么了,我不知道该开始写些什么,我从来没有在网上写东西的习惯.     先从简单的开始吧,那当然就是SQL,这是我SQL系列的第一篇,希望我能够坚持. 在Oracle数据库中,如果一个表中的列没有值的话,我们可以说是空值,比如IT员