上一篇博客中,已经对股票预测的例子做了简单的讲解,下面对其中的几个关键的技术点再作一些总结。
1、updateStateByKey
由于在1.6版本中有一个替代函数,据说效率比较高,所以作者就顺便研究了一下该函数的用法。
def mapWithState[StateType, MappedType](spec :StateSpec[K, V, StateType, MappedType]) : MapWithStateDStream[K, V, StateType, MappedType] = { }
上面是函数的原型,接收一个StateSpec的对象,其实就是对updateStateByKey相关参数的一个封装。该对象接收4个类型参数,KEY值的类型,VALUE的类型,State的类型,Mapped的类型。理解这个四个类型参数也比较关键,这个跟updateStateByKey有少许区别:K,V这两个类型参数不需要太多解释;State的类型可以是任意类型,Float,(Float,Int),OneObject等等;MappedType是映射结果的类型,也就是说返回的类型也可以是任意类型,这点与updateStateByKey有少许不同。下面是一个示例
/** mapWithState.function是用每个key的state对(k,v)进行map * 对输入的每一个(stockMame,stockPrice)键值对,使用每个key的state进行映射,返回新的结果 * 此处的state是每个stockName的上一次的价格 * 用输入的(stockName,stockPrice)中的stockPrice更行state中的上一次的价格(state.update函数) * 映射结果为(stockName,(stockPrice-上一次价格,1)) ,当然映射结果也可以是其他值,例如(stockName,最后一次价格变化的方向) * */ val updatePriceTrend = (key:String, newPrice: Option[Float],state:State[Float]) => { val lstPrice:Float = state.getOption().getOrElse(newPrice.getOrElse(0.0f)) state.update(newPrice.getOrElse(0.0f)) // println(new SimpleDateFormat("HH:mm:ss").format(new Date())+"-"+newPrice.getOrElse(0.0f)+","+lstPrice) (key,(newPrice.getOrElse(0.0f)-lstPrice,1)) }
2、reduceByKeyAndWindow
上一个例子中,虽然使用到了该函数,但其实是在官方例子的基础上依葫芦画瓢写的,并不能很好的理解该函数的具体用法。下面是此次优化后的代码
val reduceFunc = (reduced: (Float,Int), newPair: (Float,Int)) => { if (newPair._1 > 0) (reduced._1 + newPair._1, reduced._2 + newPair._2) else (reduced._1 + newPair._1, reduced._2 - newPair._2) } val invReduceFunc = (reduced: (Float,Int), oldPair: (Float,Int)) => { if (oldPair._1 > 0) (reduced._1 + oldPair._1, reduced._2 - oldPair._2) else (reduced._1 + oldPair._1, reduced._2 + oldPair._2) } /** 每隔slideLen个BatchTime对过去windowLen个(不包含当前Batch)BatchTime的RDD进行计算 * */ val windowedPriceChanges = stockPrice.reduceByKeyAndWindow(reduceFunc,invReduceFunc, Seconds(3),//windowLen Seconds(1) //slideLen )
其中两个函数很关键:reduceFunc、invReduceFunc。reduceFunc是对进入窗口的数据进行的计算,invReduceFunc是对离开窗口的数据进行的计算。那么怎么理解进入窗口和离开窗口呢?要首先理解窗口函数的基本意义,下图展示了滑动窗口的概念 。
如上图所示,一个滑动窗口时间段((sliding window length)内的所有RDD会进行合并以创建windowed DStream所对应的RDDD。每个窗口操作有两个参数:
- window length - The duration of the window (3 in the figure),滑动窗口的时间跨度,指本次window操作所包含的过去的时间间隔(图中包含3个batch interval,可以理解时间单位)
- sliding interval - The interval at which the window operation is performed (2 in the figure).(窗口操作执行的频率,即每隔多少时间计算一次)
These two parameters must be multiples of the batch interval of the source DStream (1 in the figure). 这表示,sliding window length的时间长度以及sliding interval都要是batch interval的整数倍。batch interval是在构造StreamingContext时传入的(1 in the figure)
那么上图中,在time5的时候,reduceFunc处理的数据就是time4和time5;invReduceFunc处理的数据就是time1和time2。此处需要特别特别特别处理,这里的window at time 5要理解成time 5的最后一刻,如果这里的time是一秒的话,那么time 5其实就是第5秒最后一刻,也就是第6秒初。关于这一点在后面的博文中会具体讲解。
关键点解释的差不多了,reduceFunc的函数就好理解了,该函数的第一个参数reduced可以理解成在time 3的时候计算的最终结果,第二个参数其实也就分别是time 4和time 5的数据(该函数应该会被调用多次的);那么time 4和time 5的这两批数据是怎么汇总的呢?仍然是调用reduceFunc,也即是对这两批数据中的每一条具体的记录按照时间的先后顺序调用reduceFunc,其实也就是leftReduce。invReduceFunc同理。
好了,两个关键函数就算解释清楚了,如果还有不清楚的地方,可以留言评论,最后附上源码的git地址:http://git.oschina.net/gabry_wu/BigDataPractice