InputDStream的继承关系。他们都是使用InputDStream这个抽象类的接口进行操作的。特别注意ReceiverInputDStream这个类,大部分时候我们使用的是它作为扩展的基类,因为它才能(更容易)使接收数据的工作分散到各个worker上执行,更符合分布式计算的理念。
所有的输入流都某个时间间隔将数据以block的形式保存到spark memory中,但以spark core不同的是,spark streaming默认是将对象序列化后保存到内存中。
/** * This is the abstract base class for all input streams. This class provides methods * start() and stop() which is called by Spark Streaming system to start and stop receiving data. * Input streams that can generate RDDs from new data by running a service/thread only on * the driver node (that is, without running a receiver on worker nodes), can be * implemented by directly inheriting this InputDStream. For example, * FileInputDStream, a subclass of InputDStream, monitors a HDFS directory from the driver for * new files and generates RDDs with the new files. For implementing input streams * that requires running a receiver on the worker nodes, use * [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] as the parent class. * * @param ssc_ Streaming context that will execute this input stream */abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext) extends DStream[T](ssc_) { private[streaming] var lastValidTime: Time = null ssc.graph.addInputStream(this)
/** * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]] * that has to start a receiver on worker nodes to receive external data. * Specific implementations of NetworkInputDStream must * define `the getReceiver()` function that gets the receiver object of type * [[org.apache.spark.streaming.receiver.Receiver]] that will be sent * to the workers to receive data. * @param ssc_ Streaming context that will execute this input stream * @tparam T Class type of the object of this stream */abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingContext) extends InputDStream[T](ssc_) { /** Keeps all received blocks information */ private lazy val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]] /** This is an unique identifier for the network input stream. */ val id = ssc.getNewReceiverStreamId()
/** * Gets the receiver object that will be sent to the worker nodes * to receive data. This method needs to defined by any specific implementation * of a NetworkInputDStream. */def getReceiver(): Receiver[T]
最终都是以BlockRDD返回的
/** Ask ReceiverInputTracker for received data blocks and generates RDDs with them. */override def compute(validTime: Time): Option[RDD[T]] = { // If this is called for any time before the start time of the context, // then this returns an empty RDD. This may happen when recovering from a // master failure if (validTime >= graph.startTime) { val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id) receivedBlockInfo(validTime) = blockInfo val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId]) Some(new BlockRDD[T](ssc.sc, blockIds)) } else { Some(new BlockRDD[T](ssc.sc, Array[BlockId]())) }}
时间: 2024-10-11 05:14:52