Spark (53): A First Try at Spark RPC

Import the Maven dependency:

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>

Define the RPC server's hostname (localhost), port (57992), and service name (hello-rpc-service):

object HelloRpcSettings {
  val rpcName = "hello-rpc-service"
  val port = 57992
  val hostname = "localhost"

  def getName(): String = rpcName

  def getPort(): Int = port

  def getHostname(): String = hostname
}

Define the RPC endpoint class and the message case classes SayHi/SayBye. Messages are serialized when sent between processes; case classes satisfy this out of the box:

case class SayHi(msg: String)

case class SayBye(msg: String)

import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}

class HelloEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def onStart(): Unit = {
    println(rpcEnv.address)
    println("start hello endpoint")
  }

  // Handles one-way messages delivered with RpcEndpointRef.send; no reply is sent.
  override def receive: PartialFunction[Any, Unit] = {
    case SayHi(msg) =>
      println(s"receive $msg")
  }

  // Handles two-way messages delivered with ask/askSync; context.reply carries the answer back.
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case SayHi(msg) =>
      println(s"receive $msg")
      context.reply(s"hi, $msg")
    case SayBye(msg) =>
      println(s"receive $msg")
      context.reply(s"bye, $msg")
  }

  override def onStop(): Unit = {
    println("stop hello endpoint")
  }
}
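
RpcEndpoint also exposes connection lifecycle hooks that the example above does not use. As a sketch (the overrides below are optional additions, not part of the original example), logging disconnects makes the client shutdown shown at the end of this post visible on the server:

import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEnv}

class HelloEndpointWithHooks(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def receive: PartialFunction[Any, Unit] = {
    case SayHi(msg) => println(s"receive $msg")
  }

  // Invoked when a remote RpcEnv opens or closes a connection to this endpoint.
  override def onConnected(remoteAddress: RpcAddress): Unit =
    println(s"client connected: $remoteAddress")

  override def onDisconnected(remoteAddress: RpcAddress): Unit =
    println(s"client disconnected: $remoteAddress")
}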

Define the RPC service provider (the server). Note that RpcEnv, RpcEndpoint, SecurityManager, and the other org.apache.spark.rpc classes used here are private[spark] in Spark 2.4, so these examples only compile from a package under org.apache.spark.

import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.rpc._
import org.apache.spark.sql.SparkSession

object RpcServerTest {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    val sparkSession = SparkSession.builder().config(conf).master("local[*]").appName("test rpc").getOrCreate()
    val sparkContext: SparkContext = sparkSession.sparkContext
    val sparkEnv: SparkEnv = sparkContext.env

    // This overload takes (name, bindAddress, advertiseAddress, port, conf,
    // securityManager, numUsableCores, clientMode); bind and advertise addresses match here.
    val rpcEnv = RpcEnv.create(HelloRpcSettings.getName(), HelloRpcSettings.getHostname(),
      HelloRpcSettings.getHostname(), HelloRpcSettings.getPort(), conf,
      sparkEnv.securityManager, 1, false)

    val helloEndpoint: RpcEndpoint = new HelloEndpoint(rpcEnv)
    rpcEnv.setupEndpoint(HelloRpcSettings.getName(), helloEndpoint)

    rpcEnv.awaitTermination()
  }
}
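
A full SparkSession is not strictly required just to host an RpcEnv. As a minimal sketch (assuming, as noted above, that the file lives in a package under org.apache.spark so the private[spark] classes are visible), a SecurityManager can be built directly from the SparkConf:

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.RpcEnv

object MinimalRpcServer {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // Build the SecurityManager directly instead of borrowing it from SparkEnv.
    val securityManager = new SecurityManager(conf)
    // The shorter create overload uses one host as both bind and advertise address.
    val rpcEnv = RpcEnv.create(HelloRpcSettings.getName(), HelloRpcSettings.getHostname(),
      HelloRpcSettings.getPort(), conf, securityManager, clientMode = false)
    rpcEnv.setupEndpoint(HelloRpcSettings.getName(), new HelloEndpoint(rpcEnv))
    rpcEnv.awaitTermination()
  }
}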

Define the RPC service consumer (the client):

import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkEnv}
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, RpcEnvConfig}
import org.apache.spark.sql.{Dataset, Row, SparkSession}

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

object RpcClientTest {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
    val sparkSession = SparkSession.builder().config(conf).master("local[*]").appName("test rpc").getOrCreate()
    val sparkContext: SparkContext = sparkSession.sparkContext
    val sparkEnv: SparkEnv = sparkContext.env

    // clientMode = false means this RpcEnv also binds a server port; since 57992 is already
    // taken by the provider, the consumer's log shows it falling back to port 57993.
    val rpcEnv: RpcEnv = RpcEnv.create(HelloRpcSettings.getName(), HelloRpcSettings.getHostname(),
      HelloRpcSettings.getPort(), conf, sparkEnv.securityManager, false)
    // Look up a reference to the remote endpoint by its address and registered name.
    val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(
      RpcAddress(HelloRpcSettings.getHostname(), HelloRpcSettings.getPort()), HelloRpcSettings.getName())

    import scala.concurrent.ExecutionContext.Implicits.global

    // Fire-and-forget: handled by the endpoint's receive, no reply comes back.
    endPointRef.send(SayHi("test send"))

    // Asynchronous request/reply: handled by receiveAndReply, returns a Future.
    val future: Future[String] = endPointRef.ask[String](SayHi("neo"))
    future.onComplete {
      case scala.util.Success(value) => println(s"Got the result = $value")
      case scala.util.Failure(e) => println(s"Got error: $e")
    }
    Await.result(future, Duration.apply("30s"))

    // Blocking request/reply: askSync waits for the answer and returns it directly.
    val res = endPointRef.askSync[String](SayBye("test askSync"))
    println(res)

    sparkSession.stop()
  }

}
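
The port-conflict warning visible in the consumer's log further below can be avoided by creating the consumer's RpcEnv in client mode, which skips starting a server socket altogether. A sketch of the alternative call:

// clientMode = true: no listening port is bound, so nothing clashes with port 57992.
val clientEnv: RpcEnv = RpcEnv.create(HelloRpcSettings.getName(), HelloRpcSettings.getHostname(),
  HelloRpcSettings.getPort(), conf, sparkEnv.securityManager, clientMode = true)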

Start the RPC service provider. Its console output:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/06/28 14:50:12 INFO SparkContext: Running Spark version 2.4.0
19/06/28 14:50:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/06/28 14:50:12 INFO SparkContext: Submitted application: test rpc
19/06/28 14:50:12 INFO SecurityManager: Changing view acls to: boco
19/06/28 14:50:12 INFO SecurityManager: Changing modify acls to: boco
19/06/28 14:50:12 INFO SecurityManager: Changing view acls groups to:
19/06/28 14:50:12 INFO SecurityManager: Changing modify acls groups to:
19/06/28 14:50:12 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(boco); groups with view permissions: Set(); users  with modify permissions: Set(boco); groups with modify permissions: Set()
19/06/28 14:50:13 INFO Utils: Successfully started service 'sparkDriver' on port 64621.
19/06/28 14:50:13 INFO SparkEnv: Registering MapOutputTracker
19/06/28 14:50:13 INFO SparkEnv: Registering BlockManagerMaster
19/06/28 14:50:13 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
19/06/28 14:50:13 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
19/06/28 14:50:13 INFO DiskBlockManager: Created local directory at C:\Users\boco\AppData\Local\Temp\blockmgr-7128dde8-9c46-4580-bb72-c2161ba65bf7
19/06/28 14:50:13 INFO MemoryStore: MemoryStore started with capacity 901.8 MB
19/06/28 14:50:13 INFO SparkEnv: Registering OutputCommitCoordinator
19/06/28 14:50:13 INFO Utils: Successfully started service 'SparkUI' on port 4040.
19/06/28 14:50:13 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://DESKTOP-JL4FSCV:4040
19/06/28 14:50:13 INFO Executor: Starting executor ID driver on host localhost
19/06/28 14:50:13 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 64642.
19/06/28 14:50:13 INFO NettyBlockTransferService: Server created on DESKTOP-JL4FSCV:64642
19/06/28 14:50:13 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
19/06/28 14:50:13 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, DESKTOP-JL4FSCV, 64642, None)
19/06/28 14:50:13 INFO BlockManagerMasterEndpoint: Registering block manager DESKTOP-JL4FSCV:64642 with 901.8 MB RAM, BlockManagerId(driver, DESKTOP-JL4FSCV, 64642, None)
19/06/28 14:50:13 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, DESKTOP-JL4FSCV, 64642, None)
19/06/28 14:50:13 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, DESKTOP-JL4FSCV, 64642, None)
19/06/28 14:50:13 INFO Utils: Successfully started service 'hello-rpc-service' on port 57992.
localhost:57992
start hello endpoint

Start the RPC service consumer. Because it was created with clientMode = false, its log shows it trying to bind 'hello-rpc-service' on port 57992 as well and falling back to 57993:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/06/28 14:53:53 INFO SparkContext: Running Spark version 2.4.0
19/06/28 14:53:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/06/28 14:53:54 INFO SparkContext: Submitted application: test rpc
19/06/28 14:53:54 INFO SecurityManager: Changing view acls to: boco
19/06/28 14:53:54 INFO SecurityManager: Changing modify acls to: boco
19/06/28 14:53:54 INFO SecurityManager: Changing view acls groups to:
19/06/28 14:53:54 INFO SecurityManager: Changing modify acls groups to:
19/06/28 14:53:54 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(boco); groups with view permissions: Set(); users  with modify permissions: Set(boco); groups with modify permissions: Set()
19/06/28 14:53:55 INFO Utils: Successfully started service 'sparkDriver' on port 64818.
19/06/28 14:53:55 INFO SparkEnv: Registering MapOutputTracker
19/06/28 14:53:55 INFO SparkEnv: Registering BlockManagerMaster
19/06/28 14:53:55 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
19/06/28 14:53:55 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
19/06/28 14:53:55 INFO DiskBlockManager: Created local directory at C:\Users\boco\AppData\Local\Temp\blockmgr-6a0b8e7f-86d2-4bb8-b45c-7c04deabcb91
19/06/28 14:53:55 INFO MemoryStore: MemoryStore started with capacity 901.8 MB
19/06/28 14:53:55 INFO SparkEnv: Registering OutputCommitCoordinator
19/06/28 14:53:55 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
19/06/28 14:53:55 INFO Utils: Successfully started service 'SparkUI' on port 4041.
19/06/28 14:53:55 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://DESKTOP-JL4FSCV:4041
19/06/28 14:53:55 INFO Executor: Starting executor ID driver on host localhost
19/06/28 14:53:55 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 64840.
19/06/28 14:53:55 INFO NettyBlockTransferService: Server created on DESKTOP-JL4FSCV:64840
19/06/28 14:53:55 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
19/06/28 14:53:55 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, DESKTOP-JL4FSCV, 64840, None)
19/06/28 14:53:55 INFO BlockManagerMasterEndpoint: Registering block manager DESKTOP-JL4FSCV:64840 with 901.8 MB RAM, BlockManagerId(driver, DESKTOP-JL4FSCV, 64840, None)
19/06/28 14:53:55 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, DESKTOP-JL4FSCV, 64840, None)
19/06/28 14:53:55 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, DESKTOP-JL4FSCV, 64840, None)
19/06/28 14:53:55 WARN Utils: Service 'hello-rpc-service' could not bind on port 57992. Attempting port 57993.
19/06/28 14:53:55 INFO Utils: Successfully started service 'hello-rpc-service' on port 57993.
19/06/28 14:53:55 INFO TransportClientFactory: Successfully created connection to localhost/127.0.0.1:57992 after 31 ms (0 ms spent in bootstraps)
bye, test askSync
Got the result = hi, neo
19/06/28 14:53:55 INFO SparkUI: Stopped Spark web UI at http://DESKTOP-JL4FSCV:4041
19/06/28 14:53:55 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/06/28 14:53:55 INFO MemoryStore: MemoryStore cleared
19/06/28 14:53:55 INFO BlockManager: BlockManager stopped
19/06/28 14:53:55 INFO BlockManagerMaster: BlockManagerMaster stopped
19/06/28 14:53:55 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/06/28 14:53:55 INFO SparkContext: Successfully stopped SparkContext
19/06/28 14:53:55 INFO ShutdownHookManager: Shutdown hook called
19/06/28 14:53:55 INFO ShutdownHookManager: Deleting directory 

At this point, the RPC service provider prints the following:

receive test send
receive neo
receive test askSync
19/06/28 14:53:56 WARN TransportChannelHandler: Exception in connection from /127.0.0.1:64865
java.io.IOException: An existing connection was forcibly closed by the remote host
    at sun.nio.ch.SocketDispatcher.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1106)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:343)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:748)

This warning is expected: it is the provider noticing that the consumer process shut down (sparkSession.stop()) and forcibly closed its connection.

Original article: https://www.cnblogs.com/yy3b2007com/p/11104065.html
