SparkSQL 初步应用

最近项目中使用SparkSQL来做数据的统计分析,闲来就记录下来。
直接上代码:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object SparkSQL {

  //定义两个case class A和B:
  //    A是用户的基本信息:包括客户号、***号和性别
  //    B是用户的交易信息:包括客户号、消费金额和消费状态
  case class A(custom_id:String,id_code:String,sex:String)
  case class B(custom_id:String,money:String,status:Int)
  
  def main(args:Array[String]): Unit = {

    //数据量不大时,测试发现使用local[*]的效率要比local和基于YARN的效率都高。
    //这里使用local[*]模式,设置AppName为"SparkSQL"
    val sc = new SparkContext("local[*]", "SparkSQL")
    val sqlContext = new SQLContext(sc)
    import sqlContext.createSchemaRDD
    
    //定义两个RDD:A_RDD和B_RDD。数据之间以char(1)char(1)分隔,取出对应的客户信息。
    val A_RDD = sc.textFile("hdfs://172.16.30.2:25000/usr/tmpdata/A.dat").map(_.split("\u0001\u0001")).map(t => tbclient(t(0), t(4), t(13)))
    val B_RDD = sc.textFile("hdfs://172.16.30.3:25000/usr/tmpdata/B.dat").map(_.split("\u0001\u0001")).map(t=>tbtrans(t(16),t(33),t(71).toInt))
    
    //将普通RDD转为SchemaRDD
    A_RDD.registerTempTable("A_RDD")
    B_RDD.registerTempTable("B_RDD")
    
 
    def toInt(s: String): Int = {
      try {
        s.toInt
      } catch {
        case e: Exception => 9999
      }
    }

    def myfun2(id_code:String):Int = {
      val i = id_code.length
      i
    }

    //定义函数:根据***号判断属相
    //这里注意Scala的substring方法的使用,和Java、Oracle等都不同
       
    def myfun5(id_code:String):String = {
      var year = ""
      if(id_code.length == 18){
        val md = toInt(id_code.substring(6,10))
        val i = 1900
        val years=new Array[String](12)
        years(0) = "鼠"
        years(1) = "牛"
        years(2) = "虎"
        years(3) = "兔"
        years(4) = "龙"
        years(5) = "蛇"
        years(6) = "马"
        years(7) = "羊"
        years(8) = "猴"
        years(9) = "鸡"
        years(10) = "狗"
        years(11) = "猪"
        year = years((md-i)%years.length)
      }
      year
    }

    //设置年龄段
    
    def myfun3(id_code:String):String = {
      var rt = ""
      if(id_code.length == 18){
        val age = toInt(id_code.substring(6,10))
        if(age >= 1910 && age < 1920){
          rt = "1910 ~ 1920"
        }
        else if(age >= 1920 && age < 1930){
          rt = "1920 ~ 1930"
        }
        else if(age >= 1930 && age < 1940){
          rt = "1930 ~ 1940"
        }
        else if(age >= 1940 && age < 1950){
          rt = "1940 ~ 1950"
        }
        else if(age >= 1950 && age < 1960){
          rt = "1950 ~ 1960"
        }
        else if(age >= 1960 && age <1970){
          rt = "1960 ~ 1970"
        }
        else if(age >= 1970 && age <1980){
          rt = "1970 ~ 1980"
        }
        else if(age >= 1980 && age <1990){
          rt = "1980 ~ 1990"
        }
        else if(age >= 1990 && age <2000){
          rt = "1990 ~ 2000"
        }
        else if(age >= 2000 && age <2010){
          rt = "2000 ~ 2010"
        }
        else if(age >= 2010 && age<2014){
          rt = "2010以后"
        }
      }
      rt
    }

    //划分消费金额区间
    
    def myfun4(money:String):String = {
      var rt = ""
      if(money>="10000" && money<"50000"){
        rt = "10000 ~ 50000"
      }
      else if(money>="50000" && money<"60000"){
        rt = "50000 ~ 60000"
      }
      else if(money>="60000" && money<"70000"){
        rt = "60000 ~ 70000"
      }
      else if(money>="70000" && money<"80000"){
        rt = "70000 ~ 80000"
      }
      else if(money>="80000" && money<"100000"){
        rt = "80000 ~ 100000"
      }
      else if(money>="100000" && money<"150000"){
        rt = "100000 ~ 150000"
      }
      else if(money>="150000" && money<"200000"){
        rt = "150000 ~ 200000"
      }
      else if(money>="200000" && money<"1000000"){
        rt = "200000 ~ 1000000"
      }
      else if(money>="1000000" && money<"10000000"){
        rt = "1000000 ~ 10000000"
      }
      else if(money>="10000000" && money<"50000000"){
        rt = "10000000 ~ 50000000"
      }
      else if(money>="5000000" && money<"100000000"){
        rt = "5000000 ~ 100000000"
      }
      rt
    }

    //根据生日判断星座
    
    def myfun1(id_code:String):String = {

      var rt = ""
      if(id_code.length == 18){
          val md = toInt(id_code.substring(10,14))
          if (md >= 120 && md <= 219){
            rt = "水瓶座"
          }
          else if (md >= 220 && md <= 320){
            rt = "双鱼座"
          }
          else if (md >= 321 && md <= 420){
            rt = "白羊座"
          }
          else if (md >= 421 && md <= 521){
            rt = "金牛座"
          }
          else if (md >= 522 && md <= 621){
            rt = "双子座"
          }
          else if (md >= 622 && md <= 722){
            rt = "巨蟹座"
          }
          else if (md >= 723 && md <= 823){
            rt = "狮子座"
          }
          else if (md >= 824 && md <= 923){
            rt = "***座"
          }
          else if (md >= 924 && md <= 1023){
            rt = "天秤座"
          }
          else if (md >= 1024 && md <= 1122){
            rt = "天蝎座"
          }
          else if (md >= 1123 && md <= 1222){
            rt = "射手座"
          }
          else if ((md >= 1223 && md <= 1231) | (md >= 101 && md <= 119)){
            rt = "摩蝎座"
          }
          else
            rt = "无效"
        }
      rt
    }

    //注册函数
    sqlContext.registerFunction("fun1",(x:String)=>myfun1(x))
    sqlContext.registerFunction("fun3",(z:String)=>myfun3(z))
    sqlContext.registerFunction("fun4",(m:String)=>myfun4(m))
    sqlContext.registerFunction("fun5",(n:String)=>myfun5(n))

    //星座统计,注意,这里必须要有fun2(id_code)=18这个限制,否则,第一个字段有这个限制,而第二个统计字段值却没有这个限制
    
    val result1 = sqlContext.sql("select fun1(id_code),count(*) from A_RDD t where fun2(id_code)=18 group by fun1(id_code)")
    
    //属相统计
    val result2 = sqlContext.sql("select fun5(a.id_code),count(*) from A_RDD a where fun2(id_code)=18 group by fun5(a.id_code)")
    
    //根据消费区间统计消费人数和总金额
    val result3 = sqlContext.sql("select fun4(a.money),count(distinct a.custom_id),SUM(a.money) from B_RDD a where a.status=8 and a.custom_id in (select b.custom_id from A_RDD b where fun2(b.id_code)=18) group by fun4(a.money)")
    
    //打印结果
    result3.collect().foreach(println)
    //也可以将结果保存到OS/HDFS上
    result2.saveAsTextFile("file:///tmp/age")
  }
}

在测试result3的时候,发现报错:

Exception in thread "main" java.lang.RuntimeException: [1.101] failure: ``NOT‘‘ expected but `select‘ found

select fun5(a.id_code),count(*) from A_RDD a where fun2(a.id_code)=18 and a.custom_id IN (select distinct b.custom_id from B_RDD b where b.status=8) group by fun5

(a.id_code)

^

at scala.sys.package$.error(package.scala:27)

at org.apache.spark.sql.catalyst.SqlParser.apply(SqlParser.scala:60)

at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:74)

at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:267)

at SparkSQL$.main(SparkSQL.scala:198)

at SparkSQL.main(SparkSQL.scala)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)

at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)

目前还在调试阶段,目测可能SparkSQL对条件中子查询的支持做的不是很好(只是猜测)。

如有问题,还望路过的高手不吝赐教。

时间: 2024-10-25 19:47:32

SparkSQL 初步应用的相关文章

SparkSQL 初步应用(HiveContext使用)

折腾了一天,终于解决了上节中result3的错误.至于为什么会产生这个错误,这里,先卖个关子,先看看这个问题是如何发现的: 首先,找到了这篇文章:http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-select-syntax-td16299.html  里面有这么一段: The issue is that you're using SQLContext instead of HiveContext. SQLContext im

SparkSQL大数据实战:揭开Join的神秘面纱

本文来自 网易云社区 . Join操作是数据库和大数据计算中的高级特性,大多数场景都需要进行复杂的Join操作,本文从原理层面介绍了SparkSQL支持的常见Join算法及其适用场景. Join背景介绍 Join是数据库查询永远绕不开的话题,传统查询SQL技术总体可以分为简单操作(过滤操作-where.排序操作-limit等),聚合操作-groupby以及Join操作等.其中Join操作是最复杂.代价最大的操作类型,也是OLAP场景中使用相对较多的操作.因此很有必要对其进行深入研究. 另外,从业

大数据数据仓库-基于大数据体系构建数据仓库(Hive,Flume,Kafka,Azkaban,Oozie,SparkSQL)

背景 接着上个文章数据仓库简述,想写一篇数据仓库常用模型的文章,但是自己对数据仓库模型的理解程度和建设架构并没有下面这个技术专家理解的深刻,并且自己去组织语言,可能会有不准确的地方,怕影响大家对数据仓库建模的理解,数据仓库属于一个工程学科,在设计上要体验出工程严谨性,所以这次向大家推荐这篇文章,毕竟IBM在数据仓库和数据集市方面已经做得很成熟了,已经有成型的商业数据仓库组件,这篇文章写的很好,可以让大家很好的理解数据仓库. 版权 作者 周三保([email protected]) IBM 软件部

初步了解CPU

了解CPU By JackKing_defier 首先说明一下,本文内容主要是简单说明CPU的大致原理,所需要的前提知识我会提出,但是由于篇幅我不会再详细讲解需要的其他基础知识.默认学过工科基础课. 一.总述 先从计算机的结构说起,在现代计算机中,CPU是核心,常常被比喻为人的大脑.现在的计算机都为“冯·诺依曼机”,“冯诺依曼机”的一个显著的特点就是由运算器.存储器.控制器.输入设备和输出设备组成.CPU是运算器和控制器合起来的统称,因为运算器和控制器在逻辑关系和电路结构上联系十分紧密,尤其在大

zerglurker的C语言教程004——指针初步讲解

在上次的教程里面,我提到了指针. 针对指针,这次我将简单的讲讲,后面我还会讲到--那个时候你应该有了相当的基础. 首先,先讲讲指针类型. 任何类型关键字后面加一个*符号,就会变成指针类型. 比如: char → char* 字符指针 int → int* 整数指针 double→double* 双精度指针 甚至还可以这样: char*→char** 字符指针的指针类型 →char*** 字符指针的指针的指针类型- 指针本质上是一个内存地址值,该内存地址上存放的是相关类型的数值.但是void*指针

nodejs,webpack安装以及初步运用

nodejs安装: 1.下载:https://nodejs.org/en/download/ 2.安装node-v6.11.3-x64.msi文件,直接默认安装(next--): 3.验证是否完成安装:cmd 进入后输入命令 node -v  回车能得到nodejs版本号: 输入node 回车再输入console.log('aaaaa') 回车能正常显示输出. 这表示nodejs安装成功. webpack安装: 1.npm安装:在f盘新建文件夹webpack,在webpack文件夹建文件夹dem

Github 的注册教程和初步使用体验

我叫许晴,是网工143的学生,学号是1413042064,兴趣包括手绘,看书和手游.学习过c++和汇编语言课程,但在编程方面没什么独立实践经验. 我的Githup用户名是 XQ123 .下面是我在github的注册流程及初步使用体验. 我先搜索github,试了好几次才进去官网,但是在手机客户端注册的话比较好进.这是网页注册的界面.使用名不能设成中文,只能使用数字.字母和特殊符号,不能以短横线开头.如果设置的用户名有重复的话也不能设置 如果设置的用户名已经有人使用的话,也是不能设置的. 然后就是

SparkSQL程序设计

1.创建Spark Session val spark = SparkSession.builder . master("local") .appName("spark session example") .getOrCreate()注:下面的 spark 都指的是 sparkSession 2.将RDD隐式转换为DataFrame import spark.implicits._ 3.SparkSession 介绍 spark中包含 sparkContext和 s

Spark-Sql整合hive,在spark-sql命令和spark-shell命令下执行sql命令和整合调用hive

1.安装Hive 如果想创建一个数据库用户,并且为数据库赋值权限,可以参考:http://blog.csdn.net/tototuzuoquan/article/details/52785504 2.将配置好的hive-site.xml.core-site.xml.hdfs-site.xml放入$SPARK_HOME/conf目录下 [root@hadoop1 conf]# cd /home/tuzq/software/hive/apache-hive-1.2.1-bin [root@hadoo