第43课:Spark 1.6 RPC内幕解密:运行机制、源码详解、Netty与Akka等

Spark 是分布式计算框架,多台机器之间必然存在着通信。Spark在早期版本采用Akka实现。现在在Akka的上层抽象出了一个RpcEnv。RpcEnv负责管理机器之间的通信。

RpcEnv包含了如下三大核心:

  • RpcEndpoint 消息循环体,负责接收并处理消息。Spark中的Master、Worker都是RpcEndpoint 。
  • RpcEndpointRef :RpcEndpoint的引用,如果需要和RpcEndpoint通信,就必须获取它的RpcEndpointRef,通过RpcEndpointRef发送消息。
  • Dispatcher:消息调度器,负责RPC消息路由到适当的RpcEndpoint。

RpcEnv被创建以后,RpcEndpoint可以注册到RpcEnv中,被注册的RpcEndpoint会生成一个相应的RpcEndpointRef来引用它。如果你需要向RpcEndpoint发送消息,必须到RpcEnv中通过RpcEndpoint的名称来获取对应的RpcEndpointRef,然后通过RpcEndpointRef向RpcEndpoint发送消息。

RpcEnv负责管理RpcEndpoint的整个生命周期

  • 注册RpcEndpoint,使用name或者uri
  • 路由发送给RpcEndpoint的消息。
  • 停止RpcEndpoint

注:一个RpcEndpoint只能注册给一个RpcEnv

RpcAddress:RpcEnv的逻辑地址,使用主机名和端口表示。

RpcEndpointAddress:注册到RpcEnv上的RpcEndpoint的地址,由RpcAddress和name构成。

由此可见RpcEnv和RpcEndpoint是在相同的机器上(相同的JVM中)。而要想给远端机器发送消息,是获取远端机器的RpcEndpointRef,而并不是远端的RpcEndpoint注册到本地的RpcEnv中。

在Spark1.6版本中,默认使用的是netty

private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = {
  val rpcEnvNames = Map(
    "akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory",
    "netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory")
  val rpcEnvName = conf.get("spark.rpc", "netty")
  val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)
  Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory]
}

RpcEndpoint是一个消息循环体,它的生命周期:

构造(Constructor)->启动(onStart)->消息接收(receive&receiveAndReply)->停止(onStop)

receive():不断的运行,处理客户端发送过来的消息。

receiveAndReply():处理消息,并且回应对方。

我们看一下Master的代码:

def main(argStrings: Array[String]) {
  SignalLogger.register(log)
  val conf = new SparkConf
  val args = new MasterArguments(argStrings, conf)
  //指定的主机名必须是start-master.sh脚本运行的本地机器名称
  val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
  rpcEnv.awaitTermination()
}

/**
 * Start the Master and return a three tuple of:
 *   (1) The Master RpcEnv
 *   (2) The web UI bound port
 *   (3) The REST server bound port, if any
 */
def startRpcEnvAndEndpoint(
    host: String,
    port: Int,
    webUiPort: Int,
    conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
  val securityMgr = new SecurityManager(conf)
  //创建Rpc环境,主机名和端口就是Standalone集群的访问地址。SYSTEM_NAME=sparkMaster
  val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
  // 将Master实例注册到RpcEnv中
  val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
    new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
  val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
  (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
}

在main方法中创建了RpcEnv,并且实例化Master实例,然后注册到RpcEnv中。

RpcEndpoint其实是注册到Dispatcher中的,在netty中的代码实现如下:

override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
  dispatcher.registerRpcEndpoint(name, endpoint)
}

注:NettyRpcEnv.scala的第135行

而Dispatcher中使用如下数据结构来存储RpcEndpoint和RpcEndpointRef

private val endpoints = new ConcurrentHashMap[String, EndpointData]
private val endpointRefs = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]

EndpointData为一个case class:

private class EndpointData(
    val name: String,
    val endpoint: RpcEndpoint,
    val ref: NettyRpcEndpointRef) {
  val inbox = new Inbox(ref, endpoint)
}

在Master中使用数据结构WorkerInfo保存着每个Worker的信息,其中就包括每个Worker的RpcEndpointRef

备注:

1、DT大数据梦工厂微信公众号DT_Spark 
2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains

时间: 2024-10-22 05:58:40

第43课:Spark 1.6 RPC内幕解密:运行机制、源码详解、Netty与Akka等的相关文章

Spark 1.6 RPC内幕解密:运行机制、源码详解、Netty与Akka等(DT大数据梦工厂)

内容: 1.Spark 1.6 RPC解析: 2.RPCEnv源码解析: 3.RPCEndpoint等源码解析: 以前和现在的RPC都是采用Akka,以前和现在的不同就在于RPCEnv,现在就是基于RPCEnv去做RPC通信的 ==========Spark 1.6 RPC解析============ 1.Spark 1.6推出了以RPCEnv.RPCEndpoint.RPCEndpointRef为核心的新型架构下的RPC通信方式,就目前的实现而言,其底层依旧是Akka: 2.Akka是基于Sc

Spark Streaming 源码详解

原地址 本系列内容适用范围: * 2015.12.05 update, Spark 1.6 全系列 √ (1.6.0-preview,尚未正式发布) * 2015.11.09 update, Spark 1.5 全系列 √ (1.5.0, 1.5.1, 1.5.2) * 2015.07.15 update, Spark 1.4 全系列 √ (1.4.0, 1.4.1) * 2015.04.17 update, Spark 1.3 全系列 √ (1.3.0, 1.3.1) 概述 0.1 Spark

[Spark內核] 第42课:Spark Broadcast内幕解密:Broadcast运行机制彻底解密、Broadcast源码解析、Broadcast最佳实践

本课主题 Broadcast 运行原理图 Broadcast 源码解析 Broadcast 运行原理图 Broadcast 就是将数据从一个节点发送到其他的节点上; 例如 Driver 上有一张表,而 Executor 中的每个并行执行的Task (100万个Task) 都要查询这张表的话,那我们通过 Broadcast 的方式就只需要往每个Executor 把这张表发送一次就行了,Executor 中的每个运行的 Task 查询这张唯一的表,而不是每次执行的时候都从 Driver 中获得这张表

[Spark传奇行动] 第34课:Stage划分和Task最佳位置算法源码彻底解密

本課主題 Job Stage 划分算法解密 Task 最佳位置算法實現解密 引言 作业调度的划分算法以及 Task 的最佳位置的算法,因为 Stage 的划分是DAGScheduler 工作的核心,这也是关系到整个作业有集群中该怎么运行:其次就是数据本地性,Spark 一舨的代码都是链式表达的,这就让一个任务什么时候划分成 Stage,在大数据世界要追求最大化的数据本地性,所有最大化的数据本地性就是在数据计算的时候,数据就在内存中.最后就是 Spark 的实现算法时候的略的怎么样.希望这篇文章能

Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法

Spark资源调度机制源码分析--基于spreadOutApps及非spreadOutApps两种资源调度算法 1.spreadOutApp尽量平均分配到每个executor上: 2.非spreadOutApp尽量在使用单个executor的资源. 源码分析 org.apache.spark.deploy.master.Master 1.首先判断,master状态不是ALIVE的话,直接返回2.调度driver3. Application的调度机制(核心之核心,重中之重) 源码如下: 1 /*

下载-深入浅出Netty源码剖析、Netty实战高性能分布式RPC、NIO+Netty5各种RPC架构实战演练三部曲视频教程

下载-深入浅出Netty源码剖析.Netty实战高性能分布式RPC.NIO+Netty5各种RPC架构实战演练三部曲视频教程 第一部分:入浅出Netty源码剖析 第二部分:Netty实战高性能分布式RPC 第三部分:NIO+Netty5各种RPC架构实战演练

Spark学习三:Spark Schedule以及idea的安装和导入源码

Spark学习三:Spark Schedule以及idea的安装和导入源码 标签(空格分隔): Spark Spark学习三Spark Schedule以及idea的安装和导入源码 一RDD操作过程中的数据位置 二Spark Schedule 三Idea导入spark源码 一,RDD操作过程中的数据位置 [hadoop001@xingyunfei001 spark-1.3.0-bin-2.5.0]$ bin/spark-shell --master local[2] val rdd = sc.t

Spark大师之路:广播变量(Broadcast)源码分析

概述 最近工作上忙死了--广播变量这一块其实早就看过了,一直没有贴出来. 本文基于Spark 1.0源码分析,主要探讨广播变量的初始化.创建.读取以及清除. 类关系 BroadcastManager类中包含一个BroadcastFactory对象的引用.大部分操作通过调用BroadcastFactory中的方法来实现. BroadcastFactory是一个Trait,有两个直接子类TorrentBroadcastFactory.HttpBroadcastFactory.这两个子类实现了对Htt

Scala 深入浅出实战经典 第41讲:List继承体系实现内幕和方法操作源码揭秘

package com.parllay.scala.dataset /** * Created by richard on 15-7-25. * 第41讲:List继承体系实现内幕和方法操作源码揭秘 */object List_Interal { def main(args: Array[String]) { /** * List: 继承体系: * list有两个子类 Nil, ::, 他们都实现了 * override def head : B = hd override def tail :