SPARK如何使用AKKA实现进程、节点通信

SPARK如何使用AKKA实现进程、节点通信

《深入理解Spark:核心思想与源码分析》一书前言的内容请看链接《深入理解SPARK:核心思想与源码分析》一书正式出版上市

《深入理解Spark:核心思想与源码分析》一书第一章的内容请看链接《第1章 环境准备》

《深入理解Spark:核心思想与源码分析》一书第二章的内容请看链接《第2章 SPARK设计理念与基本架构》

《深入理解Spark:核心思想与源码分析》一书第三章第一部分的内容请看链接《深入理解Spark:核心思想与源码分析》——SparkContext的初始化(伯篇)》

《深入理解Spark:核心思想与源码分析》一书第三章第二部分的内容请看链接《深入理解Spark:核心思想与源码分析》——SparkContext的初始化(仲篇)》

《深入理解Spark:核心思想与源码分析》一书第三章第三部分的内容请看链接《深入理解Spark:核心思想与源码分析》——SparkContext的初始化(叔篇)》

《深入理解Spark:核心思想与源码分析》一书第三章第四部分的内容请看链接《深入理解Spark:核心思想与源码分析》——SparkContext的初始化(季篇)》

AKKA简介

  Scala认为Java线程通过共享数据以及通过锁来维护共享数据的一致性是糟糕的做法,容易引起锁的争用,而且线程的上下文切换会带来不少开销,降低并发程序的性能,甚至会引入死锁的问题。在Scala中只需要自定义类型继承Actor,并且提供act方法,就如同Java里实现Runnable接口,需要实现run方法一样。但是不能直接调用act方法,而是通过发送消息的方式(Scala发送消息是异步的),传递数据。如:
Actor ! message
Akka是Actor编程模型的高级类库,类似于JDK 1.5之后越来越丰富的并发工具包,简化了程序员并发编程的难度。Akka是一款提供了用于构建高并发的、分布式的、可伸缩的、基于Java虚拟机的消息驱动应用的工具集和运行时环境。从下面Akka官网提供的一段代码示例,可以看出Akka并发编程的简约。

case class Greeting(who: String)
class GreetingActor extends Actor with ActorLogging {
  def receive = {
    case Greeting(who) ⇒ log.info("Hello " + who)
  }
}
val system = ActorSystem("MySystem")
val greeter = system.actorOf(Props[GreetingActor], name = "greeter")
greeter ! Greeting("Charlie Parker")

Akka提供了分布式的框架,意味着用户不需要考虑如何实现分布式部署,Akka官网提供了下面的示例演示如何获取远程Actor的引用。

// config on all machines
akka {
  actor {
    provider = akka.remote.RemoteActorRefProvider
    deployment {
      /greeter {
        remote = akka.tcp://[email protected]:2552
      }
    }
  }
}
// ------------------------------
// define the greeting actor and the greeting message
case class Greeting(who: String) extends Serializable
class GreetingActor extends Actor with ActorLogging {
  def receive = {
    case Greeting(who) ⇒ log.info("Hello " + who)
  }
}
// ------------------------------
// on machine 1: empty system, target for deployment from machine 2
val system = ActorSystem("MySystem")
// ------------------------------
// on machine 2: Remote Deployment - deploying on machine1
val system = ActorSystem("MySystem")
val greeter = system.actorOf(Props[GreetingActor], name = "greeter")
// ------------------------------
// on machine 3: Remote Lookup (logical home of “greeter” is machine2, remote deployment is transparent)
val system = ActorSystem("MySystem")
val greeter = system.actorSelection("akka.tcp://[email protected]:2552/user/greeter")
greeter ! Greeting("Sonny Rollins")

Actor之间最终会构成一棵树,作为父亲的Actor应当对所有儿子的异常失败进行处理(监管)Akka给出了简单的示例,代码如下。

class Supervisor extends Actor {
  override val supervisorStrategy =
  OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
    case _: ArithmeticException ⇒ Resume
    case _: NullPointerException ⇒ Restart
    case _: Exception ⇒ Escalate
  }
  val worker = context.actorOf(Props[Worker])
  def receive = {
    case n: Int => worker forward n
  }
}

Akka的更多信息请访问官方网站:http://akka.io/

基于AKKA的分布式消息系统ACTORSYSTEM

  Spark使用Akka提供的消息系统实现并发:ActorSystem是Spark中最基础的设施,Spark既使用它发送分布式消息,又用它实现并发编程。正是因为Actor轻量级的并发编程、消息发送以及ActorSystem支持分布式消息发送等特点,Spark选择了ActorSystem。
SparkEnv中创建ActorSystem时用到了AkkaUtils工具类,代码如下。

val (actorSystem, boundPort) =
Option(defaultActorSystem) match {
  case Some(as) => (as, port)
  case None =>
    val actorSystemName = if (isDriver) driverActorSystemName else executorActorSystemName
    AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)
}

AkkaUtils.createActorSystem方法用于启动ActorSystem,代码如下。

def createActorSystem(
  name: String,
  host: String,
  port: Int,
  conf: SparkConf,
  securityManager: SecurityManager): (ActorSystem, Int) = {
  val startService: Int => (ActorSystem, Int) = { actualPort =>
    doCreateActorSystem(name, host, actualPort, conf, securityManager)
  }
  Utils.startServiceOnPort(port, startService, conf, name)
}

AkkaUtils使用了Utils的静态方法startServiceOnPort, startServiceOnPort最终会回调方法startService: Int=> (T, Int),此处的startService实际是方法doCreateActorSystem。真正启动ActorSystem是由doCreateActorSystem方法完成的,doCreateActorSystem的具体实现细节请见AkkaUtils的详细介绍。关于startServiceOnPort的实现,请参阅[《Spark中常用工具类Utils的简明介绍》](http://blog.csdn.net/beliefer/article/details/50904662)一文的内容。

AKKAUTILS

  AkkaUtils是Spark对Akka相关API的又一层封装,这里对其常用的功能进行介绍。

(1)doCreateActorSystem

功能描述:创建ActorSystem。

private def doCreateActorSystem(
  name: String,
  host: String,
  port: Int,
  conf: SparkConf,
  securityManager: SecurityManager): (ActorSystem, Int) = {

  val akkaThreads = conf.getInt("spark.akka.threads", 4)
  val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15)
  val akkaTimeout = conf.getInt("spark.akka.timeout", 100)
  val akkaFrameSize = maxFrameSizeBytes(conf)
  val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false)
  val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off"
  if (!akkaLogLifecycleEvents) {
    Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL))
  }
  val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off"
  val akkaHeartBeatPauses = conf.getInt("spark.akka.heartbeat.pauses", 6000)
  val akkaFailureDetector =
    conf.getDouble("spark.akka.failure-detector.threshold", 300.0)
  val akkaHeartBeatInterval = conf.getInt("spark.akka.heartbeat.interval", 1000)
  val secretKey = securityManager.getSecretKey()
  val isAuthOn = securityManager.isAuthenticationEnabled()
  if (isAuthOn && secretKey == null) {
    throw new Exception("Secret key is null with authentication on")
  }
  val requireCookie = if (isAuthOn) "on" else "off"
  val secureCookie = if (isAuthOn) secretKey else ""
  logDebug("In createActorSystem, requireCookie is: " + requireCookie)
  val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback(
    ConfigFactory.parseString(
    s"""
    |akka.daemonic = on
    |akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
    |akka.stdout-loglevel = "ERROR"
    |akka.jvm-exit-on-fatal-error = off
    |akka.remote.require-cookie = "$requireCookie"
    |akka.remote.secure-cookie = "$secureCookie"
    |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
    |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
    |akka.remote.transport-failure-detector.threshold = $akkaFailureDetector
    |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
    |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
    |akka.remote.netty.tcp.hostname = "$host"
    |akka.remote.netty.tcp.port = $port
    |akka.remote.netty.tcp.tcp-nodelay = on
    |akka.remote.netty.tcp.connection-timeout = $akkaTimeout s
    |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}B
    |akka.remote.netty.tcp.execution-pool-size = $akkaThreads
    |akka.actor.default-dispatcher.throughput = $akkaBatchSize
    |akka.log-config-on-start = $logAkkaConfig
    |akka.remote.log-remote-lifecycle-events = $lifecycleEvents
    |akka.log-dead-letters = $lifecycleEvents
    |akka.log-dead-letters-during-shutdown = $lifecycleEvents
    """.stripMargin))
  val actorSystem = ActorSystem(name, akkaConf)
  val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider
  val boundPort = provider.getDefaultAddress.port.get
  (actorSystem, boundPort)
}

(2)makeDriverRef

功能描述:从远端ActorSystem中查找已经注册的某个Actor。

def makeDriverRef(name: String, conf: SparkConf, actorSystem: ActorSystem): ActorRef = {
  val driverActorSystemName = SparkEnv.driverActorSystemName
  val driverHost: String = conf.get("spark.driver.host", "localhost")
  val driverPort: Int = conf.getInt("spark.driver.port", 7077)
  Utils.checkHost(driverHost, "Expected hostname")
  val url = s"akka.tcp://[email protected]$driverHost:$driverPort/user/$name"
  val timeout = AkkaUtils.lookupTimeout(conf)
  logInfo(s"Connecting to $name: $url")
  Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)
}

时间: 2024-08-09 13:42:40

SPARK如何使用AKKA实现进程、节点通信的相关文章

hadoop+Spark+hbase集群动态增加节点

分布式系统的一个优势就是动态可伸缩性,如果增删节点需要重启那肯定是不行的.后来研究了一下,发现的确是不需要重启集群,直接在新增的节点上分别启动以下进程即可:以hadoop.spark和hbase为例: 一.hadoop增加datanode节点 因为1.x版本和2.x版本有比较大的差异,我这里是以2.7为例.在namenode节点上,将hadoop-2.7复制到新节点上,并在新节点上删除data和logs目录中的文件. 1.增加hdfs数据节点datanode 在此节点上启动hdfs: ./sbi

进程与进程之间通信Manager

1 #!/usr/bin/env python 2 from multiprocessing import Process,Manager 3 4 #Manager进程与进程之间通信 5 def Foo(i,dic): 6 dic[i] = 100+i 7 print(dic.values()) 8 if __name__ == '__main__': 9 manage = Manager() 10 dic = manage.dict() 11 for i in range(2): 12 p =

linux c 进程 pipe 通信代码分析

[[email protected] 04]# cat ex04-3-pipe02.c #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <sys/types.h> int main(void){ int result=-1; int fd[2],nbytes; pid_t pid; char string[]="

erlang在windows下和虚拟机节点通信

在Linux下部署erlang项目,开发过程很多都是在Windows完成的,然后再发布到Linux,所以测试过程要在虚拟机下完成.有一天因为想要在虚拟机中使用到erlang图形化工具,比如appmon.tv.observer等等,便突发奇想得利用Windows的erlang连接到虚拟机中使用这些工具,来查看虚拟机的运行状态. 需要准备哪些东西? 我是在VmWare10虚拟Centos6.5系统,所以这里以VmWare10和Centos6.5为例,其他Linux系统及工具只做参考. 注意了,Lin

android基础部分再学习---再谈Service进程服务通信

Bound Services 快速查看 bound服务允许被其它控件绑定,以便与之交互并进行进程间通信 一旦所有的客户端都解除了绑定,bound服务将被销毁.除非该服务同时又是started类型的. 在本文中(参见目录) 关键类 Service ServiceConnection IBinder 范例 RemoteService LocalService bound服务是客户端-服务器模式的服务.bound服务允许组件(比如activity)对其进行绑定.发送请求.接收响应.甚至进行进程间通信(

Linux进程为什么通信和主要通信手段

为什么要进程需要通信? 1.数据共享:一个进程需要将它的数据发给另一个进程. 2.资源共享:多个进程之间共享同样的资源. 3.通知事件:一个进程需要向另一个或一组进程发送消息,通知它们发生了某种事件. 4.进程控制:有些进程希望完全控制另一个进程的执行,此时控制进程希望能够拦截另一个进程的所有操作.并能够及时知道它的状态改变. Linux进程通信(IPC)由以下几部分发展而来: 1.UNIX进程间通信 2.基于SystemV进程间通信 3.POSIX进程间通信 Linux下进程间通信的几种主要手

ROS与Arduino学习(七)小案例节点通信

ROS与Arduino学习(七)小案例节点通信 Tutorial Level:Logging日志 Next Tutorial: 原文地址:https://www.cnblogs.com/flyingjun/p/8952857.html

Linux使用定时器timerfd 和 eventfd接口实现进程线程通信

body, table{font-family: 微软雅黑; font-size: 13.5pt} table{border-collapse: collapse; border: solid gray; border-width: 2px 0 2px 0;} th{border: 1px solid gray; padding: 4px; background-color: #DDD;} td{border: 1px solid gray; padding: 4px;} tr:nth-chil

IPC进程之间通信的几种方式

概念 进程间通信就是在不同进程之间传播或交换信息,那么不同进程之间存在着什么双方都可以访问的介质呢?进程的用户空间是互相独立的,一般而言是不能互相访问的,唯一的例外是 共享内存区 .但是,系统空间却是“公共场所”,所以内核显然可以提供这样的条件. 除此以外,那就是双方都可以访问的 外设 了.在这个意义上,两个进程当然也可以通过磁盘上的普通文件交换信息,或者通过“注册表”或其它数据库中的某些表项和记录交换信息.广义上这也是进程间通信的手段,但是一般都不把这算作“进程间通信”.因为那些通信手段的效率