sparkSQL1.1入门之八:sparkSQL之综合应用

Spark之所以万人瞩目,除了内存计算,还有其ALL-IN-ONE的特性,实现了One stack rule them all。下面简单模拟了几个综合应用场景,不仅使用了sparkSQL,还使用了其他Spark组件:

  • 店铺分类,根据销售额对店铺分类
  • 货品调拨,根据货品的销售数量和店铺之间的距离进行货品调拨

前者将使用sparkSQL+MLlib的聚类算法,后者将使用sparkSQL+GraphX算法。本实验采用IntelliJ IDEA调试代码,最后生成doc.jar,然后使用spark-submit提交给集群运行。

1:店铺分类

分类在实际应用中非常普遍,比如对客户进行分类、对店铺进行分类等等,对不同类别采取不同的策略,可以有效的降低企业的营运成本、增加收入。机器学习中的聚类就是一种根据不同的特征数据,结合用户指定的类别数量,将数据分成几个类的方法。下面举个简单的例子,对第五小结中的hive数据,按照销售数量和销售金额这两个特征数据,进行聚类,分出3个等级的店铺。

在IDEA中建立一个object:SQLMLlib

package doc

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

object SQLMLlib {
  def main(args: Array[String]) {
    //屏蔽不必要的日志显示在终端上
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    //设置运行环境
    val sparkConf = new SparkConf().setAppName("SQLMLlib")
    val sc = new SparkContext(sparkConf)
    val hiveContext = new HiveContext(sc)

    //使用sparksql查出每个店的销售数量和金额
    hiveContext.sql("use saledata")
    val sqldata = hiveContext.sql("select a.locationid, sum(b.qty) totalqty,sum(b.amount) totalamount from tblStock a join tblstockdetail b on a.ordernumber=b.ordernumber group by a.locationid")

    //将查询数据转换成向量
    val parsedData = sqldata.map {
      case Row(_, totalqty, totalamount) =>
        val features = Array[Double](totalqty.toString.toDouble, totalamount.toString.toDouble)
        Vectors.dense(features)
    }

    //对数据集聚类,3个类,20次迭代,形成数据模型
    //注意这里没设置partition的数量,会使用MLLib的缺省partition数200
    val numClusters = 3
    val numIterations = 20
    val model = KMeans.train(parsedData, numClusters, numIterations)

    //用模型对读入的数据进行分类,并输出
    //由于partition没设置,输出为200个小文件,可以使用bin/hdfs dfs -getmerge 合并下载到本地
    val result2 = sqldata.map {
      case Row(locationid, totalqty, totalamount) =>
        val features = Array[Double](totalqty.toString.toDouble, totalamount.toString.toDouble)
        val linevectore = Vectors.dense(features)
        val prediction = model.predict(linevectore)
        locationid + " " + totalqty + " " + totalamount + " " + prediction
    }.saveAsTextFile(args(0))

    sc.stop()
  }
}

编译打包后运行:

运行过程,可以发现聚类过程都是使用200个partition:

运行完毕,使用getmerge将结果转到本地文件,并查看结果:

最后使用R做示意图,用3种不同的颜色表示不同的类别。

2:货品调拨

在商业活动中,如何将货品放在最需要的地点是一个永恒的命题。在Spark中,可以通过图计算来解决这样的问题:将销售点做为图的顶点,其属性可以是货品的销量、库存等特征;将调拨因素作为边,如距离、使用时间、调拨费用等。通过货品的轮询、调拨点的轮询来获取货品调拨的信息。下面给出一段使用sparksql和graphX综合使用的代码框架:

package doc

//由于暂时手上缺少数据,本例只给出框架,以后有机会补上
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

object SQLGraphX {
  def main(args: Array[String]) {
    //屏蔽不必要的日志显示在终端上
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    //设置运行环境
    val sparkConf = new SparkConf().setAppName("SQLGraphX")
    val sc = new SparkContext(sparkConf)
    val hiveContext = new HiveContext(sc)

    //切换到销售数据库
    hiveContext.sql("use saledata")

    //使用sparksql查出店铺的销量和库存,作为图的顶点
    //其中locationid为VertexID,(销量,库存)为VD,一般为(Int,Int)类型
    val vertexdata = hiveContext.sql("select a.locationid, b.saleQty, b.InvQty From a join b on a.col1=b.col2 where conditions")

    //使用sparksql查出店铺之间的距离,也可以是花费时间等和调拨相关的属性,作为图的边
    //distance为ED,可以使用Int、Long、Double等数据类型
    val edgedata = hiveContext.sql("select srcid, distid, distance From distanceInfo")

    //构造vertexRDD和edgeRDD
    val vertexRDD: RDD[(Long, (Int, Int))] = vertexdata.map(...)
    val edgeRDD: RDD[Edge[Int]] = edgedata.map(...)

    //构造图Graph[VD,ED]
    val graph: Graph[(Int, Int), Int] = Graph(vertexRDD, edgeRDD)

    //根据调拨的规则进行图处理
    val initialGraph = graph.mapVertices(...)
    initialGraph.pregel(...)

    //输出

    sc.stop()
  }
}

3:小结

通过上面的代码,可以看出,程序除了最后有磁盘落地外,都是在内存中计算的。避免了多个系统中交互数据的落地过程,提高了效率。这才是spark生态系统真正强大之处:One stack rule them all。另外sparkSQL+sparkStreaming可以架构当前非常热门的Lambda架构体系,为CEP提供解决方案。也正是如此强大,才吸引了广大开源爱好者的目光,促进了spark生态的告诉发展。

最近将在炼数成金开课Spark大数据快速计算平台(第三期),本资料为新课素材。本篇近几日再完善一下。

时间: 2024-12-26 06:46:53

sparkSQL1.1入门之八:sparkSQL之综合应用的相关文章

sparkSQL1.1入门之一:为什么sparkSQL

2014年9月11日,Spark1.1.0忽然之间发布.笔者立即下载.编译.部署了Spark1.1.0.关于Spark1.1的编译和部署,请参看笔者博客Spark1.1.0 源码编译和部署包生成 . Spark1.1.0中变化较大是sparkSQL和MLlib,sparkSQL1.1.0主要的变动有: 增加了JDBC/ODBC Server(ThriftServer),用户可以在应用程序中连接到SparkSQL并使用其中的表和缓存表. 增加了对JSON文件的支持 增加了对parquet文件的本地

sparkSQL1.1入门之十:总结

回顾一下,在前面几章中,就sparkSQL1.1.0基本概念.运行架构.基本操作和实用工具做了基本介绍. 基本概念: SchemaRDD Rule Tree LogicPlan Parser Analyzer Optimizer SparkPlan 运行架构: sqlContext运行架构 hiveContext运行架构 基本操作 原生RDD的操作 parquet文件的操作 json文件的操作 hive数据的操作 和其他spark组件混合使用 实用工具 hive/console的操作 CLI的配

sparkSQL1.1入门之二:sparkSQL运行架构

在介绍sparkSQL之前,我们首先来看看,传统的关系型数据库是怎么运行的.当我们提交了一个很简单的查询: SELECT a1,a2,a3 FROM tableA Where condition 可以看得出来,该语句是由Projection(a1,a2,a3).Data Source(tableA).Filter(condition)组成,分别对应sql查询过程中的Result.Data Source.Operation,也就是说SQL语句按Result-->Data Source-->Ope

sparkSQL1.1入门之六:sparkSQL之基础应用

sparkSQL1.1对数据的查询分成了2个分支:sqlContext 和 hiveContext. 在sqlContext中,sparkSQL可以使用SQL-92语法对定义的表进行查询,表的源数据可以来自: RDD parquet文件 json文件 在hiveContext中,sparkSQL可以使用HQL语法,对hive数据进行查询,sparkSQL1.1支持hive0.12的HQL语法:如果遇上不支持的语法,用户可以通过更改配置切换到sql语法.笔者猜测,从spark1.1开始,将打开sq

sparkSQL1.1入门之四:深入了解sparkSQL执行计划

前面两章花了不少篇幅介绍了SparkSQL的执行过程,非常多读者还是认为当中的概念非常抽象.比方Unresolved LogicPlan.LogicPlan.PhysicalPlan是长得什么样子,没点印象.仅仅知道名词,感觉非常缥缈. 本章就着重介绍一个工具hive/console,来加深读者对sparkSQL的执行计划的理解. 1:hive/console安装 sparkSQL从1.0.0開始提供了一个sparkSQL的调试工具hive/console. 该工具是给开发人员使用,在编译生成的

sparkSQL1.1入门之四:深入了解sparkSQL运行计划

前面两章花了不少篇幅介绍了SparkSQL的运行过程,很多读者还是觉得其中的概念很抽象,比如Unresolved LogicPlan.LogicPlan.PhysicalPlan是长得什么样子,没点印象,只知道名词,感觉很缥缈.本章就着重介绍一个工具hive/console,来加深读者对sparkSQL的运行计划的理解. 1:hive/console安装 sparkSQL从1.0.0开始提供了一个sparkSQL的调试工具hive/console.该工具是给开发者使用,在编译生成的安装部署包中并

sparkSQL1.1入门之九:sparkSQL之调优

spark是一个快速的内存计算框架:同时是一个并行运算的框架.在计算性能调优的时候,除了要考虑广为人知的木桶原理外,还要考虑平行运算的Amdahl定理. 木桶原理又称短板理论,其核心思想是:一只木桶盛水的多少,并不取决于桶壁上最高的那块木块,而是取决于桶壁上最短的那块.将这个理论应用到系统性能优化上,系统的最终性能取决于系统中性能表现最差的组件.例如,即使系统拥有充足的内存资源和CPU资源,但是如果磁盘I/O性能低下,那么系统的总体性能是取决于当前最慢的磁盘I/O速度,而不是当前最优越的CPU或

sparkSQL1.1入门之三:sparkSQL组件之解析

上篇在总体上介绍了sparkSQL的运行架构及其基本实现方法(Tree和Rule的配合),也大致介绍了sparkSQL中涉及到的各个概念和组件.本篇将详细地介绍一下关键的一些概念和组件,由于hiveContext继承自sqlContext,关键的概念和组件类似,只不过后者针对hive的特性做了一些修正和重写,所以本篇就只介绍sqlContext的关键的概念和组件. 概念: LogicalPlan 组件: SqlParser Analyzer Optimizer Planner 1:Logical

sparkSQL1.1入门之七:ThriftServer和CLI

spark1.1相较于spark1.0,最大的差别就在于spark1.1增加了万人期待的CLI和ThriftServer.使得hive用户还有用惯了命令行的RDBMS数据库管理员很容易地上手sparkSQL,在真正意义上进入了SQL时代.下面先简单介绍其使用,限于时间关系,以后再附上源码分析. 1:ThriftServer和CLI的命令参数 A:令人惊讶的CLI 刚部署好spark1.1就迫不及待地先测试CLI,对于习惯了sql命令行的本人,失去了shark后,对于sparkSQL1.0一度很是