Spark Streaming中向flume拉取数据

在这里看到的解决方法

https://issues.apache.org/jira/browse/SPARK-1729

请是个人理解,有问题请大家留言。

其实本身flume是不支持像KAFKA一样的发布/订阅功能的,也就是说无法让spark去flume拉取数据,所以老外就想了个取巧的办法。

在flume中其实sinks是向channel主动拿数据的,那么就让就自定义sinks进行自监听,然后使sparkstreaming先和sinks连接在一起, 让streaming来决定是否拿数据及拿数据的频率, 那么这不就是实现了由streaming来向flume拿数据的需求了嘛?

你看,真是聪明人的作法,但我觉得吧,如果真的有发布/订阅的需求,其实还是上KAFKA吧…

最后,现在来说一下应该怎么去使用

首先,需要将以下代码编译成jar包,然后在flume中使用,代码转自这里 (如果发现需要依赖的工具类神马的,请在相同目录下的scala文件中找一找)

package org.apache.spark.streaming.flume.sink

import java.net.InetSocketAddress
import java.util.concurrent._

import org.apache.avro.ipc.NettyServer
import org.apache.avro.ipc.specific.SpecificResponder
import org.apache.flume.Context
import org.apache.flume.Sink.Status
import org.apache.flume.conf.{Configurable, ConfigurationException}
import org.apache.flume.sink.AbstractSink

/**
 * A sink that uses Avro RPC to run a server that can be polled by Spark‘s
 * FlumePollingInputDStream. This sink has the following configuration parameters:
 *
 * hostname - The hostname to bind to. Default: 0.0.0.0
 * port - The port to bind to. (No default - mandatory)
 * timeout - Time in seconds after which a transaction is rolled back,
 * if an ACK is not received from Spark within that time
 * threads - Number of threads to use to receive requests from Spark (Default: 10)
 *
 * This sink is unlike other Flume sinks in the sense that it does not push data,
 * instead the process method in this sink simply blocks the SinkRunner the first time it is
 * called. This sink starts up an Avro IPC server that uses the SparkFlumeProtocol.
 *
 * Each time a getEventBatch call comes, creates a transaction and reads events
 * from the channel. When enough events are read, the events are sent to the Spark receiver and
 * the thread itself is blocked and a reference to it saved off.
 *
 * When the ack for that batch is received,
 * the thread which created the transaction is is retrieved and it commits the transaction with the
 * channel from the same thread it was originally created in (since Flume transactions are
 * thread local). If a nack is received instead, the sink rolls back the transaction. If no ack
 * is received within the specified timeout, the transaction is rolled back too. If an ack comes
 * after that, it is simply ignored and the events get re-sent.
 *
 */

class SparkSink extends AbstractSink with Logging with Configurable {

  // Size of the pool to use for holding transaction processors.
  private var poolSize: Integer = SparkSinkConfig.DEFAULT_THREADS

  // Timeout for each transaction. If spark does not respond in this much time,
  // rollback the transaction
  private var transactionTimeout = SparkSinkConfig.DEFAULT_TRANSACTION_TIMEOUT

  // Address info to bind on
  private var hostname: String = SparkSinkConfig.DEFAULT_HOSTNAME
  private var port: Int = 0

  private var backOffInterval: Int = 200

  // Handle to the server
  private var serverOpt: Option[NettyServer] = None

  // The handler that handles the callback from Avro
  private var handler: Option[SparkAvroCallbackHandler] = None

  // Latch that blocks off the Flume framework from wasting 1 thread.
  private val blockingLatch = new CountDownLatch(1)

  override def start() {
    logInfo("Starting Spark Sink: " + getName + " on port: " + port + " and interface: " +
      hostname + " with " + "pool size: " + poolSize + " and transaction timeout: " +
      transactionTimeout + ".")
    handler = Option(new SparkAvroCallbackHandler(poolSize, getChannel, transactionTimeout,
      backOffInterval))
    val responder = new SpecificResponder(classOf[SparkFlumeProtocol], handler.get)
    // Using the constructor that takes specific thread-pools requires bringing in netty
    // dependencies which are being excluded in the build. In practice,
    // Netty dependencies are already available on the JVM as Flume would have pulled them in.
    serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))
    serverOpt.foreach(server => {
      logInfo("Starting Avro server for sink: " + getName)
      server.start()
    })
    super.start()
  }

  override def stop() {
    logInfo("Stopping Spark Sink: " + getName)
    handler.foreach(callbackHandler => {
      callbackHandler.shutdown()
    })
    serverOpt.foreach(server => {
      logInfo("Stopping Avro Server for sink: " + getName)
      server.close()
      server.join()
    })
    blockingLatch.countDown()
    super.stop()
  }

  override def configure(ctx: Context) {
    import SparkSinkConfig._
    hostname = ctx.getString(CONF_HOSTNAME, DEFAULT_HOSTNAME)
    port = Option(ctx.getInteger(CONF_PORT)).
      getOrElse(throw new ConfigurationException("The port to bind to must be specified"))
    poolSize = ctx.getInteger(THREADS, DEFAULT_THREADS)
    transactionTimeout = ctx.getInteger(CONF_TRANSACTION_TIMEOUT, DEFAULT_TRANSACTION_TIMEOUT)
    backOffInterval = ctx.getInteger(CONF_BACKOFF_INTERVAL, DEFAULT_BACKOFF_INTERVAL)
    logInfo("Configured Spark Sink with hostname: " + hostname + ", port: " + port + ", " +
      "poolSize: " + poolSize + ", transactionTimeout: " + transactionTimeout + ", " +
      "backoffInterval: " + backOffInterval)
  }

  override def process(): Status = {
    // This method is called in a loop by the Flume framework - block it until the sink is
    // stopped to save CPU resources. The sink runner will interrupt this thread when the sink is
    // being shut down.
    logInfo("Blocking Sink Runner, sink will continue to run..")
    blockingLatch.await()
    Status.BACKOFF
  }

  private[flume] def getPort(): Int = {
    serverOpt
      .map(_.getPort)
      .getOrElse(
        throw new RuntimeException("Server was not started!")
      )
  }

  /**
   * Pass in a [[CountDownLatch]] for testing purposes. This batch is counted down when each
   * batch is received. The test can simply call await on this latch till the expected number of
   * batches are received.
   * @param latch
   */
  private[flume] def countdownWhenBatchReceived(latch: CountDownLatch) {
    handler.foreach(_.countDownWhenBatchAcked(latch))
  }
}

/**
 * Configuration parameters and their defaults.
 */
private[flume]
object SparkSinkConfig {
  val THREADS = "threads"
  val DEFAULT_THREADS = 10

  val CONF_TRANSACTION_TIMEOUT = "timeout"
  val DEFAULT_TRANSACTION_TIMEOUT = 60

  val CONF_HOSTNAME = "hostname"
  val DEFAULT_HOSTNAME = "0.0.0.0"

  val CONF_PORT = "port"

  val CONF_BACKOFF_INTERVAL = "backoffInterval"
  val DEFAULT_BACKOFF_INTERVAL = 200
}

  

然后在你的streaming中使用如下的代码

package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._
import org.apache.spark.util.IntParam
import java.net.InetSocketAddress

/**
 *  Produces a count of events received from Flume.
 *
 *  This should be used in conjunction with the Spark Sink running in a Flume agent. See
 *  the Spark Streaming programming guide for more details.
 *
 *  Usage: FlumePollingEventCount <host> <port>
 *    `host` is the host on which the Spark Sink is running.
 *    `port` is the port at which the Spark Sink is listening.
 *
 *  To run this example:
 *    `$ bin/run-example org.apache.spark.examples.streaming.FlumePollingEventCount [host] [port] `
 */
object FlumePollingEventCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(
        "Usage: FlumePollingEventCount <host> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(host, IntParam(port)) = args

    val batchInterval = Milliseconds(2000)

    // Create the context and set the batch size
    val sparkConf = new SparkConf().setAppName("FlumePollingEventCount")
    val ssc = new StreamingContext(sparkConf, batchInterval)

    // Create a flume stream that polls the Spark Sink running in a Flume agent
    val stream = FlumeUtils.createPollingStream(ssc, host, port)

    // Print out the count of events received from this server in each batch
    stream.count().map(cnt => "Received " + cnt + " flume events." ).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

  

时间: 2024-10-02 05:41:46

Spark Streaming中向flume拉取数据的相关文章

Spark Streaming中的操作函数分析

根据Spark官方文档中的描述,在Spark Streaming应用中,一个DStream对象可以调用多种操作,主要分为以下几类 Transformations Window Operations Join Operations Output Operations 一.Transformations 1.map(func) map操作需要传入一个函数当做参数,具体调用形式为 val b = a.map(func) 主要作用是,对DStream对象a,将func函数作用到a中的每一个元素上并生成新

使用Apache Flume抓取数据(1)

使用Apache Flume抓取数据,怎么来抓取呢?不过,在了解这个问题之前,我们必须明确ApacheFlume是什么? 一.什么是Apache Flume Apache Flume是用于数据采集的高性能系统 ,名字来源于原始的近乎实时的日志数据采集工具,现在广泛用于任何流事件数据的采集,支持从很多数据源聚合数据到HDFS. 最初由Cloudera开发 ,在2011年贡献给了Apache基金会 ,在2012年变成了Apache的顶级项目,Flume OG升级换代成了Flume NG. Flume

网页滚动到底部,拉取数据

网页滚动模式 //滚到到底部自动拉取数据 //页面滚到底部异步加载下一页数据 $(window).scroll(function () { //已经滚动到上面的页面高度 var scrollTop = parseFloat($(this).scrollTop()), //页面高度 scrollHeight = $(document).height(), //浏览器窗口高度 windowHeight = parseFloat($(this).height()), totalHeight = scr

第10课:Spark Streaming源码解读之流数据不断接收全生命周期彻底研究和思考

上一课我们讲解了Receiver启动的流程.Receiver是通过ReceiverSupervisor的start方法启动的: /** Start the supervisor */ def start() {   onStart()   startReceiver() } 首先会调用ReceiverSupervisor的onStart()方法, override protected def onStart() {   registeredBlockGenerators.foreach { _.

Spark Streaming发行版笔记16:数据清理内幕彻底解密

本讲从二个方面阐述: 数据清理原因和现象 数据清理代码解析 Spark Core从技术研究的角度讲 对Spark Streaming研究的彻底,没有你搞不定的Spark应用程序. Spark Streaming一直在运行,不断计算,每一秒中在不断运行都会产生大量的累加器.广播变量,所以需要对对象及 元数据需要定期清理.每个batch duration运行时不断触发job后需要清理rdd和元数据.Clinet模式 可以看到打印的日志,从文件日志也可以看到清理日志内容. 现在要看其背后的事情: Sp

Spark Streaming源码解读之流数据不断接收全生命周期彻底研究和思考

本期内容 : 数据接收架构设计模式 数据接收源码彻底研究 一.Spark Streaming数据接收设计模式   Spark Streaming接收数据也相似MVC架构: 1. Mode相当于Receiver存储数据,C级别的,Receiver是个抽象因为他有好多的Receiver 2. ReceiverSupervisor 是控制器,因为Receiver启动是靠ReceiverSuperior启动的,及接收到的数据交给ReceiverSuperior存储数据的 3. Driver会获得源数据,

Spark发行版笔记10:Spark Streaming源码解读之流数据不断接收和全生命周期彻底研究和思考

本节的主要内容: 一.数据接受架构和设计模式 二.接受数据的源码解读 Spark Streaming不断持续的接收数据,具有Receiver的Spark 应用程序的考虑. Receiver和Driver在不同进程,Receiver接收数据后要不断给Deriver汇报. 因为Driver负责调度,Receiver接收的数据如果不汇报给Deriver,Deriver调度时不会把接收的数据计算入调度系统中(如:数据ID,Block分片). 思考Spark Streaming接收数据: 不断有循环器接收

Spark Streaming中动态Batch Size深入及RateController解析

本期内容 : BatchDuration与 Process Time 动态Batch Size Spark Streaming中有很多算子,是否每一个算子都是预期中的类似线性规律的时间消耗呢? 例如:join操作和普通Map操作的处理数据的时间消耗是否会呈现出一致的线性规律呢,也就是说,并非数据量规模越大就是简单加大BatchDuration 就可以解决问题的,数据量是一个方面,计算的算子也是一个考量的因素. 使用BatchSize来适配我们的流处理程序 : 线上的处理程序越来越重要,流入的数据

Spark Streaming中的基本操作函数实例

官网文档中,大概可分为这几个 TransformationsWindow OperationsJoin OperationsOutput Operations 请了解一些基本信息: DStream是Spark Streaming提供的基本抽象.它表示连续的数据流,可以是从源接收的输入数据流,也可以是通过转换输入流生成的已处理数据流.在内部,DStream由一系列连续的RDD表示,这是Spark对不可变分布式数据集的抽象.DStream中的每个RDD都包含来自特定时间间隔的数据,如下图所示 Tra