自定义基于xmemcached协议消息队列的Spark Streaming 接收器

虽然spark streaming定义了常用的Receiver,但有时候还是需要自定义自己的Receiver的。对于自定义的Receiver,只需要实现spark streaming的Receiver抽象类即可。而Receiver的实现只需要简单地实现两个方法:

  1、onStart():接收数据。

  2、onStop():停止接收数据。

一般onStart()不应该阻塞,应该启动一个新的线程复杂数据接收。而onStop()方法负责确保这些接收数据的线程是停止的,在 Receiver 被关闭时调用了,可以做一些 close 工作。负责接收数据的线程可以通过isStopped()来判断是否要停止数据接收。

对于接收到的数据,需要存储到spark 框架里,使用的是store()方法。Receiver抽象类提供了4种store()方法,分别可用于存储:

  1、

单条小数据

2、数组形式的块数据

3、ByteBuffer 形式的块数据

4、iterator 形式的块数据

这4种的store()方法的实现都是直接将数据传递给 ReceiverSupervisor 来进行存储的。所以要自定义一个 Receiver,只要在 onStart() 里创建数据接收的线程,并在接收到数据时 store() 到 Spark Streamimg 框架就可以了。

下面代码是一个基于xmemcached协议消息队列的Spark Streaming 接收器:

    

import Fqueue.FqueueReceiver
import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class FqueueStreamingReceiver(val address: String, val connectionPoolSize: Int, val timeOut: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
  private var receiver: Option[FqueueReceiver] = None

  def onStart() {
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop(): Unit = {
    receiver foreach { _.stop() }
  }

  private def receive(): Unit = {
    val fqueueReceiver = new FqueueReceiver(address, connectionPoolSize, timeOut)
    receiver = Some(fqueueReceiver)
    receiver foreach { _.connect() }

    try
    {
      var stop = false
      while (!isStopped() && !stop) {
        val data = fqueueReceiver.deQueue("track_BOdao2015*")
        data match {
          case Some(str) => store(str)
          case None => Thread.sleep(1000)//stop = true
        }
      }
      receiver foreach { _.stop() }
    } catch {
      case e: Exception =>
        println("get data from fqueue err! pleace sure the server is live")
        println(e.getMessage)
        println(e.getStackTraceString)
        receiver foreach { _.stop() }
    }
  }
}

在自定义了spark streaming的Receiver后,可以在应用中使用:

def main(args: Array[String]) {
    new Thread("fqueue sender") {
      override def run() { sendData() }
    }.start()
    val config = new SparkConf().setAppName("testfqueue").setMaster("local[2]")
    val ssc = new StreamingContext(config, Seconds(5))
    val lines = ssc.receiverStream(new FqueueStreamingReceiver("localhost:18740", 4, 4000))
    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }

 链接:

  http://spark.apache.org/docs/latest/streaming-custom-receivers.html

  基于xmemcached的消息队列

  本文的Receiver实现源码

  喜欢的话github项目上送个星星(⊙o⊙)哦........您的start是我的动力!

时间: 2024-08-02 06:58:52

自定义基于xmemcached协议消息队列的Spark Streaming 接收器的相关文章

HQueue:基于HBase的消息队列

HQueue:基于HBase的消息队列 凌柏 ?1. HQueue简介 HQueue是一淘搜索网页抓取离线系统团队基于HBase开发的一套分布式.持久化消息队列.它利用HTable存储消息数据,借助HBase Coprocessor将原始的KeyValue数据封装成消息数据格式进行存储,并基于HBase Client API封装了HQueue Client API用于消息存取. HQueue可以有效使用在需要存储时间序列数据.作为MapReduce Job和iStream等输入.输出供上下游共享

Spark定制班第5课:基于案例一节课贯通Spark Streaming流计算框架的运行源码

本期内容: 1 在线动态计算分类最热门商品案例回顾与演示 2 基于案例贯通Spark Streaming的运行源码 1 在线动态计算分类最热门商品案例回顾与演示 我们用Spark Streaming+Spark SQL来实现分类最热门商品的在线动态计算.代码如下: package com.dt.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.sp

第5课:基于案例一节课贯通Spark Streaming流计算框架的运行源码

本期内容: 1 在线动态计算分类最热门商品案例回顾与演示 2 基于案例贯通Spark Streaming的运行源码 一.案例代码 在线动态计算电商中不同类别中最热门的商品排名,例如:手机类别中最热门的三种手机.电视类别中最热门的三种电视等 package com.dt.spark.sparkstreaming import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.

基于案例一节课贯通Spark Streaming流计算框架的运行源码

 在线动态计算分类最热门商品案例回顾与演示 基于案例贯通Spark Streaming的运行源码 使用Spark Streaming + Spark SQL来在线动态计算电商中不同类别中最热门的商品排名,例如手机这个类别下面最热门的三款手机. 是用mysql数据库作为元数据库,使用Hive作为存储引擎,使用Spark SQL作为查询引擎. 其中链接数据库代码如下: package com.dt.spark.com.dt.spark.streaming; import java.sql.Con

spark版本定制五:基于案例一节课贯通Spark Streaming流计算框架的运行源码

本期内容: 1.在线动态计算分类最热门商品案例回顾与演示 2.基于案例贯通Spark Streaming的运行源码 一.在线动态计算分类最热门商品案例回顾与演示 案例回顾: package com.dt.spark.sparkstreaming import com.robinspark.utils.ConnectionPool import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.

版本定制第5课:基于案例一节课贯通Spark Streaming流计算框架的运行源码

本期内容: 1.在线动态计算分类最热门商品案例回顾与演示 2.基于案例贯通Spark Streaming的运行源码 第一部分案例: package com.dt.spark.sparkstreaming import com.robinspark.utils.ConnectionPool import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.hive.HiveCo

【转】手把手教你实现自定义的应用层协议

原文: https://my.oschina.net/u/2245781/blog/1622414 报错了, 在Linux系统中,/usr/include/ 是C/C++等的头文件放置处, ------------------------------------------------------- 1.简述 互联网上充斥着各种各样的网络服务,在对外提供网络服务时,服务端和客户端需要遵循同一套数据通讯协议,才能正常的进行通讯:就好像你跟台湾人沟通用闽南语,跟广东人沟通就用粤语一样. 实现自己的应

消息总线VS消息队列

前段时间实现了一个基于RabbitMQ的消息总线,实现的过程中自己也在不断得思考.总结以及修正.需要考虑各个维度:效率.性能.网络.吞吐量.甚至需要自己去设想API可能的使用场景.模式.不过能有一件事情,自己愿意去做,在走路.吃饭.坐公交的时候都在思考如何去改进它,然后在实践的过程中,促使去思考并挖掘自己知识面的空白,也是一件让人开心的事情. 借此记录下自己在实现的过程中,以及平时的一些想法. 这是第一篇,先谈谈消息总线跟消息队列的区别,以及对于企业级应用需要将消息队列封装成消息总线的必要性.

消息队列一览

谈谈互联网后端基础设施 http://www.rowkey.me/blog/2016/08/27/server-basic-tech-stack/ service之间的调用方式可以分为同步调用以及异步调用.异步调用是怎么进行的?一种很常见的方式就是使用消息队列,调用方把请求放到队列中即可返回,然后等待服务提供方去队列中去获取请求进行处理,然后把结果返回给调用方即可(可以通过回调). 异步调用就是消息中间件一个非常常见的应用场景.此外,消息队列的应用场景还有以下: 解耦:一个事务,只关心核心的流程