StreamingListener技术点

以下是对StreamingListene的研究,由于比较简单,故只贴代码,不做解释

/**
  * Created by gabry.wu on 2016/5/27.
  * 实现StreamingListener,以监控spark作业状态  * 传入StreamingContext可以在某种出错时退出当前的SparkStreaming
  */
class StreamingMonitor(ssc:StreamingContext) extends StreamingListener{
  private val log =  LoggerFactory.getLogger("SparkStreamingMonitor")
  // Receiver启动
  override def onReceiverStarted(receiverStarted : StreamingListenerReceiverStarted): Unit = {
    log.warn("onReceiverStarted")
    log.warn(s"active=${receiverStarted.receiverInfo.active},executorId=${receiverStarted.receiverInfo.executorId}," +
      s"lastError=${receiverStarted.receiverInfo.lastError},lastErrorMessage=${receiverStarted.receiverInfo.lastErrorMessage}," +
      s"location=${receiverStarted.receiverInfo.location},name=${receiverStarted.receiverInfo.name}," +
      s"streamId=${receiverStarted.receiverInfo.streamId}")
  }
  // Receiver报错
  override def onReceiverError(receiverError : StreamingListenerReceiverError): Unit = {
    log.warn("onReceiverError")
    //可在该函数处理Receiver失败
    log.warn(s"active=${receiverError.receiverInfo.active},executorId=${receiverError.receiverInfo.executorId}," +
      s"lastError=${receiverError.receiverInfo.lastError},lastErrorMessage=${receiverError.receiverInfo.lastErrorMessage}," +
      s"location=${receiverError.receiverInfo.location},name=${receiverError.receiverInfo.name}," +
      s"streamId=${receiverError.receiverInfo.streamId}")
  }
  // Receiver停止
  override def onReceiverStopped(receiverStopped : StreamingListenerReceiverStopped): Unit = {
    log.warn("onReceiverStopped")
    log.warn(s"active=${receiverStopped.receiverInfo.active},executorId=${receiverStopped.receiverInfo.executorId}," +
      s"lastError=${receiverStopped.receiverInfo.lastError},lastErrorMessage=${receiverStopped.receiverInfo.lastErrorMessage}," +
      s"location=${receiverStopped.receiverInfo.location},name=${receiverStopped.receiverInfo.name}," +
      s"streamId=${receiverStopped.receiverInfo.streamId}")
  }
  // Batch提交作业
  override def onBatchSubmitted(batchSubmitted : StreamingListenerBatchSubmitted): Unit = {
    log.warn("onBatchSubmitted")
    // 提交作业之前已经知道有多少数据
    // batchSubmitted.batchInfo.numRecords是此次batch的数据量
    log.warn(s"batchTime=${batchSubmitted.batchInfo.batchTime},numRecords=${batchSubmitted.batchInfo.numRecords}," +
      s"processingDelay=${batchSubmitted.batchInfo.processingDelay},processingEndTime=${batchSubmitted.batchInfo.processingEndTime}," +
      s"processingStartTime=${batchSubmitted.batchInfo.processingStartTime},schedulingDelay=${batchSubmitted.batchInfo.schedulingDelay}," +
      s"submissionTime=${batchSubmitted.batchInfo.submissionTime},totalDelay=${batchSubmitted.batchInfo.totalDelay}")
  }
  // Batch启动
  override def onBatchStarted(batchStarted : StreamingListenerBatchStarted): Unit = {
    log.warn("onBatchStarted")
    //batchStarted.batchInfo.schedulingDelay:从提交到正式启动batch的间隔时间
    log.warn(s"batchTime=${batchStarted.batchInfo.batchTime},numRecords=${batchStarted.batchInfo.numRecords}," +
      s"processingDelay=${batchStarted.batchInfo.processingDelay},processingEndTime=${batchStarted.batchInfo.processingEndTime}," +
      s"processingStartTime=${batchStarted.batchInfo.processingStartTime},schedulingDelay=${batchStarted.batchInfo.schedulingDelay}," +
      s"submissionTime=${batchStarted.batchInfo.submissionTime},totalDelay=${batchStarted.batchInfo.totalDelay}")
  }
  // Batch完成
  override def onBatchCompleted(batchCompleted : StreamingListenerBatchCompleted): Unit = {
    log.warn("onBatchCompleted")
    //batchCompleted.batchInfo.processingDelay:批量处理时间
    //batchCompleted.batchInfo.totalDelay:此次批处理从提交,到最后结束总耗时
    log.warn(s"batchTime=${batchCompleted.batchInfo.batchTime},numRecords=${batchCompleted.batchInfo.numRecords}," +
      s"processingDelay=${batchCompleted.batchInfo.processingDelay},processingEndTime=${batchCompleted.batchInfo.processingEndTime}," +
      s"processingStartTime=${batchCompleted.batchInfo.processingStartTime},schedulingDelay=${batchCompleted.batchInfo.schedulingDelay}," +
      s"submissionTime=${batchCompleted.batchInfo.submissionTime},totalDelay=${batchCompleted.batchInfo.totalDelay}")
  }
  // 输出操作开始
  override def onOutputOperationStarted(outputOperationStarted : StreamingListenerOutputOperationStarted): Unit = {
    log.warn("onOutputOperationStarted")
    //outputOperationStarted.outputOperationInfo.description:其实就是Stack的部分信息,可用于输出Action的定位
    //outputOperationStarted.outputOperationInfo.name:Action的函数名称
    log.warn(s"batchTime=${outputOperationStarted.outputOperationInfo.batchTime},description=${outputOperationStarted.outputOperationInfo.description}," +
      s"duration=${outputOperationStarted.outputOperationInfo.duration},endTime=${outputOperationStarted.outputOperationInfo.endTime}," +
      s"failureReason=${outputOperationStarted.outputOperationInfo.failureReason},id=${outputOperationStarted.outputOperationInfo.id}," +
      s"name=${outputOperationStarted.outputOperationInfo.name},startTime=${outputOperationStarted.outputOperationInfo.startTime}")
  }
  // 输出操作完成
  override def onOutputOperationCompleted(outputOperationCompleted : StreamingListenerOutputOperationCompleted): Unit = {
    log.warn("onOutputOperationCompleted")
    //outputOperationCompleted.outputOperationInfo.duration:Action的耗时
    //outputOperationCompleted.outputOperationInfo.failureReason:Action失败的原因。可以在该函数中处理Batch失败
    log.warn(s"batchTime=${outputOperationCompleted.outputOperationInfo.batchTime},description=${outputOperationCompleted.outputOperationInfo.description}," +
      s"duration=${outputOperationCompleted.outputOperationInfo.duration},endTime=${outputOperationCompleted.outputOperationInfo.endTime}," +
      s"failureReason=${outputOperationCompleted.outputOperationInfo.failureReason},id=${outputOperationCompleted.outputOperationInfo.id}," +
      s"name=${outputOperationCompleted.outputOperationInfo.name},startTime=${outputOperationCompleted.outputOperationInfo.startTime}")
  }
}

下面是添加StreamingListene的代码

val ssc = new StreamingContext(sparkConf, new Duration(batchDuration))
    ssc.addStreamingListener(new StreamingMonitor(ssc))

  

各个函数的调用顺序

onReceiverStarted->[接收到数据]->onBatchSubmitted->onBatchStarted->onOutputOperationStarted->onOutputOperationCompleted->onBatchCompleted->[接收到数据]->onBatchSubmitted->onBatchStarted->onOutputOperationStarted->onOutputOperationCompleted->onBatchCompleted->.......->onReceiverStopped

其中[接收到数据]是可选项,并不是每次都会接收到数据。

时间: 2024-10-14 03:01:11

StreamingListener技术点的相关文章

静态网页开发技术-HTML

今天我重新复习了一下静态网页开发技术,概括如下. 一 .HTML文档结构与基本语法 :放置了标签的文本文档,可供浏览器解释执行的网页文件 1.注释标记 2.标记 3.属性 二.基本标记与使用 1.网页基本结构与标记 2.文本与段落标记 3.列表标签 4.超链接标签 5.图片标记 6.定时刷新或跳转 7.表格 三 HTML表单标签与表单设计 1.<FORM>标记及其属性 2  <INPUT>标记及其属性 3 <下拉列表框<SELECT>,<OPTION>

C#网络编程技术FastSocket实战项目演练

一.FastSocket课程介绍 .NET框架虽然微软提供了socket通信的类库,但是还有很多事情要自己处理,比如TCP协议需要处理分包.组包.粘包.维护连接列表等,UDP协议需要处理丢包.乱序,而且对于多连接并发,还要自己处理多线程等等.本期分享课程阿笨给大家带来的是来源于github开源Socket通信中间件:FastSocket,目的就是把大家从繁琐的网络编程技术中彻底地解放和释放出来. 阿笨只想安安静静的学习下网络编程技术Socket后,将学习的成果直接灵活的运用到自己的实际项目中去.

Java技术的特点

Java技术是一套完整的IT行业解决方案,其中包含了很多技术.最初是从解决家电设备联网通讯的方案发展起来的,其特点适用于Internet,于是在Internet广泛应用的环境下,迅速发展成为一种计算机语言.一个平台.一个网络计算的架构. 从整体上划分,Java技术可分成Java编程语言和Java平台.Java编程语言是一种高级编程语言,Java平台是指程序运行的硬件或软件环境. Java编程语言的特征: 1.简单易用 2.面向对象 3.跨平台,可移植 4.多线程 5.健壮性 6.安全性 7.动态

谈谈-Android中的接口回调技术

Android中的接口回调技术有很多应用的场景,最常见的:Activity(人机交互的端口)的UI界面中定义了Button,点击该Button时,执行某个逻辑. 下面参见上述执行的模型,讲述James对Android接口回调技术的理解(结合前人的知识和自己的实践). 使用一个比喻很形象地说明:客户端有个疑问打电话请教服务端,但服务端无法现场给出解答,相互之间约定:服务端一旦有答案,使用电话的方式反馈给客户端. 以上有三个主体:客户端.服务端和接口(方式). 接口回调的原理框图说明: Demo界面

一张图掌握移动Web前端所有技术(大前端、工程化、预编译、自动化)

你要的移动web前端都在这里! 大前端方向:移动Web前端.Native客户端.Node.js. 大前端框架:React.Vue.js.Koa 跨终端技术:HTML 5.CSS 3.JavaScript 跨平台框架:React Native.Cordova 前端工程化:Grunt.Gulp.Webpack 前端预编译:Babel.Sass.Less 自动化测试:Jasmine.Mocha.Karma 一图在手,应有尽有! 更多信息参考:https://item.jd.com/12170351.h

微软要做用云量挖掘机,以技术驱动数字化转型快公司

今年7月,首次更名为"Inspire"的微软WPC全球合作伙伴大会上,微软宣布将所有与合作伙伴相关的角色都重新整合为一个新的部门:统一商业合作伙伴部门(One Commercial Partner),并进行了一整套的组织和流程改组,以适应云计算时代的用户需求与"用云量"规律. 2017年9月12日,微软大中华区副总裁.全球渠道事业部总经理.商业客户事业部总经理包嘉峰与媒体分享了这两个月微软商业合作伙伴部转型以来,微软自身所发生的变化以及为客户所带来的价值.根据包嘉峰

游戏服务器开发需要学习的技术

一,游戏服务器编程语言的选择 所谓的游戏服务器编程语言其实有很多,基本上任何一种语言都可以作为游戏服务器的编程语言.这需要根据自己游戏的类型和要求加以选择.比如C++,Java ,Erlang,go等等.目前我用过的只有C++和Java.但是以Java为主.所以接下来就以自己的经验,谈谈以Java为核心的游戏服务器开发技术体系. Java目前作为游戏服务器开发语言已经很是普遍.但是大多数是作为页游或手游的服务器,而端游戏一般选择C++,因为端游对服务器的性能要求相对比较高一些.两种语言各有利弊.

技术与技术人员的价值

在我工作到第四年出头时,总是感觉自己的价值被低估了,换个说法就是感觉工资低了. 当时,总觉得技术不如管理,但又听闻有人一直做技术也有到年薪百万的,虽不在身边,但江湖总有这样的传说.再环顾周身环境,似乎除了去伊拉克做技术能让收入立刻飙升(补贴超过工资),让我不禁怀疑江湖的百万传说是不是被高估了.再加上工作几年后,技术提升感觉明显变慢,第一个天花板已近在眼前. 分类 又过了七.八年后,再回顾走过的技术道路.感觉技术的价值有时被高估,有时又被低估,但长期看,很少有人能一直享受到高估的溢价,同样也很少有

会话技术Session&amp;Cookie

一.会话技术简介 1.存储客户端的状态 由一个问题引出今天的内容,例如网站的购物系统,用户将购买的商品信息存储到哪     里?因为Http协议是无状态的,也就是说每个客户访问服务器端资源时,服务器并不知道该客户端是谁,所以需要会话技术识别客户端的状态.会话技术是帮助服务器   记住客户端状态(区分客户端) 举例购物过程: 2.会话技术 从打开一个浏览器访问某个站点,到关闭这个浏览器的整个过程,成为一次会话.会话技术就是记录这次会话中客户端的状态与数据的. 会话技术分为Cookie和Sessio