Spark API编程动手实战-06-对搜狗日志文件深入实战操作

本节中所用到的内容是来自搜狗实验室,网址为:http://www.sogou.com/labs/dl/q.html

我们使用的是迷你版本的tar.gz格式的文件,其大小为87K,下载后如下所示:

上传到服务器后,解压并查看:

查看Sogou文件内容:

该文件的格式如下所示:

访问时间 \t 用户ID \t 查询词 \t 该URL在返回结果中的排名 \ t用户点击的顺序号 \t 用户点击的URL

把解压后的文件上传到hdfs的data目录下:

查看web控制台:

大功告成,文件上传hdfs成功

接下来 我们使用Spark获得搜索结果排名第一同时点击结果排名也是第一的数据量,也就是第四列值为1同时第五列的值也为1的总共的记录的个数。

先读取SogouQ.mini文件:

count操作后:

count之后有2000条记录

首先过滤出有效的数据:

可以发现该文件中的数据都是有效数据。

下面使用spark获得搜索结果排名第一同时点击结果排名也是第一的数据量:

可以发现搜索结果排名第一同时点击结果排名也是第一的数据量为794条;

使用toDebugString查看一下其lineage:

HadoopRDD->MappedRDD->MappedRDD->FilteredRDD->FilteredRDD->FilteredRDD

下面看用户ID查询次数排行榜:

对sortedSogouQRdd进行collect操作:(不要乱collect 会出现OOM的)

结果:

把结果保存在hdfs上:

hdfs命令行查询:

part-0000:

part-00001:

web控制台查询:

我们通过hadoop命令把上述两个文件的内容合并起来:

查看一下合并后的本地文件:

使用head命令查看其具体内容:

时间: 2024-11-09 02:30:33

Spark API编程动手实战-06-对搜狗日志文件深入实战操作的相关文章

Spark API编程动手实战-01-以本地模式进行Spark API实战map、filter和co

首先以spark的本地模式测试spark API,以local的方式运行spark-shell: 先从parallelize入手吧: map操作后结果: 下面看下 filter操作: filter执行结果: 我们用最正宗的scala函数式编程的风格: 执行结果: 从结果 可以看出来,与之前那种分步奏方式结果是一样的 但采用这种方式,即是复合scala风格的写法,也是符合spark的应用程序风格的写法,在spark的编程中,大多数功能的实现都是只要一行代码即可完成.

Spark API编程动手实战-01-以本地模式进行Spark API实战map、filter和collect

首先以spark的本地模式测试spark API,以local的方式运行spark-shell: 先从parallelize入手吧: map操作后结果: 下面看下 filter操作: filter执行结果: 我们用最正宗的scala函数式编程的风格: 执行结果: 从结果 可以看出来,与之前那种分步奏方式结果是一样的 但采用这种方式,即是复合scala风格的写法,也是符合spark的应用程序风格的写法,在spark的编程中,大多数功能的实现都是只要一行代码即可完成.

Spark API编程动手实战-08-基于IDEA使用Spark API开发Spark程序-01

创建一个Scala IDEA工程: 点击“Next”: 点击“Finish”完成工程的创建: 修改项目的属性: 首先修改Modules选项: 在src下创建两个文件夹,并把其属性改为source: 再修改Libraries: 因为要开发Spark程序,所以需要把Spark的开发需要的jar包导进来: 导入包完成后,在工程的scala下面创建一个package: 创建一个Object对象: 完成初始类的创建: 首先构建Spark Driver的模板代码: 该程序是对前面的搜狗日志的处理代码,只不过

Spark API编程动手实战-03-以在Spark 1.2版本实现对Job输出结果进行排序

从前一篇文章中的wordcount的输出结果可以看出来结果是未经排序的,如何对spark的输出结果进行排序呢? 先对reduceByKey的结果进行key,value位置置换(数字,字符),然后再进行数字排序,再将key,value位置置换后就是排序后的结果了,最终将结果存储到HDFS中 可以发现我们成功对输出结果进行排序!

Spark API编程动手实战-02-以集群模式进行Spark API实战textFile、cache、count

操作HDFS:先要保证HDFS启动了: 启动spark集群: 以spark-shell运行在spark集群上: 查看下之前上传到HDFS上的"LICENSE.txt"文件: 用spark读取这个文件: 使用count统计该文件的行数: 我们可以看到count 耗时为0.239708s 对该RDD进行cache操作并执行count使得缓存生效: 执行count结果为: 此时耗时为0.21132s 再执行count操作: 此时耗时为0.029580s,这时因为我们自己基于cache后的数据

Spark API编程动手实战-08-基于IDEA使用Spark API开发Spark程序-02

接下来进行打包,使用Project  Structure的Artifacts: 使用From modules with dependencies: 选择Main Class: 点击“OK”: 把名称改为SparkDemoJar: 因为每台机器上都安装了Scala和Spark,所以可以把Scala和Spark相关的jar文件都删除掉: 接下来进行Build: 选择“Build Artifacts”: 剩下的操作,就是jar包上传到服务器上,然后执行spark-submit命令,我在之前的文章已经详

Spark API编程动手实战-02-以集群模式进行Spark API实战textFile、cach

操作HDFS:先要保证HDFS启动了: 启动spark集群: 以spark-shell运行在spark集群上: 查看下之前上传到HDFS上的"LICENSE.txt"文件: 用spark读取这个文件: 使用count统计该文件的行数: 我们可以看到count 耗时为0.239708s 对该RDD进行cache操作并执行count使得缓存生效: 执行count结果为: 此时耗时为0.21132s 再执行count操作: 此时耗时为0.029580s,这时因为我们自己基于cache后的数据

Spark API编程动手实战-05-spark文件操作和debug

这次 我们以指定executor-memory参数的方式来启动spark-shell: 启动成功了 在命令行中我们指定了spark-shell运行暂用的每个机器上的executor的内存为1g大小,启动成功后参看web页面: 从hdfs上读取文件: 在命令行中返回的MappedRDD,使用toDebugString,可以查看其lineage的关系: 可以看出MappedRDD是从HadoopRDD转换而来的 再看下textFile的源代码: hadoopFile这个方法返回的是一个HadoopR

Spark API编程动手实战-04-以在Spark 1.2版本实现对union、groupByKe

下面看下union的使用: 使用collect操作查看一下执行结果: 再看下groupByKey的使用: 执行结果: join操作就是一个笛卡尔积操作的过程,如下示例: 对rdd3和rdd4执行join操作: 使用collect查看执行结果: 可以看出join操作完全就是一个笛卡尔积的操作: reduce本身在RDD操作中属于一个action类型的操作,会导致job的提交和执行: 下面我们看下lookup的使用: 执行结果: