Development Series: 03. Spark Streaming Custom Receivers (Translated)

Spark Streaming can receive streaming data from any arbitrary data source beyond the ones for which it has built-in support (that is, beyond Flume, Kafka, files, sockets, etc.). This requires the developer to implement a receiver that is customized for receiving data from the data source in question. This guide walks through the process of implementing a custom receiver and using it in a Spark Streaming application.

Implementing a Custom Receiver

This starts with implementing a Receiver. A custom receiver must extend this abstract class by implementing two methods: onStart() (what to do to start receiving data) and onStop() (what to do to stop receiving data).

Note that onStart() and onStop() must not block indefinitely. Typically, onStart() would start the threads responsible for receiving the data, and onStop() would ensure that receiving by those threads is stopped. The receiving threads can also use isStopped(), a Receiver method, to check whether they should stop receiving data.
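
Before the full socket example further down, here is a minimal, hedged sketch of that lifecycle: onStart() only spawns a background thread, the thread loops until isStopped() returns true, and onStop() has nothing to do. The pollSource() helper is hypothetical and stands in for whatever the real data source provides.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class SketchReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart() {
    // onStart() must return quickly, so only start a background thread here
    new Thread("Sketch Receiver") {
      override def run() {
        // keep receiving until the framework marks the receiver as stopped
        while (!isStopped) {
          val record = pollSource()   // hypothetical helper for the concrete data source
          if (record != null) store(record)
        }
      }
    }.start()
  }

  def onStop() {
    // nothing to do: the receiving thread checks isStopped() and exits on its own
  }

  // placeholder only: replace with real source access (socket, queue, SDK call, ...)
  private def pollSource(): String = null
}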

Once the data is received, it can be stored inside Spark by calling store(data), which is a method provided by the Receiver class. There are a number of flavours of store() which allow you to store the received data record-at-a-time or as a whole collection of objects / serialized bytes.
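
For illustration only, a hedged sketch of a few store() flavours called from inside a Receiver[String] subclass; the exact set of overloads depends on the Spark version, and the class name and record values here are made up.

import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class StoreFlavoursReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
  def onStart() {
    // 1) one record at a time
    store("a single record")
    // 2) a whole collection of objects gathered by the receiver
    store(ArrayBuffer("a", "batch", "of", "records"))
    // 3) an iterator over records
    store(Iterator("records", "from", "an", "iterator"))
    // 4) pre-serialized bytes; in real use these must be produced by the
    //    serializer Spark is configured with (placeholder bytes shown here)
    store(ByteBuffer.wrap(Array[Byte](1, 2, 3)))
  }
  def onStop() { }
}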

Any exception in the receiving threads should be caught and handled properly to avoid silent failures of the receiver. restart(<exception>) will restart the receiver by asynchronously calling onStop() and then calling onStart() after a delay. stop(<exception>) will call onStop() and terminate the receiver. Also, reportError(<error>) reports an error message to the driver (visible in the logs and UI) without stopping / restarting the receiver.
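
As a hedged fragment (not a complete receiver), these calls could be combined along the following lines inside the receiving thread; which failures count as transient, fatal, or merely worth reporting depends entirely on the data source.

// inside the receiving thread of a Receiver subclass (sketch only)
try {
  // ... receive data and call store(...) ...
} catch {
  case e: java.net.ConnectException =>
    // transient problem: restart the receiver after a delay
    restart("Could not connect to the source, retrying", e)
  case e: SecurityException =>
    // unrecoverable problem: stop the receiver for good
    stop("Not authorized to read from the source", e)
  case scala.util.control.NonFatal(t) =>
    // report to the driver, but keep the receiver running
    reportError("Non-fatal error while receiving", t)
}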

The following is a custom receiver that receives a stream of text over a socket. It treats '\n'-delimited lines in the text stream as records and stores them with Spark. If the receiving thread has any error connecting or receiving, the receiver is restarted to make another attempt to connect.

Scala

// imports assume a Spark 1.x build, matching the Actor-based API used later in this guide
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // There is nothing much to do here, as the thread calling receive()
    // is designed to stop by itself once isStopped() returns true
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private def receive() {
    var socket: Socket = null
    var userInput: String = null
    try {
     // Connect to host:port
     socket = new Socket(host, port)

     // Until stopped or connection broken continue reading
     val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
     userInput = reader.readLine()
     while(!isStopped && userInput != null) {
       store(userInput)
       userInput = reader.readLine()
     }
     reader.close()
     socket.close()

     // Restart in an attempt to connect again when server is active again
     restart("Trying to connect again")
    } catch {
     case e: java.net.ConnectException =>
       // restart if could not connect to server
       restart("Error connecting to " + host + ":" + port, e)
     case t: Throwable =>
       // restart if there is any other error
       restart("Error receiving data", t)
    }
  }
}

Using the custom receiver in a Spark Streaming application

The custom receiver can be used in a Spark Streaming application by using streamingContext.receiverStream(<instance of custom receiver>). This will create an input DStream using data received by the instance of the custom receiver, as shown below.

Scala

// Assuming ssc is the StreamingContext
val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port))
val words = customReceiverStream.flatMap(_.split(" "))
...
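
To put this snippet in context, here is a hedged end-to-end sketch built around the CustomReceiver class defined above; the application name, the 1-second batch interval, and the word-count logic are illustrative and not part of the original example.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CustomReceiverApp {
  def main(args: Array[String]) {
    val Array(host, port) = args

    // batch interval of 1 second, chosen only for illustration
    val conf = new SparkConf().setAppName("CustomReceiverApp")
    val ssc = new StreamingContext(conf, Seconds(1))

    // plug the custom receiver in as an input DStream
    val lines = ssc.receiverStream(new CustomReceiver(host, port.toInt))

    // a classic word count over each batch
    val wordCounts = lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}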

The full source code is in the example CustomReceiver.scala.

Implementing and Using a Custom Actor-based Receiver

Custom Akka Actors can also be used to receive data. The ActorHelper trait can be applied to any Akka actor, which allows received data to be stored in Spark using store(...) methods. The supervisor strategy of this actor can be configured to handle failures, etc. (see the sketch after the stream-creation example below).

import akka.actor.Actor
import org.apache.spark.streaming.receiver.ActorHelper

class CustomActor extends Actor with ActorHelper {
  def receive = {
    case data: String => store(data)
  }
}

A new input stream can then be created with this custom actor, as shown below.

// Assuming ssc is the StreamingContext
val lines = ssc.actorStream[String](Props(new CustomActor()), "CustomReceiver")
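
The supervisor strategy mentioned earlier can be passed in when the stream is created. The following is a hedged sketch only: the extra actorStream parameters and the retry settings shown here are assumptions that should be checked against the Spark version in use.

import scala.concurrent.duration._
import akka.actor.{OneForOneStrategy, Props}
import akka.actor.SupervisorStrategy.Restart
import org.apache.spark.storage.StorageLevel

// restart the actor on any exception, at most 10 times per minute
val strategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
  case _: Exception => Restart
}

val lines = ssc.actorStream[String](
  Props(new CustomActor()), "CustomReceiver",
  StorageLevel.MEMORY_AND_DISK_2, strategy)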

See ActorWordCount.scala for an end-to-end example.
