大数据技术之_26_交通状态预测项目_01_数据模拟 + 数据采集 + 数据建模 + 数据预测 + 项目总结

一 项目背景二 项目架构与环境2.1 项目架构2.2 项目环境三 项目实现3.1 数据模拟3.1.1 数据结构3.1.2 编写代码3.1.3 测试3.2 数据采集3.2.1 编写代码3.2.2 测试3.3 数据建模3.3.1 编写代码3.3.2 测试3.4 数据预测3.4.1 编写代码3.4.2 测试四 项目总结



小段子分享:有个人姓铁,他不长头发,请问他得了什么病?答:老铁没毛病。

一 项目背景

该项目以车辆预测为基础,学习业务解决的方法论。
学习完本项目后,可以解决如下问题或适用于如下业务场景:
  1、公路堵车预测
  2、地铁人流量预测
  3、共享单车聚集点预测等等

扩展知识:

spark-shell --master yarn --conf spark.eventLog.dir=hdfs://hadoop/tmp/spark2 --jars /home/hadoop-SNAPSHOT.jar

智能判断:    轨迹:将某一辆车在所有监测点留下的踪迹聚合连线,则为该车的轨迹。    跟车:判断某一辆车是否被跟踪(此处“跟踪”为中性词汇)等。比如:婚车(判断是否属于一个车队)    碰撞:这里不是撞车分析,而是在几个电子围栏内(比如,监测点1,监测点2),同一辆车,在某一个时间范围内,检测出该车出现在不同的监测点。电子围栏:比如OA打卡。

技侦支持:    布控:警方输入布控的车辆信息(车牌号,车型,颜色等等)    实时报警:符合布控标准,则报警    套牌分析:相同车牌号,车辆信息不一致    落脚点:车辆在哪个区域停留时间长

统计分析:    流量统计:哪几个监测点的车流量比较高    外来车辆统计等等

数据结构示例:日期            关卡id       摄像id       车辆          发生时间          速度    公路id   区域id(维度=特征值向量)

2017-04-25      0001        09203       京W47147     2017-04-25 20:58:17     138     49      042017-04-25      0005        06975       京W47147     2017-04-25 20:12:39     50      10      062017-04-25      0001        02846       京W47147     2017-04-25 20:20:20     214     21      002017-04-25      0003        06044       京W47147     2017-04-25 20:15:58     78      47      012017-04-25      0000        01599       京W47147     2017-04-25 20:40:58     59      32      012017-04-25      0002        09260       京M91266     2017-04-25 09:09:57     105     15      00

一个 Event(事件)至少包含一行数据。因为 Kafka 是基于事件的。

为什么一个 Event(事件)包含多行数据?答:我们将多行数据封装(打包)成一个 Event,发送给 Kafka,这样的好处是减少网络IO。如何打包呢?答:使用 json 格式,如下:

{    "monitor_arr":    [        {            "time": "2017-04-25",            monitor_id:"0001",            ...        },        {            "time": "2017-04-25",            monitor_id:"0005",            ...        },        ......    ]}

有监督学习:有标签(label)的训练 --> 建模的过程 --> 求通项公式的过程 --> 求拟合函数的过程 --> 求参数的过程 --> 连续数据,常用算法:回归算法 --> 线性回归、逻辑斯特回归无监督学习:没有标签(label)的训练 --> 离散数据 --> 比如归类问题,常用算法:聚类算法 --> 支持向量机、随机森林(起源于决策树,万能药)、K-means 算法半监督学习:一部分有标签,一部分无标签。

老罗的锤子手机不赚钱,为了交个朋友--情怀,卖配件、T恤等赚钱。

平民化的最接近科学/科研 --> 计算机

维度认知:

二 项目架构与环境

2.1 项目架构

2.2 项目环境

以下环境为本次项目使用的情景,并非只有在此环境下才可以完成整体业务需求。请灵活变动。(本例已在以下环境中完成测试)

三 项目实现

我们新建 java 项目 tf,之后的每一个项目模块都建立于该项目之下。然后删除掉 src 目录。

3.1 数据模拟

  请确保 zookeeper 和 kafka 的正确配置。
  如果之前安装的 scala 版本不是 2.11.8 请替换之:

$ tar -zxf /opt/software/scala-2.11.8.tgz -C /opt/module/

使用 root 用户,配置环境变量:[[email protected] module]$ sudo vim /etc/profile

#SCALA_HOMEexport SCALA_HOME=/opt/module/scala/scala-2.11.8export PATH=$PATH:$SCALA_HOME/bin

使环境变量生效:[[email protected] module]$ sudo source /etc/profile

  我们需要产生一些监测点的模拟车速数据,并将这些数据实时的发送给 kafka,保存到 traffic 主题中,以供后续的 Spark 读取数据并加工之后存放于 redis。

3.1.1 数据结构

3.1.2 编写代码

思路:
  a) 新建模块 maven 工程:tf_producer
  b) 配置 maven 依赖。
  c) 因为要把数据发送给 kafka,所以配置 kafka 属性,保存于某个配置文件中。
  d) 编写 kafka 加载属性的工具类。
  e) 每隔 5 分钟,切换一次模拟状态,例如第一个五分钟,车速都在 30km/h 以上,下一个五分钟,车速都在 10km/h 以下,往复模拟公路一会堵车,一会不堵车的情况。
  f) 启动 zookeeper 集群和 kafka 集群,并创建 kafka 主题,检查主题存在性。
  g) 将数据发送至 kafka 并使用 kafka console-consumer 进行检测。

1) 新建项目:tf_producer

2) maven 的 pom.xml 文件配置:

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu</groupId>    <artifactId>tf_producer</artifactId>    <version>1.0-SNAPSHOT</version>

    <dependencies>        <dependency>            <groupId>org.apache.kafka</groupId>            <artifactId>kafka-clients</artifactId>            <version>0.11.0.2</version>        </dependency>

        <dependency>            <groupId>com.alibaba</groupId>            <artifactId>fastjson</artifactId>            <version>1.2.41</version>        </dependency>    </dependencies></project>

3) kafka 属性配置文件:kafka.properties(生产者)

# 设置 kafka 的 brokerlistbootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092

# 生产者序列化key.serializer=org.apache.kafka.common.serialization.StringSerializervalue.serializer=org.apache.kafka.common.serialization.StringSerializer

acks=allretries=0

# 设置消费者所属的消费组group.id=g_traffic1

# 设置是否自动确认 offsetenable.auto.commit=true

# 设置自动确认 offset 的时间间隔auto.commit.interval.ms=30000

# 设置本次消费的主题kafka.topics=traffic

# 设置 zookeeper 中 follower 和 leader 之间的关于 kafka 的信息同步时间间隔zookeeper.sync.time.ms=250num.io.threads=12batch.size=65536buffer.memory=524288

# kafka 中消息保存的时间(单位是小时),企业开发中是 7 天log.retention.hours=2

4) 编写 kafka 属性加载工具类:PropertyUtil.scala

package com.atguigu.utils

import java.util.Properties

object PropertyUtil {  val properties = new Properties()  // 加载配置属性  try {    val inputStream = ClassLoader.getSystemResourceAsStream("kafka.properties")    properties.load(inputStream)  } catch {    case ex: Exception => println(ex)  } finally {

  }

  // 定义通过键得到属性值的方法  def getProperty(key: String): String = properties.getProperty(key)}

5) 开始模拟数据,每隔 5 分钟切换一次模拟状态,文件:Producer.scala

package com.atguigu.producer

import java.text.DecimalFormatimport java.util.Calendar

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}import com.atguigu.utils.PropertyUtil

import scala.util.Randomimport java._

import com.alibaba.fastjson.JSON

/**  * 模拟产生数据,同时把数据实时的发送到 kafka  * 随机产生 监测点id 以及 速度  * 序列化为 json  * 发送给 kafka  */object Producer {

  def main(args: Array[String]): Unit = {    // 读取配置文件信息    val properties = PropertyUtil.properties    // 创建 kafka 生产者对象    val producer = new KafkaProducer[String, String](properties)

    // 模拟产生实时数据,单位为:秒    var startTime = Calendar.getInstance().getTimeInMillis() / 1000

    // 数据模拟,堵车状态切换的周期单位为:秒    val trafficCycle = 300

    val df = new DecimalFormat("0000")    // 开始不停的实时产生数据    while (true) {      // 模拟产生监测点 id:1~20      val randomMonitorId = df.format(Random.nextInt(20) + 1)      // 模拟车速      var randomSpeed = "000"

      // 得到本条数据产生时的当前时间,单位为:秒      val currentTime = Calendar.getInstance().getTimeInMillis() / 1000      // 每 5 分钟切换一次公路状态      if (currentTime - startTime > trafficCycle) {        randomSpeed = new DecimalFormat("000").format(Random.nextInt(16))        if (currentTime - startTime > trafficCycle * 2) {          startTime = currentTime        }      } else {        randomSpeed = new DecimalFormat("000").format(Random.nextInt(31) + 30)      }

      // 该 Map 集合用于存放生产出来的数据      val jsonmMap = new util.HashMap[String, String]()      jsonmMap.put("monitor_id", randomMonitorId)      jsonmMap.put("speed", randomSpeed)

      // 因为 kafka 是基于事件的,在此,我们每一条产生的数据都序列化为一个 json 事件      val event = JSON.toJSON(jsonmMap)

      // 发送事件到 kafka 集群中      producer.send(new ProducerRecord[String, String](PropertyUtil.getProperty("kafka.topics"), event.toString))

      Thread.sleep(500)

      // 测试      // println("监测点id:" + randomMonitorId + "," + "车速:" + randomSpeed)      println(event)    }  }}


6) 启动集群中的其他相关节点(zookeeper,hadoop 等),启动 kafka,并创建 kafka 主题,检查主题存在性

[[email protected] ~]$ start-cluster.sh 

Linux 集群服务群起脚本
(1) 启动脚本:start-cluster.sh

#!/bin/bashecho "================        开始启动所有节点服务      ==========="echo "================        正在启动 Zookeeper      ==========="for i in [email protected] [email protected] [email protected]do    ssh $i ‘source /etc/profile;/opt/module/zookeeper-3.4.10/bin/zkServer.sh start‘done

echo "================        正在启动 HDFS           ==========="ssh [email protected] ‘/opt/module/hadoop-2.7.2/sbin/start-dfs.sh‘

echo "================        正在启动 YARN           ==========="ssh [email protected] ‘/opt/module/hadoop-2.7.2/sbin/start-yarn.sh‘

echo "================    hadoop102 节点正在启动 JobHistoryServer   ==========="ssh [email protected] ‘/opt/module/hadoop-2.7.2/sbin/mr-jobhistory-daemon.sh start historyserver‘

(2) 停止脚本:stop-cluster.sh

#!/bin/bashecho "================        开始停止所有节点服务      ==========="echo "================    hadoop102 节点正在停止 JobHistoryServer ==========="ssh [email protected] ‘/opt/module/hadoop-2.7.2/sbin/mr-jobhistory-daemon.sh stop historyserver‘

echo "================        正在停止 YARN           ==========="ssh [email protected] ‘/opt/module/hadoop-2.7.2/sbin/stop-yarn.sh‘

echo "================        正在停止 HDFS           ==========="ssh [email protected] ‘/opt/module/hadoop-2.7.2/sbin/stop-dfs.sh‘

echo "================        正在停止 Zookeeper      ==========="for i in [email protected] [email protected] [email protected]do    ssh $i ‘source /etc/profile;/opt/module/zookeeper-3.4.10/bin/zkServer.sh stop‘done

(3) 查看进程脚本:util.sh

#!/bin/bashfor i in [email protected] [email protected] [email protected]do    echo "================      $i 的所有进程       ==========="    ssh $i ‘/opt/module/jdk1.8.0_144/bin/jps‘done

尖叫提示:脚本学会之后,如果后续再有新的节点需要添加到群起任务中,可以自行解决之。
尖叫提示:启动与停止注意脚本的执行顺序,而且停止脚本的停止过程应该是启动过程的倒序。



zookeeper 集群群起脚本:

[[email protected] ~]$ zkstart.sh

(1) 启动脚本:zkstart.sh

#!/bin/bashecho "==========  正在启动 zookeeper 集群  =========="for i in [email protected] [email protected] [email protected]do    ssh $i ‘source /etc/profile;/opt/module/zookeeper-3.4.10/bin/zkServer.sh start‘done

(2) 停止脚本:zkstop.sh

#!/bin/bashecho "==========  正在停止 zookeeper 集群  =========="for i in [email protected] [email protected] [email protected]do    ssh $i ‘source /etc/profile;/opt/module/zookeeper-3.4.10/bin/zkServer.sh stop‘done

(3) 状态脚本:zkstatus.sh

#!/bin/bashecho "==========  正在查看 zookeeper 集群状态  =========="for i in [email protected] [email protected] [email protected]do    ssh $i ‘source /etc/profile;/opt/module/zookeeper-3.4.10/bin/zkServer.sh status‘done


kafka 集群脚本:

[[email protected] ~]$ kafka-start.sh

(1) 启动脚本:kafka-start.sh

#!/bin/bashecho "================        正在启动 Kafka 集群       ==========="for i in [email protected] [email protected] [email protected]do    ssh $i ‘source /etc/profile;/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties‘done

(2) 停止脚本:kafka-stop.sh

#!/bin/bashecho "================        正在停止 Kafka 集群       ==========="for i in [email protected] [email protected] [email protected]do    ssh $i ‘source /etc/profile;/opt/module/kafka/bin/kafka-server-stop.sh -daemon‘done


创建 kafka 主题:traffic

[[email protected] kafka]$ bin/kafka-topics.sh --create --zookeeper hadoop102:2181 --replication-factor 1 --partitions 3 --topic traffic

删除 kafka 主题:traffic

[[email protected] kafka]$ bin/kafka-topics.sh --delete --zookeeper hadoop102:2181 --topic traffic

检查 kafka 的 traffic 主题是否正常:

[[email protected] kafka]$ bin/kafka-topics.sh --list --zookeeper hadoop102:2181

3.1.3 测试

将数据发送至 kafka 并使用 kafka console-consumer 进行检测,持续运行若干分钟后,查看数据是否稳定输入输出。
启动 kafka 控制台消费者:

// kafka-console-consumer[[email protected] kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic trafffic

kafka 控制台消费者消费数据如下图所示;

3.2 数据采集

  我们将实时模拟出来的数据,放置于 redis 中。

3.2.1 编写代码

思路:
  a) 新建工程:tf_consumer
  b) 配置 maven 依赖并添加 scala 框架的支持。
  c) 配置 redis 并测试。
  d) 将刚才 kafka.properties 以及 PropertyUtil 拷贝过来,并进行相应的修改。
  e) 编写 redis 操作工具类:RedisUtil
  f) 读取 kafka 中的数据,实时保存到 redis 中,并且按照分钟和监测点聚合车速和车辆个数。

1) 新建工程:tf_consumer

2) 配置 maven 的 pom.xml 文件以及 kafka.properties:
pom.xml:

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu</groupId>    <artifactId>tf_consumer</artifactId>    <version>1.0-SNAPSHOT</version>

    <dependencies>        <dependency>            <groupId>org.apache.kafka</groupId>            <artifactId>kafka-clients</artifactId>            <version>0.11.0.2</version>        </dependency>

        <dependency>            <groupId>com.alibaba</groupId>            <artifactId>fastjson</artifactId>            <version>1.2.41</version>        </dependency>

        <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-core_2.11</artifactId>            <version>2.1.1</version>        </dependency>

        <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-streaming_2.11</artifactId>            <version>2.1.1</version>        </dependency>

        <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-streaming-kafka_2.11</artifactId>            <version>1.6.3</version>        </dependency>

        <dependency>            <groupId>redis.clients</groupId>            <artifactId>jedis</artifactId>            <version>2.9.0</version>        </dependency>    </dependencies></project>

3) 修改 kafka.properties 配置文件(消费者):

# 设置 kafka 的 brokerlistbootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092

# 消费者反序列化key.deserializer=org.apache.kafka.common.serialization.StringDeSerializervalue.deserializer=org.apache.kafka.common.serialization.StrinDegSerializer

acks=allretries=0

# Kafka 老版本中的元数据服务列表metadata.broker.list=hadoop102:9092,hadoop103:9092,hadoop104:9092

# 设置消费者所属的消费组group.id=g_traffic1

# 设置是否自动确认 offsetenable.auto.commit=true

# 设置自动确认 offset 的时间间隔auto.commit.interval.ms=30000

# 设置本次消费的主题kafka.topics=traffic

# 设置 zookeeper 中 follower 和 leader 之间的关于 kafka 的信息同步时间间隔zookeeper.sync.time.ms=250num.io.threads=12batch.size=65536buffer.memory=524288

# kafka 中消息保存的时间(单位是小时),企业开发中是 7 天log.retention.hours=2

3) 配置 Redis(单节点)环境并测试

// 通过 wget 下载 Redis 的源码[[email protected] software]$ wget http://download.redis.io/releases/redis-4.0.2.tar.gz

// 将源代码解压到指定目录 /opt/module 下[[email protected] software]$ tar -zxf redis-4.0.2.tar.gz -C /opt/module

// 进入 Redis 源代码目录,编译安装(因为 redis 是用 C 语言写的)[[email protected] module]$ cd redis-4.0.2/

// 安装 GCC[[email protected] module]$ sudo yum install gcc

// 编译源代码[[email protected] redis-4.0.2]$ make MALLOC=libc

如果报错zmalloc.h:50:31: error: jemalloc/jemalloc.h: No such file or directoryzmalloc.h:55:2: error: #error "Newer version of jemalloc required"make[1]: *** [adlist.o] Error 1make[1]: Leaving directory `/opt/module/redis-4.0.2/src‘make: *** [all] Error 2解决办法是:make MALLOC=libc

注意:Redis 并没有自己实现内存池,没有在标准的系统内存分配器上再加上自己的东西。redis-2.4 以上自带 jemalloc,你不需要加任何参数,通过 zmalloc.c 源码中我们可以看到,Redis 在编译时,会先判断是否使用 tcmalloc,如果是,会用 tcmalloc 对应的函数替换掉标准的 libc 中的函数实现。其次会判断 jemalloc 是否使用,最后如果都没有使用才会用标准的 libc 中的内存管理函数。所以用 tcmalloc 优化请谨慎使用,这两个分配器碎片率相差不大,建议用自带 jemalloc。

如果要安装 tcmalloc 可以这样:make USE_TCMALLOC=yes

// 编译安装(注意:要使用 root 用户权限)[[email protected] redis-4.0.2]$ sudo make install

// 创建配置文件,放入指定的目录[[email protected] redis-4.0.2]$ sudo cp /opt/module/redis-4.0.2/redis.conf /opt/module/redis-4.0.2/myredis

// 修改配置文件中以下内容(注意 redis 新版的 4.x 与 老版本 3.x 上配置的细微差别)[[email protected] redis-4.0.2]$ sudo vim /opt/module/redis-4.0.2/myredis/redis.conf

bind 0.0.0.0                                            #69行       #绑定主机 IP,默认值为127.0.0.1,我们是跨机器运行,所以需要更改,表示任意机器集群均可访问,实际开发是中不建议这样改daemonize yes                                           #136行      #是否以后台 daemon 方式运行,默认不是后台运行pidfile /var/run/redis/redis_6379.pid                   #158行      #redis 的 PID 文件路径(可选)logfile "/opt/module/redis-4.0.2/myredis/redis.log"     #171行      #定义 log 文件位置,模式 log 信息定向到 stdout,输出到 /dev/null(可选)dir "/opt/module/redis-4.0.2/myredis"                   #263行      #本地数据库存放路径,默认为./(可选)

// 编译安装默认存在在 /usr/local/bin 目录下,如下[[email protected] redis-4.0.2]$ cd /usr/local/bin/[[email protected] bin]$ ll总用量 9572-rw-r--r-- 1 root root      83 5月   8 01:27 dump6379.rdb-rw-r--r-- 1 root root      83 5月   8 01:27 dump6380.rdb-rw-r--r-- 1 root root      83 5月   8 01:27 dump6381.rdblrwxrwxrwx 1 root root       6 4月  28 17:17 nc -> netcat-rwxr-xr-x 1 root root  103479 4月  28 17:17 netcat-rwxr-xr-x 1 root root  290454 5月  23 12:37 redis-benchmark-rwxr-xr-x 1 root root 2971304 5月  23 12:37 redis-check-aof-rwxr-xr-x 1 root root   45443 5月   6 17:27 redis-check-dump-rwxr-xr-x 1 root root 2971304 5月  23 12:37 redis-check-rdb-rwxr-xr-x 1 root root  419907 5月  23 12:37 redis-clilrwxrwxrwx 1 root root      12 5月  23 12:37 redis-sentinel -> redis-server-rwxr-xr-x 1 root root 2971304 5月  23 12:37 redis-server

在安装完 Redis 之后,启动 Redis

// 启动 Redis 服务器[[email protected] redis-4.0.2]$ redis-server /opt/module/redis-4.0.2/myredis/redis.conf

// 连接 Redis 服务器[[email protected] redis-4.0.2]$ redis-cli -h 192.168.25.102 -p 6379192.168.25.102:6379> set k1 123OK192.168.25.102:6379> get k1"123"192.168.25.102:6379> keys *1) "k1"2) "uid:2"192.168.25.102:6379> lrange uid:2 0 -1      #查看列表的某个范围的数据1) "150:5.0"2) "144:3.0"3) "110:4.0"192.168.25.102:6379> lpush uid:1 3671:3.0 2968:1.0 2455:2.5     #存一组列表数据192.168.25.102:6379> flushall       #清空所有数据192.168.25.102:6379> select 1       #选择数据库

// 查看 Redis 的启动情况[[email protected] redis-4.0.2]$ ps -ef | grep redisatguigu    6033      1  0 13:08 ?        00:00:00 redis-server 0.0.0.0:6379                              atguigu    6046   4336  0 13:12 pts/0    00:00:00 grep redis 

// 停止 Redis 服务器[[email protected] redis-4.0.2]$ redis-cli shutdown

4) 将刚才 kafka.properties 以及 PropertyUtil 拷贝过来,kafka.properties 需要进行相应的修改
5) 编写 redis 操作工具类:RedisUtil.scala

package com.atguigu.utils

import redis.clients.jedis._

// 代码写在半生对象中,这些代码会在类加载的时候,自动的进行初始化object RedisUtil {  // 配置 redis 基本连接参数  val host = "192.168.25.102"  val port = 6379  val timeout = 30000

  val config = new JedisPoolConfig

  // 设置连接池允许最大的连接个数  config.setMaxTotal(200)  // 设置最大空闲连接数  config.setMaxIdle(50)  // 设置最小空闲连接数  config.setMinIdle(8)

  // 设置连接时的最大等待的毫秒数  config.setMaxWaitMillis(10000)  // 设置在获取连接时,检查连接的有效性  config.setTestOnBorrow(true)  // 设置在释放连接时,检查连接的有效性  config.setTestOnReturn(true)

  // 设置在连接空闲时,检查连接的有效性  config.setTestWhileIdle(true)

  // 设置两次扫描之间的时间间隔毫秒数  config.setTimeBetweenEvictionRunsMillis(30000)  // 设置每次扫描的最多的对象数  config.setNumTestsPerEvictionRun(10)  // 设置逐出连接的最小时间间隔,默认是 1800000 毫秒 = 30 分钟  config.setMinEvictableIdleTimeMillis(60000)

  //  连接池  lazy val pool = new JedisPool(config, host, port, timeout)

  // 释放资源  lazy val hook = new Thread{ // 钩子函数:执行一些善后操作,正常退出    override def run() = {      pool.destroy()    }  }

  sys.addShutdownHook(hook.run())}

6) 在 SparkConsumer.scala 中读取 kafka 中的数据,实时保存到 redis 中,并且按照分钟和监测点聚合车速和车辆个数。用到 Spark Streaming 的时间窗口函数进行聚合。

package com.atguigu.consumer

import java.text.SimpleDateFormatimport java.util.Calendar

import com.alibaba.fastjson.{JSON, TypeReference}import com.atguigu.utils.{PropertyUtil, RedisUtil}import kafka.serializer.StringDecoderimport org.apache.spark.streaming.kafka.KafkaUtilsimport org.apache.spark.streaming.{Seconds, StreamingContext}import org.apache.spark.{SparkConf, SparkContext}

/**  * 堵车预测:处理实时数据,消费数据到 redis  */object SparkConsumer {  def main(args: Array[String]): Unit = {    // 初始化 Spark    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("TrafficStreaming")    val sc = new SparkContext(sparkConf)    val ssc = new StreamingContext(sc, Seconds(5))

    // 设置检查点目录    ssc.checkpoint("./ssc/checkpoint")

    // 配置 kafka 参数,使用的是 spark 为我们封装的一套操作 kafka coonsumer 的工具包    val kafkaParam = Map("metadata.broker.list" -> PropertyUtil.getProperty("metadata.broker.list"))

    // 配置 kafka 主题    val topics = Set(PropertyUtil.getProperty("kafka.topics"))

    // 读取 kafka 主题 中的每一个事件 event    val kafkaLineDStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParam, topics)      .map(_._2) // 由于我们 event 中的键是 null,所以我们需要把值映射出来

    // 解析 json 字符串    val event = kafkaLineDStream.map(line => { // {"monitor_id":"0001","speed":"038"}      // 使用 fastjson 来解析当前事件中封装的数据信息,由于该 json 字符串不支持 Scala Map,所以需要先将 json 字符串解析为 Java Map      val lineJavaMap = JSON.parseObject(line, new TypeReference[java.util.Map[String, String]]() {})      // 将 Java Map 转换成 Scala Map      import scala.collection.JavaConverters._      val lineScalaMap: collection.mutable.Map[String, String] = mapAsScalaMapConverter(lineJavaMap).asScala      println(lineScalaMap) // Map[String, String] = ("monitor_id" -> "0001", "speed" -> "038")      lineScalaMap    })

    // 将每一条数据根据 monitor_id 聚合,聚合每一条数据中的 “车辆速度” 叠加    // 例如:聚合好的数据形式:(monitor_id, (speed, 1))  ("0001", (038, 1))    // 最終結果举例:("0001", (1365, 30))    val sumOfSpeedAndCount = event      .map(e => (e.get("monitor_id").get, e.get("speed").get)) // ("0001", "038")、("0001", "048")、("0002", "015")      .mapValues(s => (s.toInt, 1)) // ("0001", (038, 1))、("0001", (048, 1))、("0002", (015, 1))      .reduceByKeyAndWindow( // reduce 表示从左边开始执行将得到的结果返回给第一个参数      (t1: (Int, Int), t2: (Int, Int)) => (t1._1 + t2._1, t1._2 + t2._2),      Seconds(60), // 滑动窗口大小 60 秒,误差最大 59 秒,即上一分钟的数据当成下一分钟的数据来用了。      Seconds(60)) // 滑动步长 60 秒,对我们实际建模的影响忽略不计,因为:实际中,不可能1分钟内就造成大量拥堵,或者堵车不可能1分钟之内就缓解了!!!后面建模的时候会进行线性滤波。

    // 定义 redis 数据库中的数据库索引 index    val dbIndex = 1    // 将采集到的数据,按照每分钟放置于redis 中,将用于后边的数据建模    sumOfSpeedAndCount.foreachRDD(rdd => {      rdd        .foreachPartition(partitionRecords => {          partitionRecords            .filter((tuple: (String, (Int, Int))) => tuple._2._1 > 0) // 过滤掉元组数据中的速度小于0的数据            .foreach(pair => {            // 开始取出这 60 秒的 windows 中所有的聚合数据进行封装,准备存入 redis 数据库            val jedis = RedisUtil.pool.getResource

            val monitorId = pair._1            val sumOfCarSpeed = pair._2._1            val sumOfCarCount = pair._2._2

            // 模拟数据为实时流入            // 两种情况:            // 1、数据生产时,会产生时间戳字段,流入到 kafka 的事件中            // 2、数据消费时,数据消费的时间,就当做数据的生产时间(会有一些小小误差),本业务选择这种方式

            val dateSDF = new SimpleDateFormat("yyyyMMdd") // 用于 redis 中的 key            val hourMinuteSDF = new SimpleDateFormat("HHmm") // 用于 redis 中的 fields

            val currentTime = Calendar.getInstance().getTime

            val dateTime = dateSDF.format(currentTime) // 20190528            val hourMinuteTime = hourMinuteSDF.format(currentTime) // 1617

            // 选择存入的数据库            jedis.select(dbIndex)            jedis.hset(dateTime + "_" + monitorId, hourMinuteTime, sumOfCarSpeed + "_" + sumOfCarCount)

            println(dateTime + "_" + monitorId, hourMinuteTime, sumOfCarSpeed + "_" + sumOfCarCount)

            // RedisUtil.pool.returnResource(jedis) // 老的 API            jedis.close() // 新的 API          })        })    })

    // Spark 开始工作    ssc.start()    ssc.awaitTermination()  }}

// 复习 Scala 中 Map 的取值方式:

// 方式1-使用 map(key)//   1、如果 key 存在,则返回对应的值。//   2、如果 key 不存在,则抛出异常 [java.util.NoSuchElementException]。//   3、在 Java 中,如果 key 不存在则返回 null。// 方式2-使用 contains 方法检查是否存在 key//  使用 containts 先判断再取值,可以防止异常,并加入相应的处理逻辑。//   1、如果 key 存在,则返回 true。//   2、如果 key 不存在,则返回 false。// 方式3-使用 map.get(key).get 取值//   1、如果 key 存在,则 map.get(key) 就会返回 Some(值),然后 Some(值).get 就可以取出。//   2、如果 key 不存在,则 map.get(key) 就会返回 None。// 方式4-使用 map.getOrElse(key, defaultvalue) 取值//   底层是:def getOrElse[V1 >: V](key: K, default: => V1)//   1、如果 key 存在,则返回 key 对应的值。//   2、如果 key 不存在,则返回默认值。在 java 中底层有很多类似的操作。// 如何选择取值方式建议//   如果我们确定 map 有这个 key,则应当使用 map(key),速度快。//   如果我们不能确定 map 是否有 key,而且有不同的业务逻辑,使用 map.contains() 先判断再加入逻辑。//   如果只是简单的希望得到一个值,使用 map4.getOrElse("ip", "127.0.0.1")

3.2.2 测试

我们使用集群的群起脚本:

开启 zookeeper 集群:

[[email protected] ~]$ zkstart.sh

开启 kafka 集群:

[[email protected] ~]$ kafka-start.sh

开启 redis,在 redis 根目录执行:

// 启动 Redis 服务器[atguigu@hadoop102 redis-4.0.2]$ redis-server /opt/module/redis-4.0.2/myredis/redis.conf

运行数据生产
运行数据消费
查看运行结果:
在 redis 根目录中,举个例子依次执行:

[[email protected] redis-4.0.2]$ redis-cli -h 192.168.25.102 -p 6379192.168.25.102:6379> select 1OK192.168.25.102:6379[1]> keys * 1) "20190528_0014" 2) "20190528_0005" 3) "20190528_0019" 4) "20190528_0009" 5) "20190528_0004" 6) "20190528_0013" 7) "20190528_0016" 8) "20190528_0020" 9) "20190528_0015"10) "20190528_0010"11) "20190528_0018"12) "20190528_0008"13) "20190528_0001"14) "20190528_0003"15) "20190528_0007"16) "20190528_0012"17) "20190528_0002"18) "20190528_0011"19) "20190528_0017"20) "20190528_0006"192.168.25.102:6379[1]> hgetall 20190528_0001 1) "1646" 2) "279_7" 3) "1647" 4) "239_6" 5) "1648" 6) "240_5" 7) "1649" 8) "318_7" 9) "1650"10) "184_6"11) "1651"12) "54_8"13) "1652"14) "81_10"15) "1653"16) "69_9"17) "1654"18) "69_9"19) "1655"20) "57_8"21) "1656"22) "262_6"23) "1657"24) "149_3"25) "1659"26) "168_4"27) "1700"28) "134_4"29) "1701"30) "65_8"31) "1702"32) "81_10"

注意:不要直接复制,每次操作有些内容是有变动的。比如时间相关的,比如 IP 相关的。



小结:

堵车内容回顾:一、数据生产    目的:能够让我们清楚数据结构是什么样子的,实际开发中这部分不是我们做;实际开发中:已有数据结构,已有目标,要做的就是目前手中已有的资料如何实现目标    数据结构:卡口id,车速(没有包含数据生产时的时间戳)    堵车状态的转换逻辑(if else),为的是生产的数据尽可能的贴近现实情况

二、数据消费    kafka(高级 API,spark 提供的工具包) --> redis    时间窗口的大小为 60 秒    时间窗口的滑动步长为 60 秒    数据存储在 redis 中,使用的是数据类型是 Hash(即 Map 类型):KV 模式不变,但是 V 也是一个键值对        key : 20190528_0001        field : 1754        value : 1365_30

天猫双十一(使用 Storm + Flink 实现)1、如果我们使用 SparkStreaming 实现,时间窗口的宽度不能设置太大,可能会出现内存溢出。2、5秒内聚合的数据该如何处理呢?答:保存到 redis 中(即落盘)。3、那么下一个时间窗口的新的数据该如何处理呢?答:先将 redis 中前一个 5 秒的数据读出来,然后和这次的 5 秒数据进行相加后,再放回到 redis 中(即落盘)。小结:所有的流式框架都是这样做的。

流式框架的根本的哲学意义是:仅仅处理中间逻辑,即是进行运算(计算)的,不负责数据存储的。如果在内存中想进行长期的数据累加,就相当于一个不断微分再积分的过程,把时间微分到足够细,细到不会导致内存溢出为止,然后再微分的基础上求和,再把所有的微分结果进行积分。

某一个小时间段内的数据量越小,则时间窗口的宽度就可以设置的越大,那么数据展示的延迟就会变长,但是整体数据处理的效率就会变得越高。--> 不像流式处理了!

3.3 数据建模

  在此我们选择通过有监督学习中的手段建立可以预测下一时刻堵车状态的模型。
拟牛顿图解:

线性滤波图解:

目标卡口与相关卡口关系:

模型图解:

3.3.1 编写代码

思路:
  a) 确定要对哪个监测点进行建模,我们称之为目标监测点。
  b) 找到目标监测点的其他相关监测点(比如相关监测点与目标监测点属于一条公路的)。
  c) 从 redis 中访问得到以上所有监测点若干小时内的历史数据信息(一部分作为训练数据,一部分作为测试数据)。
  d) 提取组装特征向量与目标向量,训练参数集,训练模型。
  e) 测试模型吻合度,将符合吻合度的模型保存到 HDFS 中,同时将模型的保存路径放置于 redis 中。

1) 新建 module:tf_modeling
2) 编写 maven 的 pom.xml 文件,添加 scala 框架支持

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu</groupId>    <artifactId>tf_modeling</artifactId>    <version>1.0-SNAPSHOT</version>

    <dependencies>        <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-core_2.11</artifactId>            <version>2.1.1</version>        </dependency>

        <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-mllib_2.11</artifactId>            <version>2.1.1</version>        </dependency>

        <dependency>            <groupId>redis.clients</groupId>            <artifactId>jedis</artifactId>            <version>2.9.0</version>        </dependency>    </dependencies></project>

3) 创建 Train.scala 实现上述思路:

package com.atguigu.train

import java.io.{File, PrintWriter}import java.text.SimpleDateFormatimport java.util.{Calendar, Date}

import com.atguigu.utils.RedisUtilimport org.apache.spark.mllib.classification.LogisticRegressionWithLBFGSimport org.apache.spark.mllib.evaluation.MulticlassMetricsimport org.apache.spark.mllib.linalg.Vectorsimport org.apache.spark.mllib.regression.LabeledPointimport org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

/**  * 堵车预测:建模,不同的卡口不同的模型(函数)  */object Train {  def main(args: Array[String]): Unit = {    // 写入文件的输出流,将本次评估结果保存到下面这个文件中    val writer = new PrintWriter(new File("model_train.txt"))

    // 初始化 Spark    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("TrafficTrainModel")    val sc = new SparkContext(sparkConf)

    // 定义 redis 的数据库相关参数    val dbIndex = 1    // 获取 redis 连接    val jedis = RedisUtil.pool.getResource    jedis.select(dbIndex)

    // 设定 目标监测点:你要对哪几个监测点进行建模(本例中对 2 个检测点进行建模)    val targetMonitorIDs = List("0005", "0015")    // 取出 目标监测点的相关监测点:算法工程师告诉我们的(本例中我们随意写几个)    val relationMonitors = Map[String, Array[String]](      "0005" -> Array("0003", "0004", "0005", "0006", "0007"),      "0015" -> Array("0013", "0014", "0015", "0016", "0017")    )

    // 访问 redis 取出 目标监测点的相关监测点 的数据

    // 遍历 目标监测点的相关监测点 的 Map 集合    targetMonitorIDs.map(targetMonitorID => { // 这个 map 执行 2 次      // 初始化时间      // 获取当前时间      val currentDate = Calendar.getInstance().getTime

      // 格式化 当前时间 为 年月日 对象      val dateSDF = new SimpleDateFormat("yyyyMMdd")      // 格式化 当前时间 为 小时分钟数 对象      val hourMinuteSDF = new SimpleDateFormat("HHmm")

      // 格式化当前时间      val dateSDFString = dateSDF.format(currentDate) // 20190528

      // 获取 当前目标监测点的相关监测点      val relationMonitorArray = relationMonitors(targetMonitorID)      // 根据 当前目标监测点的相关监测点,取出当前时间的所有相关监测点的平均车速      val relationMonitorInfo = relationMonitorArray.map(relationMonitorID => { // 这个 map 执行 5 次        (relationMonitorID, jedis.hgetAll(dateSDFString + "_" + relationMonitorID))        // ("0003", {"0900":"1356_30", "0901":"100_2", ..., "0959":"134_4"})      })

      // 创建 3 个数组:因为要使用 拟牛顿法(LBFGS)进行建模,该方法需要      // 第一个数组放 特征因子数据集,      // 第二个数组放 label 标签向量(特征因子对应的结果数据集),      // 第三个数组放 前两者之间的关联(即真正的特征向量)      val dataX = ArrayBuffer[Double]() // 实际的每一分钟的平均车速      val dataY = ArrayBuffer[Double]() // 第 4 分钟的平均车速

      // 用于存放 特征因子数据集 和 特征因子对应的结果数据集 的映射关系      val dataTrain = ArrayBuffer[LabeledPoint]()

      // 确定使用多少时间内的数据进行建模(本例中取 1 小时)      val hours = 1

      // 将时间回退到当前时间的 1 小时之前,时间单位:分钟      // 遍历 目标监测点的数据(外循环)      for (i <- Range(60 * hours, 2, -1)) { // 本例中是 60 到 2(不包括2),步长是 -1,即 60, 59, 58, ..., 5, 4,        dataX.clear()        dataY.clear()

        // 遍历 目标监测点的所有相关监测点 的数据(内循环)        for (index <- 0 to 2) {          // 当前for循环 的时间 = 当前时间的毫秒数 - 1 个小时的毫秒数 + 0分钟的毫秒数,1分钟的毫秒数,2分钟的毫秒数  (第3分钟作为监督学习的结果向量--label 向量)          val oneMoment = currentDate.getTime - 60 * i * 1000 + 60 * index * 1000          // 获取 当前for循环 的时间的小时分钟数          val oneHM = hourMinuteSDF.format(new Date(oneMoment))

          // 获取当前小时分钟数的数据          for ((k, v) <- relationMonitorInfo) { // ("0003", {"0900":"1356_30", "0901":"100_2", ..., "0959":"134_4"})

            // hours 个小时前的后 3 分钟的数据,组装到 dataX 中            if (v.containsKey(oneHM)) { // 判断本次时刻的数据是否存在,如果存在,则取值,否则,则取 -1(表示数据缺失)              val speedAndCarCount = v.get(oneHM).split("_")              val valueX = speedAndCarCount(0).toFloat / speedAndCarCount(1).toFloat // 得到当前这一分钟的平均车速              dataX += valueX            } else {              dataX += -59.0F            }

            // 如果 index == 2,说明已经得到 hours 个小时前的后 3 分钟的数据,并组装到了 dataX 中;如果是目标卡口,则说明下一分钟数据是 label 向量的数据,ze存放 dataY 中            if (index == 2 && targetMonitorID == k) {              val nextMoment = oneMoment + 60 * 1000              val nextHM = hourMinuteSDF.format(new Date(nextMoment))              if (v.containsKey(nextHM)) { // 判断本次时刻的数据是否存在,如果存在,则取值,否则,则不管它(有默认值 0)                val speedAndCarCount = v.get(nextHM).split("_")                val valueY = speedAndCarCount(0).toFloat / speedAndCarCount(1).toFloat // 得到第 4 分钟的平均车速                dataY += valueY              }            }

          }        }

        // 准备训练模型        // 先将 dataX 和 dataY 映射到一个 LabeledPoint 对象中        if (dataY.toArray.length == 1) { // 说明结果集中有数据了          val label = dataY.toArray.head          val record = LabeledPoint(            // 因为使用的是 拟牛顿法(LBFGS) 进行建模,该方法需要 特征结果 有几种情况(不能是无穷种情况)            // label 范围为 0~6(7个类别),越大则道路越通畅            if (label / 10 < 6) (label / 10).toInt else 6, Vectors.dense(dataX.toArray)          )          dataTrain += record        }      }

      // 将特征数据集写入到文件中方便查看,至此,我们的特征数据集已经封装完毕      dataTrain.foreach(record => {        println(record)        writer.write(record.toString() + "\r\n")      })

      // 将特征数据集转为 rdd 数据集      val rddData = sc.parallelize(dataTrain)      // 随机封装训练集和测试集      val randomSplits = rddData.randomSplit(Array(0.6, 0.4), 11L)      val trainData = randomSplits(0)      val testData = randomSplits(1)

      if (!rddData.isEmpty()) {        // 使用训练数据集进行训练模型        val model = new LogisticRegressionWithLBFGS().setNumClasses(7).run(trainData)

        // 使用测试数据集测试训练好的模型        val predictAndLabel = testData.map {          case LabeledPoint(label, feature) =>            val predict = model.predict(feature)            (predict, label)        }

        // 得到当前 目标监测点 的评估值        val metrics = new MulticlassMetrics(predictAndLabel)        val accuracy = metrics.accuracy        println("评估值:" + accuracy)        writer.write(accuracy.toString + "\r\n")

        // 设置评估阈值,评估值范围为[0.0, 1.0],越大 model 越优秀,我们保存评估值大于 0 的评估模型        if (accuracy > 0.6) {          // 将模型保存到 hdfs 中,并将模型路径保存到 redis 中          val hdfsPath = "hdfs://hadoop102:9000/traffic/model/" + targetMonitorID + "_" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date          (currentDate.getTime))          model.save(sc, hdfsPath)

          jedis.hset("model", targetMonitorID, hdfsPath)        }      }    })

    // 释放 redis 连接    // RedisUtil.pool.returnResource(jedis) // 老的 API    jedis.close() // 新的 API    writer.close()  }}

3.3.2 测试

  运行数据模拟与数据采集,等待一会之后,开始进行预测,查看 http://hadoop102:50070 中是否产生对应的模型样本。同时查看 redis 中是否有保存训练好的模型存放路径。

3.4 数据预测

3.4.1 编写代码

思路:
  a) 用户传入想要进行预测的时间节点,读取该时间节点之前 3 分钟,2 分钟和 1 分钟的数据。
  b) 此时应该已经得到了历史数据集,通过该历史数据集预测传入时间点的车流状态。
  尖叫提示:为了方便观察测试,建议传一个历史时间点,这样可以很直观的看到预测结果是否符合期望值。

1) 新建 module:tf_prediction
2) 配置 maven 的 pom.xml 文件

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu</groupId>    <artifactId>tf_prediction</artifactId>    <version>1.0-SNAPSHOT</version>

    <dependencies>        <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-core_2.11</artifactId>            <version>2.1.1</version>        </dependency>

        <dependency>            <groupId>org.apache.spark</groupId>            <artifactId>spark-mllib_2.11</artifactId>            <version>2.1.1</version>        </dependency>

        <dependency>            <groupId>redis.clients</groupId>            <artifactId>jedis</artifactId>            <version>2.9.0</version>        </dependency>    </dependencies></project>

3) 新建 Prediction.scala 文件,实现上述思路

package com.atguigu.predict

import java.text.SimpleDateFormatimport java.util.Date

import com.atguigu.utils.RedisUtilimport org.apache.spark.mllib.classification.LogisticRegressionModelimport org.apache.spark.mllib.linalg.Vectorsimport org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

/**  * 堵车预测:根据训练出来的模型进行堵车预测  */object Prediction {  def main(args: Array[String]): Unit = {    // 初始化 Spark    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("TrafficPrediction")    val sc = new SparkContext(sparkConf)

    // 时间设置:为了拼凑出 redis 中的 key 和 field 的字段

    // 格式化 时间 为 年月日 对象    val dateSDF = new SimpleDateFormat("yyyyMMdd")    // 格式化 时间 为 小时分钟数 对象    val hourMinuteSDF = new SimpleDateFormat("HHmm")

    // 2019-05-29 13:00    val userSDF = new SimpleDateFormat("yyyy-MM-dd HH:mm")

    // 定义用户传入的日期:想要预测是否堵车的日期    val inputDateString = "2019-05-29 10:29"    val inputDate = userSDF.parse(inputDateString)

    // 得到 redis 中的 key    val dateSDFString = dateSDF.format(inputDate) // 20180529

    val dbIndex = 1    val jedis = RedisUtil.pool.getResource    jedis.select(dbIndex)

    // 想要预测的监测点    // 设定 目标监测点:你要对哪几个监测点进行建模(本例中对 2 个检测点进行建模)    val targetMonitorIDs = List("0005", "0015")    // 取出 目标监测点的相关监测点:算法工程师告诉我们的(本例中我们随意写几个)    val relationMonitors = Map[String, Array[String]](      "0005" -> Array("0003", "0004", "0005", "0006", "0007"),      "0015" -> Array("0013", "0014", "0015", "0016", "0017")    )

    // 遍历 目标监测点的相关监测点 的 Map 集合    targetMonitorIDs.map(targetMonitorID => { // 这个 map 执行 2 次      // 获取 当前目标监测点的相关监测点      val relationMonitorArray = relationMonitors(targetMonitorID)      // 根据 当前目标监测点的相关监测点,取出当前时间的所有相关监测点的平均车速      val relationMonitorInfo = relationMonitorArray.map(relationMonitorID => { // 这个 map 执行 5 次        (relationMonitorID, jedis.hgetAll(dateSDFString + "_" + relationMonitorID))        // ("0003", {"0900":"1356_30", "0901":"100_2", ..., "0959":"134_4"})      })

      // 装载目标时间点之前的 3 分钟的历史数据      val dataX = ArrayBuffer[Double]() // 实际的每一分钟的平均车速

      // 组装数据      for (index <- Range(3, 0, -1)) {        val oneMoment = inputDate.getTime - 60 * index * 1000        val oneHM = hourMinuteSDF.format(new Date(oneMoment)) // 1257

        for ((k, v) <- relationMonitorInfo) {          if (v.containsKey(oneHM)) {            val speedAndCarCount = v.get(oneHM).split("_")            val valueX = speedAndCarCount(0).toFloat / speedAndCarCount(1).toFloat            dataX += valueX          } else {            dataX += -59.0F          }        }      }

      // 加载模型      val modelPath = jedis.hget("model", targetMonitorID)      val model = LogisticRegressionModel.load(sc, modelPath)

      // 进行预测      val predict = model.predict(Vectors.dense(dataX.toArray))

      // 打印展示      println(targetMonitorID + ",堵车评估值:" + predict + ",是否通畅:" + (if (predict > 3) "通畅" else "拥堵"))

      // 结果保存      jedis.hset(inputDateString, targetMonitorID, predict.toString)    })

    // 释放 redis 连接    // RedisUtil.pool.returnResource(jedis) // 老的 API    jedis.close() // 新的 API  }}

3.4.2 测试

  预测任务执行完毕后,进入redis,通过查看对应监测点,对应传入时间节点的具体车速值,来验证预测结果是否正确。

四 项目总结

  与该项目类似的需求还有很多很多,涵盖了生活各个方面。不同的业务,不同的逻辑,不同的思路,不同的数学模型,需要具体情况具体分析。有类似的、但不完全一样的需求,就需要多思考,灵活处理了。

原文地址:https://www.cnblogs.com/chenmingjun/p/10943362.html

时间: 2024-08-26 11:36:50

大数据技术之_26_交通状态预测项目_01_数据模拟 + 数据采集 + 数据建模 + 数据预测 + 项目总结的相关文章

大数据挖掘技术在电网状态监测与诊断中的应用

大数据挖掘技术在电网状态监测与诊断中的应用 吴振扬( 国网吉林省电力有限公司 , 吉林 长春 130000)[ 摘要 ] 大数据是目 前国内外各个领域的一个研究应用热点. 本文基于大数据技术, 阐述了 大数据技术对于电网发展的重要意义,大数据挖掘技术的发展状况: 分析了 大数据挖掘技术的几种算法特点, 并通过比较选择聚类方法作为在电网状态监测与诊断中应用的方法: 运用聚类算法展望将大数据挖掘技术应用于电网状态监测中的可能.[ 关键词 ] 大数据: 电网: 挖掘: 数据: 监测: 预警: 诊断[

视频大数据技术在智慧城市中的应用

现代社会的信息量正以飞快的速度增长,这些信息里又积累着大量的数据.预计到2025年,每年产生的数据信息将会有超过1/3的内容驻留在云平台中或借助云平台处理.我们需要对这些数据进行分析和处理,以获取更多有价值的信息.在未来的"智慧城市"中,会有越来越大的结构化以及非结构化的数据.那么我们如何高效地存储和管理这些数据,如何分析这些数据呢?答案是,我们需要强有力的大数据处理系统进行支撑. 作为目前最火热的词汇之一,大数据在各个领域都已有了较为成熟的应用.在视频监控领域,大数据时代正悄悄来临.

大数据技术 vs 数据库一体机[转]

http://blog.sina.com.cn/s/blog_7ca5799101013dtb.html 目前,虽然大数据与数据库一体机都很火热,但相当一部分人却无法对深入了解这两者的本质区别.这里便对大数据技术(如Hadoop等,主要指MapReduce与NoSQL)与数据库一体机(新一代的主流关系数据库)技术对比如下: 硬件架构 从本质上来讲,两者的硬件架构基本相同,都是采用x86服务器集群的分布式并行模式来应对大规模的数据与计算.但是,数据库一体机的商家大都会对硬件体系进行面向产品化的.系

基于三维GIS技术的公路交通数字孪生系统

交通运输系统是四个现代化建设的重要保障,在"一带一路"倡议规划背景下,互联网+.智慧交通提升到国家新战略.智慧交通的基石是建立可映射物理世界的虚拟世界,因此大多数交通管理平台项目通过抽象建模构造二维电子地图,并在抽象模型上集成数据及分析工具,实现运营期信息化管理.随着设计.施工.运营全生命周期细化管理日益增长的需求,传统的交通地理信息(Geographic Informa-tion System-Transportation,GIS-T)系统的压力也随之增加.交通基础设施数字化映射为三

大数据技术词汇表

Anomaly:见异常值词条. Apache Software Foundation(ASF):专门为支持开源软件项目而办的一个非盈利性组织. ARPU(Average revenue per user):每个用户的平均收入. Artificial neural network:人工神经网络,通常简称神经网络. Avro:一个在Hadoop上的数据序列化系统,设计用于支持大批量数据交换应用. 贝叶斯分析方法(Bayesian Analysis):提供了一种计算假设概率的方法,这种方法是基于假设的

2015年主宰大数据技术的五大发展趋势

大数据技术自出现以来以一种异常火热的速度发展着,且种种迹象表明这种发展趋势在2015年将会继续持续下去.MapR联合创始人兼首席执行官John Schroeder预测,2015年将有五大发展趋势主导大数据技术,MapR是致力于Hadoop分发版的专业公司. 仅仅几年时间里,大数据技术就从之前的炒作阶段逐渐发展成为新数字时代中的核心技术之一.2014年,企业内部的大数据计划慢慢地从测试阶段走向研发和生产.Schroeder表示,2015年,企业的大数据技术将会进一步推进,并向前发展,甚至会产生更多

2019年大数据技术应用发展趋势

2019年大 当前最火热的新兴科技莫过于人工智能,而国内的大数据公司也纷纷转战AI战场.某种程度上,大数据已不再是科技界的话题宠儿.在Gartner的Hype Cycle中,大数据技术也已经进入到了Plateau of Productivity的商业化应用阶段. ? 任何新兴科技总会经历创新萌芽到期望幻灭的周期,这并不奇怪.就2018年行业应用现状来看,大数据正逐步成为企业的标准化应用技术:从早期尝试搭建分布式集群.到数据采集汇总.到数据加工与开发.再到大数据的应用场景落地,企业数据架构已经全面

基于大数据技术的手机用户画像与征信研究

内容提要:手机用户画像是电信运营商实现“数据驱动业务与运营”的重要举措.首先,介绍了手机用户画像过程中对个人隐私保护的方法,然后分析手机用户画像的数据来源与大数据实现技术,最后,通过数据样本实例分析手机用户画像在个人征信中的应用. 引言 随着计算机网络技术的不断发展,“数据即资源”的大数据时代已经来临.用户画像是电信运营商为了避免管道化风险,实现“数据驱动业务与运营”的重要举措.用户画像与应用大数据技术对客户分类密切相关,是单个客户的众多属性标签的累积:另一方面,在运营商涉足的消费金融领域,对手

【大数据】2015 Bossie评选-20个最佳开源大数据技术

2015-10-10 张晓东 东方云洞察东方云洞察 InfoWorld在分布式数据处理.流式数据分析.机器学习以及大规模数据分析领域精选出了2015年的开源工具获奖者,下面我们来简单介绍下这些获奖的技术工具. 1. Spark 在Apache的大数据项目中,Spark是最火的一个,特别是像IBM这样的重量级贡献者的深入参与,使得Spark的发展和进步速度飞快. 与Spark产生最甜蜜的火花点仍然是在机器学习领域.去年以来DataFrames API取代SchemaRDD API,类似于R和Pan