Spark Streaming can receive streaming data from any arbitrary data source beyond the ones for which it has built-in support (that is, beyond Flume, Kafka, files, sockets, etc.). This requires the developer to implement a receiver customized for receiving data from the data source in question. This guide walks through the process of implementing a custom receiver and using it in a Spark Streaming application.
Implementing a Custom Receiver
This starts with implementing a Receiver. A custom receiver must extend this abstract class by implementing two methods:
- onStart(): Things to do to start receiving data.
- onStop(): Things to do to stop receiving data.
Note that onStart() and onStop() must not block indefinitely. Typically, onStart() starts the threads that are responsible for receiving the data, and onStop() ensures that those threads stop receiving. The receiving threads can also use isStopped(), a Receiver method, to check whether they should stop receiving data.
Once data is received, it can be stored inside Spark by calling store(data), a method provided by the Receiver class. There are a number of flavours of store() that allow you to store the received data one record at a time or as a whole collection of objects or serialized bytes.
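As a rough illustration of these flavours, the sketch below stores records one at a time, as a whole buffer, and through an iterator. StoreFlavoursReceiver and its hard-coded records are hypothetical and exist only for this example; the serialized-bytes variant is not shown.

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver, used only to illustrate the store() variants.
class StoreFlavoursReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // onStart() must not block, so the work happens on a separate thread.
    new Thread("Store Flavours Receiver") {
      override def run(): Unit = {
        // Record-at-a-time: a single item of the receiver's type.
        store("a single record")
        // A whole collection of objects handed over in one call.
        store(ArrayBuffer("first", "second", "third"))
        // The same idea with an iterator over the records.
        store(Iterator("fourth", "fifth"))
      }
    }.start()
  }

  def onStop(): Unit = {
    // Nothing to clean up: the thread above finishes on its own.
  }
}
```

Which variant fits best depends on how the data source delivers data: a source that hands over records in batches maps naturally onto the collection variants, while a line-oriented source maps onto the single-record one.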
Any exception in the receiving threads should be caught and handled properly to avoid silent failures of the receiver. restart(<exception>) restarts the receiver by asynchronously calling onStop() and then, after a delay, onStart(). stop(<exception>) calls onStop() and terminates the receiver. In addition, reportError(<error>) reports an error message to the driver (visible in the logs and UI) without stopping or restarting the receiver.
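One possible way to combine the three calls is sketched below. PollingReceiver and its fetch parameter are hypothetical, and the mapping from exception type to reaction (report, restart, or stop) is an assumption made for illustration, not a rule of the API.

```scala
import java.io.IOException

import scala.util.control.NonFatal

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical receiver: `fetch` stands in for any call that reads one
// record from the data source and may throw.
class PollingReceiver(fetch: () => String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Receive on a separate thread so that onStart() returns immediately.
    new Thread("Polling Receiver") {
      override def run(): Unit = poll()
    }.start()
  }

  def onStop(): Unit = {
    // The polling thread checks isStopped() and exits on its own.
  }

  private def poll(): Unit = {
    try {
      while (!isStopped()) {
        try {
          store(fetch())
        } catch {
          // Assumed to mean a bad record: tell the driver and keep going.
          case e: IllegalArgumentException =>
            reportError("Skipping malformed record", e)
        }
      }
    } catch {
      // Assumed to be transient: onStop(), then onStart() after a delay.
      case e: IOException =>
        restart("I/O error while polling, restarting", e)
      // Anything else is treated as unrecoverable: onStop() and terminate.
      case NonFatal(e) =>
        stop("Unrecoverable error while polling", e)
    }
  }
}
```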
The following is a custom receiver that receives a stream of text over a socket. It treats '\n'-delimited lines in the text stream as records and stores them with Spark. If the receiving thread hits any error while connecting or receiving, the receiver is restarted to make another attempt to connect.
Scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket

import org.apache.spark.Logging // Spark 1.x location of the Logging trait
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart(): Unit = {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run(): Unit = { receive() }
    }.start()
  }

  def onStop(): Unit = {
    // There is nothing much to do, as the thread calling receive()
    // is designed to stop by itself once isStopped() returns true
  }

  /** Create a socket connection and receive data until the receiver is stopped */
  private def receive(): Unit = {
    var socket: Socket = null
    var userInput: String = null
    try {
      // Connect to host:port
      socket = new Socket(host, port)

      // Keep reading until the receiver is stopped or the connection is broken
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
      userInput = reader.readLine()
      while (!isStopped && userInput != null) {
        store(userInput)
        userInput = reader.readLine()
      }
      reader.close()
      socket.close()

      // Restart in an attempt to connect again when the server is active again
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        // Restart if the connection to the server could not be made
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        // Restart if there is any other error
        restart("Error receiving data", t)
    }
  }
}
Using the custom receiver in a Spark Streaming application
The custom receiver can be used in a Spark Streaming application by calling streamingContext.receiverStream(<instance of custom receiver>). This creates an input DStream from the data received by the custom receiver instance, as shown below.
Scala
// Assuming ssc is the StreamingContext
val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port))
val words = customReceiverStream.flatMap(_.split(" "))
...
The full source code is in the example CustomReceiver.scala.
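For readers who want to see the surrounding driver code, here is a minimal sketch of a complete application. CustomReceiverApp, toWords, the local[2] master, and the one-second batch interval are all assumptions chosen for illustration; the job accepts any Receiver[String], so the block does not depend on a particular receiver class.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical driver program; the names here are illustrative only.
object CustomReceiverApp {

  // Splits one received line into words.
  def toWords(line: String): Seq[String] = line.split(" ").toSeq

  // Builds the streaming job around any Receiver[String] and runs it
  // until the application is terminated.
  def run(receiver: Receiver[String]): Unit = {
    // At least two local threads: one is occupied by the receiver,
    // the rest process the received batches.
    val conf = new SparkConf().setMaster("local[2]").setAppName("CustomReceiverApp")
    val ssc = new StreamingContext(conf, Seconds(1))

    val lines = ssc.receiverStream(receiver)
    val words = lines.flatMap(line => toWords(line))
    words.print() // prints the first few words of every batch

    ssc.start()            // launches the receiver and the batch processing
    ssc.awaitTermination() // blocks until the context is stopped
  }
}
```

With the socket receiver from this guide, this could be invoked as CustomReceiverApp.run(new CustomReceiver("localhost", 9999)), where the host and port are placeholder values.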
Implementing and Using a Custom Actor-based Receiver
Custom Akka Actors can also be used to receive data. The ActorHelper trait can be mixed into any Akka actor, which allows received data to be stored in Spark using the store(...) methods. The supervisor strategy of this actor can be configured to handle failures.
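import akka.actor.Actor
import org.apache.spark.streaming.receiver.ActorHelper

class CustomActor extends Actor with ActorHelper {
  def receive = {
    // Store every String message this actor receives as one record
    case data: String => store(data)
  }
}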
A new input stream can then be created with this custom actor:
// Assuming ssc is the StreamingContext
val lines = ssc.actorStream[String](Props(new CustomActor()), "CustomReceiver")
See ActorWordCount.scala for an end-to-end example.