基于Spark Streaming预测股票走势的例子(二)

上一篇博客中,已经对股票预测的例子做了简单的讲解,下面对其中的几个关键的技术点再作一些总结。

1、updateStateByKey

  由于在1.6版本中有一个替代函数,据说效率比较高,所以作者就顺便研究了一下该函数的用法。

def mapWithState[StateType, MappedType](spec :StateSpec[K, V, StateType, MappedType]) : MapWithStateDStream[K, V, StateType, MappedType] = { }

上面是函数的原型,接收一个StateSpec的对象,其实就是对updateStateByKey相关参数的一个封装。该对象接收4个类型参数,KEY值的类型,VALUE的类型,State的类型,Mapped的类型。理解这个四个类型参数也比较关键,这个跟updateStateByKey有少许区别:K,V这两个类型参数不需要太多解释;State的类型可以是任意类型,Float,(Float,Int),OneObject等等;MappedType是映射结果的类型,也就是说返回的类型也可以是任意类型,这点与updateStateByKey有少许不同。下面是一个示例

   /** mapWithState.function是用每个key的state对(k,v)进行map
     * 对输入的每一个(stockMame,stockPrice)键值对,使用每个key的state进行映射,返回新的结果
     * 此处的state是每个stockName的上一次的价格
     * 用输入的(stockName,stockPrice)中的stockPrice更行state中的上一次的价格(state.update函数)
     * 映射结果为(stockName,(stockPrice-上一次价格,1)) ,当然映射结果也可以是其他值,例如(stockName,最后一次价格变化的方向)
     * */
    val updatePriceTrend = (key:String, newPrice: Option[Float],state:State[Float]) => {
     val lstPrice:Float = state.getOption().getOrElse(newPrice.getOrElse(0.0f))
     state.update(newPrice.getOrElse(0.0f))
    // println(new SimpleDateFormat("HH:mm:ss").format(new Date())+"-"+newPrice.getOrElse(0.0f)+","+lstPrice)
     (key,(newPrice.getOrElse(0.0f)-lstPrice,1))
    }

2、reduceByKeyAndWindow

上一个例子中,虽然使用到了该函数,但其实是在官方例子的基础上依葫芦画瓢写的,并不能很好的理解该函数的具体用法。下面是此次优化后的代码

    val reduceFunc = (reduced: (Float,Int), newPair: (Float,Int)) => {
      if (newPair._1 > 0) (reduced._1 + newPair._1, reduced._2 + newPair._2)
      else (reduced._1 + newPair._1, reduced._2 - newPair._2)
    }
    val invReduceFunc = (reduced: (Float,Int), oldPair: (Float,Int)) => {
      if (oldPair._1 > 0) (reduced._1 + oldPair._1, reduced._2 - oldPair._2)
      else (reduced._1 + oldPair._1, reduced._2 + oldPair._2)
    }
    /**  每隔slideLen个BatchTime对过去windowLen个(不包含当前Batch)BatchTime的RDD进行计算
      * */
    val windowedPriceChanges = stockPrice.reduceByKeyAndWindow(reduceFunc,invReduceFunc,
      Seconds(3),//windowLen
      Seconds(1) //slideLen
    )

其中两个函数很关键:reduceFunc、invReduceFunc。reduceFunc是对进入窗口的数据进行的计算,invReduceFunc是对离开窗口的数据进行的计算。那么怎么理解进入窗口和离开窗口呢?要首先理解窗口函数的基本意义,下图展示了滑动窗口的概念  。

如上图所示,一个滑动窗口时间段((sliding window length)内的所有RDD会进行合并以创建windowed DStream所对应的RDDD。每个窗口操作有两个参数:

  • window length - The duration of the window (3 in the figure),滑动窗口的时间跨度,指本次window操作所包含的过去的时间间隔(图中包含3个batch interval,可以理解时间单位)
  • sliding interval - The interval at which the window operation is performed (2 in the figure).(窗口操作执行的频率,即每隔多少时间计算一次)

These two parameters must be multiples of the batch interval of the source DStream (1 in the figure). 这表示,sliding window length的时间长度以及sliding interval都要是batch interval的整数倍。batch interval是在构造StreamingContext时传入的(1 in the figure)

那么上图中,在time5的时候,reduceFunc处理的数据就是time4和time5;invReduceFunc处理的数据就是time1和time2。此处需要特别特别特别处理,这里的window at time 5要理解成time 5的最后一刻,如果这里的time是一秒的话,那么time 5其实就是第5秒最后一刻,也就是第6秒初。关于这一点在后面的博文中会具体讲解。

关键点解释的差不多了,reduceFunc的函数就好理解了,该函数的第一个参数reduced可以理解成在time 3的时候计算的最终结果,第二个参数其实也就分别是time 4和time 5的数据(该函数应该会被调用多次的);那么time 4和time 5的这两批数据是怎么汇总的呢?仍然是调用reduceFunc,也即是对这两批数据中的每一条具体的记录按照时间的先后顺序调用reduceFunc,其实也就是leftReduce。invReduceFunc同理。

好了,两个关键函数就算解释清楚了,如果还有不清楚的地方,可以留言评论,最后附上源码的git地址:http://git.oschina.net/gabry_wu/BigDataPractice

时间: 2024-10-06 08:15:42

基于Spark Streaming预测股票走势的例子(二)的相关文章

实测 《Tensorflow实例:利用LSTM预测股票每日最高价(二)》的结果

近期股市行情牛转熊,大盘一直下探!由3200跌到了2700,想必很多人被深套了.这时想起人工智能能否预测股市趋势?RNN能否起作用? 这时便从网上找下教程,发现网上有个例子,便拿来一试!Tensorflow实例:利用LSTM预测股票每日最高价(二) 这个实例大体上没有大问题,只是有些小细节有问题!要想直接复制运行,是会报错的.首先整下代码过程原作者已经写得很清楚了,但对于初学者来讲,有些地方还是不太明白的.我作为一个初学者,先来谈谈我在整个实测中的收获吧! 实例的思路:通过tushare可以获取

通过机器学习的线性回归算法预测股票走势(用Python实现)

在本人的新书里,将通过股票案例讲述Python知识点,让大家在学习Python的同时还能掌握相关的股票知识,所谓一举两得.这里给出以线性回归算法预测股票的案例,以此讲述通过Python的sklearn库实现线性回归预测的技巧. 本文先讲以波士顿房价数据为例,讲述线性回归预测模型的搭建方式,随后将再这个基础上,讲述以线性预测模型预测股票的实现代码.本博文是从本人的新书里摘取的,新书预计今年年底前出版,敬请大家关注. 正文开始(长文预警) ------------------------------

【自动化】基于Spark streaming的SQL服务实时自动化运维

设计背景 spark thriftserver目前线上有10个实例,以往通过监控端口存活的方式很不准确,当出故障时进程不退出情况很多,而手动去查看日志再重启处理服务这个过程很低效,故设计利用Spark streaming去实时获取spark thriftserver的log,通过log判断服务是否停止服务,从而进行对应的自动重启处理,该方案能达到秒级 7 * 24h不间断监控及维护服务. 设计架构 在需要检测的spark thriftserver服务节点上部署flume agent来监控日志流

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十一)NIFI1.7.1安装

一.nifi基本配置 1. 修改各节点主机名,修改/etc/hosts文件内容. 192.168.0.120 master 192.168.0.121 slave1 192.168.0.122 slave2 具体请参考<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.> 2. 安装zookeeper分布式集群具体请参考<Kafka:ZK+Kafka+Spark Streaming集群

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十二)Spark Streaming接收流数据及使用窗口函数

官网文档:<http://spark.apache.org/docs/latest/streaming-programming-guide.html#a-quick-example> Spark Streaming官网的例子reduceByKeyAndWindow 简单的介绍了spark streaming接收socket流的数据,并把接收到的数据进行windows窗口函数对数据进行批量处理. import java.util.Arrays; import org.apache.spark.S

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网。

Centos7出现异常:Failed to start LSB: Bring up/down networking. 按照<Kafka:ZK+Kafka+Spark Streaming集群环境搭建(一)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网.>配置好虚拟机,正在使用中,让它强制断电后,启动起来发现ip无法访问,而且重启网络失败: 执行:systemctl restart network.service 出现异常:Failed to start LSB: Br

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十五)Structured Streaming:同一个topic中包含一组数据的多个部分,按照key它们拼接为一条记录(以及遇到的问题)。

需求: 目前kafka的topic上有一批数据,这些数据被分配到9个不同的partition中(就是发布时key:{m1,m2,m3,m4...m9},value:{records items}),mx(m1,m2...m9)这些数据的唯一键值:int_id+start_time,其中int_id和start_time是topic record中的记录.这9组数据按照唯一键值可以拼接(m1.primarykey1,m2.primarykey1,m3.primarykey1.....m9.prim

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十四)Structured Streaming:Encoder

一般情况下我们在使用Dataset<Row>进行groupByKey时,你会发现这个方法最后一个参数需要一个encoder,那么这些encoder如何定义呢? 一般数据类型 static Encoder<byte[]> BINARY() An encoder for arrays of bytes. static Encoder<Boolean> BOOLEAN() An encoder for nullable boolean type. static Encoder

Spark Streaming编程指南

本文基于Spark Streaming Programming Guide原文翻译, 加上一些自己的理解和小实验的结果. 一.概述 Spark Streaming是基于Core Spark API的可扩展,高吞吐量,并具有容错能力的用于处理实时数据流的一个组件.Spark Streaming可以接收各种数据源传递来的数据,比如Kafka, Flume, Kinesis或者TCP等,对接收到的数据还可以使用一些用高阶函数(比如map, reduce, join及window)进行封装的复杂算法做进