Spark Customization, Part 6: the SQL version of start.scala

The previous version of start.scala was built on HiveContext; this one uses SQLContext, so no extra compilation is needed.

# cat testperson.txt    # fields are tab-separated
zs 10 30.0
li 12 32.0

# spark-shell -i:start.scala

scala> help

Then follow the printed hints and run the commands step by step.

import org.apache.spark.sql.SchemaRDD  

var FIELD_SEPERATOR = "\t"
var RECORD_SEPERATOR = "\n"
var lastrdd : SchemaRDD = null  

object MyFileUtil extends java.io.Serializable {
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.fs.FileSystem
    import org.apache.hadoop.fs.FileStatus
    import scala.collection.mutable.ListBuffer  

    def regularFile(filepath:String):String = {
        if(filepath == "") {
            filepath;
        } else if(filepath.startsWith("hdfs:")) {
            filepath
        } else if(filepath.startsWith("file:")) {
            filepath
        } else if(filepath.startsWith("/")) {
            "file://" + filepath
        } else {
            val workdir = System.getProperty("user.dir")
            "file://" + workdir + "/" + filepath
        }
    }  

    var SAFEMINPATH_LENGTH : Int = 24  

    def getFileSystem(filepath:String) = {
        if(filepath.startsWith("hdfs:")) {
            FileSystem.get(new org.apache.hadoop.conf.Configuration());
        } else if(filepath.startsWith("file:")) {
            FileSystem.getLocal(new org.apache.hadoop.conf.Configuration());
        } else {
            throw new Exception("file path invalid")
        }
    }  

    def deletePath(filepath:String) = {
        if(filepath.length < SAFEMINPATH_LENGTH)
            throw new Exception("file path is to short")
        var fs : FileSystem = getFileSystem(filepath)
        if (fs.exists(new Path(filepath))) {
            fs.delete(new Path(filepath), true);
        }
    }  

    def listFile(fs:FileSystem, path:Path, pathlist:ListBuffer[Path], statuslist:ListBuffer[FileStatus]=null) {
        if ( fs.exists(path) ) {
            val substatuslist =  fs.listStatus(path);
            for(substatus <- substatuslist){
                if(statuslist != null)
                    statuslist.append(substatus)
                if(substatus.isDir()){
                    listFile(fs,substatus.getPath(),pathlist,statuslist);
                }else{
                    pathlist.append(substatus.getPath());
                }
            }
        }
    }  

    def hasContext(filepath:String) = {
        val realpath = regularFile(filepath)
        val fs = getFileSystem(realpath)
        val pathlist = ListBuffer[Path]()
        val statuslist = ListBuffer[FileStatus]()
        listFile(fs,new Path(realpath),pathlist,statuslist)
        var length:Long = 0
        for( status <- statuslist )
            length += status.getLen()
        length > 0
    }
}  
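
// Illustrative usage (not part of the original script): MyFileUtil normalizes bare
// paths to file:// URLs and checks whether a path actually holds data, e.g.
//   MyFileUtil.regularFile("testperson.txt")  // -> "file://" + <working dir> + "/testperson.txt"
//   MyFileUtil.hasContext("testperson.txt")   // -> true once the sample file exists and is non-empty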

org.apache.spark.repl.Main.interp.command("""
class MySchemaRDD(rdd:org.apache.spark.sql.SchemaRDD) extends java.io.Serializable {  

    def go() = {
        var startstr = ""
        var endstr = RECORD_SEPERATOR
        val result = rdd.collect
        result.foreach( x =>
            print(x.mkString(startstr,FIELD_SEPERATOR,endstr))
          )
    }  

    def result() = {
        rdd.collect
    }  

    def saveto(output: String) = {
        import org.apache.hadoop.io.{NullWritable,Text}
        var startstr = ""
        var endstr = RECORD_SEPERATOR
        if(output.startsWith("hdfs:")) {
            val outputpath = MyFileUtil.regularFile(output)
            MyFileUtil.deletePath(outputpath)
            rdd.map(x =>
                  (NullWritable.get(), new Text(x.mkString(FIELD_SEPERATOR)))
                ).saveAsHadoopFile[
                  org.apache.hadoop.mapred.TextOutputFormat[NullWritable, Text]
                ](outputpath)
        } else {
            val outputpath = MyFileUtil.regularFile(output)
            MyFileUtil.deletePath(outputpath)
            val result = rdd.collect()
            val writer = new java.io.FileWriter(output)
            result.foreach(x =>
                writer.write(x.mkString(startstr,FIELD_SEPERATOR,endstr))
              )
            writer.close()
        }
    }
}
object MySchemaRDD {
    implicit def toMySchemaRDD(rdd:org.apache.spark.sql.SchemaRDD) = new MySchemaRDD(rdd)
}
""")  

val ssc = new org.apache.spark.sql.SQLContext(sc)
import ssc._
import MySchemaRDD._  

def getRegisterString(rddname:String,classname:String,tablename:String,tabledef:String) : String = {
    val members = tabledef.trim.split(",").map(_.trim.split(" ").filter(""!=)).map(x => (x(0).trim,x(1).trim.head.toString.toUpperCase+x(1).trim.tail))
    val classmemberdef = members.map(x => (x._1+":"+x._2)).mkString(",")
    val convertstr = members.map(x => x._2).zipWithIndex.map(x => "t("+x._2+").to"+x._1).mkString(",")
    return s"""
        case class ${classname}(${classmemberdef})
        val schemardd = ${rddname}.map(_.split("${FIELD_SEPERATOR}")).map(t=>${classname}(${convertstr}))
        ssc.registerRDDAsTable(schemardd,"${tablename}")
    """
}  
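
// For example (illustrative), getRegisterString("persons", "PERSONS", "person", "name string,age int")
// returns roughly the following snippet (the actual tab character is interpolated into the split):
//   case class PERSONS(name:String,age:Int)
//   val schemardd = persons.map(_.split("\t")).map(t=>PERSONS(t(0).toString,t(1).toInt))
//   ssc.registerRDDAsTable(schemardd,"person")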

org.apache.spark.repl.Main.interp.command("""
class MyCommandTranslator(cmd:String) extends java.io.Serializable {  

    def go()(implicit f: SchemaRDD => MySchemaRDD) = {
        lastrdd = sql(cmd)
        lastrdd.go()
    }  

    def saveto(output: String)(implicit f: SchemaRDD => MySchemaRDD) = {
        lastrdd = sql(cmd)
        lastrdd.saveto(output)
    }  

    def result()(implicit f: SchemaRDD => MySchemaRDD) = {
        lastrdd = sql(cmd)
        lastrdd.result()
    }  

//    def hqlgo()(implicit f: SchemaRDD => MySchemaRDD) = {
//        lastrdd = hql(cmd)
//        lastrdd.go()
//    }
//
//    def hqlsaveto(output: String)(implicit f: SchemaRDD => MySchemaRDD) = {
//        lastrdd = hql(cmd)
//        lastrdd.saveto(output)
//    }
//
//    def hqlresult()(implicit f: SchemaRDD => MySchemaRDD) = {
//        lastrdd = hql(cmd)
//        lastrdd.result()
//    }  

    def defineas(tabledef:String) = {
        if( tabledef != "" ) {
            org.apache.spark.repl.Main.interp.command(
                getRegisterString(cmd,cmd.toUpperCase,cmd,tabledef)
            )
        } else {
            org.apache.spark.repl.Main.interp.command(
                "ssc.registerRDDAsTable(${cmd},\"${cmd}\")"
            )
        }
    }  

    def from(filepath:String) {
        if( cmd.trim.startsWith("create table ") ) {
            val tablename = cmd.trim.substring(13).trim().split(" ")(0)
            val leftstr = cmd.substring(13).trim().substring(tablename.length).trim()
            val tabledef = leftstr.substring(1,leftstr.length-1).trim()
            val realfile = MyFileUtil.regularFile(filepath)
            org.apache.spark.repl.Main.interp.command(
                "val "+tablename+" = sc.textFile(\""+realfile+"\")"
            )
            new MyCommandTranslator(tablename).defineas(tabledef)
        } else {
            println("usage:")
            println("\"create table sometablename (field1 string,field2 int...)\" from \"somefile or hdfs:somepath\"")
        }
    }  

    def isok() = {
        if(cmd.contains(".") || cmd.contains("/")) {
            MyFileUtil.hasContext(cmd)
        } else {
            val res = sql(s"select count(*) from ${cmd}").result()
            val count = res(0).getLong(0)
            count > 0
        }
    }
}
object MyCommandTranslator {
    implicit def stringToTranslator(cmd:String) = new MyCommandTranslator(cmd)  

    def show(tabledata:Array[org.apache.spark.sql.Row]) = {
        tabledata.foreach( x => println(x.mkString("\t")))
    }
}
""")  

def to = MyCommandTranslator
import MyCommandTranslator._  

val onetable = sql("select 1 as id")
ssc.registerRDDAsTable(onetable,"onetable")  
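// onetable is a registered one-row table (id = 1); e.g. (illustrative)
// "select id from onetable" go  prints 1 and doubles as a quick sanity check.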

def help = {
    println("""example:
        "create table testperson (name string,age int,weight double)" from "testperson.txt"
        "select * from testperson" go
        "select * from testperson" saveto "somelocalfile.txt"
        "select * from testperson" saveto "hdfs:/basedir/parentdir/testperson"
        "testperson" isok
        "somelocalfile.txt" isok
        "hdfs:/basedir/parentdir/testperson" isok
        val data = "select * from testperson" result
        to show data
        val somerdddata = sc.textFile("hdfs:/basedir/parentdir/testperson")
        "somerdddata" defineas "name string,age int,weight double"
        "select * from somerdddata" go
        if you want to see the help of the environment, please type :help
        """)
}
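
To wrap up, a minimal session against the sample testperson.txt looks like this (illustrative; the printed rows assume the two sample records above and that the file sits in the directory where spark-shell was started):

scala> "create table testperson (name string,age int,weight double)" from "testperson.txt"
scala> "select * from testperson" go
zs	10	30.0
li	12	32.0
scala> "testperson" isok          // returns true when the table has at least one row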
