Spark RPC使用记录(spark-2.2.0)

Spark RPC 使用说明

概述
        想通过 spark RPC 实现服务端则须实现
        ThreadSafeRpcEndpoint 或 RpcEndpoint
        一般通过实现前者来实现自己的服务,如同字面意思是线程安全的

        一般需要实现4个方法
            onStart                   服务启动时一些内部初始化和启动其他线程服务都在这里处理
            receive                   接收client发过来的请求,但是不需要回复
            receiveAndReply      接受client发过来的请求,并返回response
            onStop                   服务结束时需要做的一些清理动作在这里处理
Server端示例代码
package org.apache.spark

import java.text.SimpleDateFormat
import java.util.concurrent.ScheduledFuture
import java.util.Date
import java.util.concurrent.TimeUnit
import org.apache.spark.internal.Logging
import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv, ThreadSafeRpcEndpoint}

/**
  * Created by cloud on 18/1/18.
  */
class ZsparkRpcServer(
                     override val rpcEnv: RpcEnv,
                     val conf : SparkConf
                     ) extends ThreadSafeRpcEndpoint with Logging{

  val scheduledThread = ThreadUtils.newDaemonSingleThreadScheduledExecutor("echo-thread")
  var scheduledThreadFuture : ScheduledFuture[_] = _

  override def onStart(): Unit = {
    scheduledThreadFuture = scheduledThread.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = {
        val simpleTime = new SimpleDateFormat("yy-MM-dd HH:mm:ss")
        logInfo(simpleTime.format(new Date()))
      }
    },3000L,2000L,TimeUnit.MILLISECONDS)
  }

  override def receive: PartialFunction[Any, Unit] = {
    case ZRequest(command) => logInfo(command.toUpperCase)
  }

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case ZRequest(command) => context.reply(ZResponse(command.reverse))
    case _ => context.reply(ZResponse("ECHO".reverse))
  }

  override def onStop(): Unit = {
    if(scheduledThreadFuture != null){
      scheduledThreadFuture.cancel(true)
    }
    scheduledThread.shutdownNow()
  }

}

object ZsparkRpcServer{
  val SN="z-cloud-echo"
  val EN="z-cloud-echo-ser"

  def main(args : Array[String]): Unit = {
    val conf = new SparkConf()
    val securityManager = new SecurityManager(conf)
    val rpcEnv = RpcEnv.create(SN,Utils.localHostName(),23456,conf,securityManager)
    rpcEnv.setupEndpoint(EN,new ZsparkRpcServer(rpcEnv,conf))
    rpcEnv.awaitTermination()
  }
}

case class ZRequest(command : String)
case class ZResponse(result : String)
Client端示例代码

object ZsparkRpcClient{
  def main(args: Array[String]): Unit = {
    val host=Utils.localHostName()
    val port=23456
    val sparkConf = new SparkConf()
    val securityManager = new SecurityManager(sparkConf)
    val rpcEnv = RpcEnv.create(ZsparkRpcServer.SN,host,host,port,sparkConf,securityManager,true)
    val rpcEnvRef = rpcEnv.setupEndpointRef(RpcAddress(host,port),ZsparkRpcServer.EN)

    /*
    //异步获取response的方式
    val res=rpcEnvRef.ask[ZResponse](ZRequest("cli-echo"))
    res.onComplete {
      case Success(v) => println(v)
      case Failure(e) => println(e)
    }(ThreadUtils.sameThread)

    //发送不需要Server端回复的消息
    rpcEnvRef.send(ZRequest("non-response"))

    */

    /*
    * 同步获取response的方式
    * */
    for (i <- args){
      rpcEnvRef.send(ZRequest(i))
      val rpcTimeout=new RpcTimeout(FiniteDuration(3000L,TimeUnit.MILLISECONDS),"timeout")
      val res = rpcEnvRef.askSync[ZResponse](ZRequest(i),rpcTimeout)
      println(res.result)
    }

  }
}
启动RPC服务
#可以使用这种spark 封装脚本来执行,也可以自己构建执行环境
#使用spark封装脚本的好处就是处理简单可直接使用spark相关环境管理和配置管理,以及日志管理等等

if [ -z "${SPARK_HOME}" ]; then
  export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi

. "${SPARK_HOME}/sbin/spark-config.sh"
. "${SPARK_HOME}/bin/load-spark-env.sh"

exec "${SPARK_HOME}/sbin"/spark-daemon.sh start org.apache.spark.rpcDemo 1 "[email protected]"

原文地址:https://www.cnblogs.com/cloud-zhao/p/8409889.html

时间: 2024-11-06 03:41:14

Spark RPC使用记录(spark-2.2.0)的相关文章

spark RPC详解

前段时间看spark,看着迷迷糊糊的.最近终于有点头绪,先梳理了一下spark rpc相关的东西,先记录下来. 1,概述 个人认为,如果把分布式系统(HDFS, HBASE,SPARK等)比作一个人,那么RPC可以认为是人体的血液循环系统.它将系统中各个不同的组件(如Hbase中的master, Regionserver, client)联系了起来.同样,在spark中,不同组件像driver,executor,worker,master(stanalone模式)之间的通信也是基于RPC来实现的

Spark记录-Spark性能优化解决方案

Spark性能优化的10大问题及其解决方案 问题1:reduce task数目不合适解决方式:需根据实际情况调节默认配置,调整方式是修改参数spark.default.parallelism.通常,reduce数目设置为core数目的2到3倍.数量太大,造成很多小任务,增加启动任务的开销:数目太少,任务运行缓慢. 问题2:shuffle磁盘IO时间长解决方式:设置spark.local.dir为多个磁盘,并设置磁盘为IO速度快的磁盘,通过增加IO来优化shuffle性能: 问题3:map|red

Spark(五十三):Spark RPC初尝试使用

导入Maven依赖 <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.4.0</version> </dependency> 定义RPC Server端的ip(localhost).port(57992).服务名称(hello-rpc-service) obje

spark 运行问题记录

在CDH5.5.2上运行spark1.5的程序,运行起来就直接shutdown,并报出如下的异常: INFO YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 30000(ms)Exception in thread "main" java.lang.IllegalStateExce

Spark在StandAlone模式下提交任务,spark.rpc.message.maxSize太小而出错

1.错误信息org.apache.spark.SparkException: Job aborted due to stage failure:Serialized task 32:5 was 1728746673 bytes, which exceeds max allowed: spark.rpc.message.maxSize (134217728 bytes). Consider increasing spark.rpc.message.maxSize or using broadcas

Spark记录-Spark on Yarn框架

一.客户端进行操作 1.根据yarnConf来初始化yarnClient,并启动yarnClient2.创建客户端Application,并获取Application的ID,进一步判断集群中的资源是否满足executor和ApplicationMaster申请的资源,如果不满足则抛出IllegalArgumentException:3.设置资源.环境变量:其中包括了设置Application的Staging目录.准备本地资源(jar文件.log4j.properties).设置Applicati

Spark记录-Spark性能优化(开发、资源、数据、shuffle)

开发调优篇 原则一:避免创建重复的RDD 通常来说,我们在开发一个Spark作业时,首先是基于某个数据源(比如Hive表或HDFS文件)创建一个初始的RDD:接着对这个RDD执行某个算子操作,然后得到下一个RDD:以此类推,循环往复,直到计算出最终我们需要的结果.在这个过程中,多个RDD会通过不同的算子操作(比如map.reduce等)串起来,这个"RDD串",就是RDD lineage,也就是"RDD的血缘关系链". 我们在开发过程中要注意:对于同一份数据,只应该

spark使用性能优化记录

性能调优: 总则:加资源加并行度 简单直接,调节最优的资源配置 RDD架构和持久化 当可分配的资源无法达到更多的时候在考虑性能调优 从 重剑无锋 到 花拳绣腿 1.分配资源 并行度 RDD架构和缓存 2.shuffle调优 3.spark算子调优 4.JVM调优 . 广播大变量 分配哪些资源:executor(task--worker任务数)  cpu per  executor(每个作业的cpu核心数).memory (可以使用的内存).driver memory(影响不大) 在spark s

【Spark学习】Apache Spark安全机制

Spark版本:1.1.1 本文系从官方文档翻译而来,转载请尊重译者的工作,注明以下链接: http://www.cnblogs.com/zhangningbo/p/4135808.html 目录 Web UI 事件日志 网络安全(配置端口) 仅适用于Standalone模式的端口 适用于所有集群管理器的通用端口 现在,Spark支持通过共享秘钥进行认证.启用认证功能可以通过参数spark.authenticate来配置.此参数控制spark通信协议是否使用共享秘钥进行认证.这种认证方式基于握手