大数据-sparkSQL

SparkSQL采用Spark on Hive模式,hive只负责数据存储,Spark负责对sql命令解析执行。

SparkSQL基于Dataset实现,Dataset是一个分布式数据容器,Dataset中同时存储原始数据和元数据(schema)

Dataset的底层封装了RDD,Row类型的RDD就是Dataset< Row >,DataFrame

Dataset数据源包括:json,JDBC,hive,parquet,hdfs,hbase,avro...

API

自带API

Dataset自带了一套api能够对数据进行操作,处理逻辑与sql处理逻辑相同。

//ds代表了一个Dataset<Row>,包括字段:age,name//select name from tableds.select(ds.col("name")).show();//select name ,age+10 as addage from tableds.select(ds.col("name"),ds.col("age").plus(10).alias("addage")).show();//select name ,age from table where age>19ds.select(ds.col("name"),ds.col("age")).where(ds.col("age").gt(19)).show();//select age,count(*) from table group by ageds.groupBy(ds.col("age")).count().show();

SQL的API

将Dataset转为临时表,在通过session对象执行sql,sql结果为Dataset类型

//将Dataset数据临时注册为临时表,指定表名。可以使用两种方法创建ds.registerTempTable("table1");ds.createOrReplaceTempView("table2");//执行sql返回结果Dataset,sql可以同时使用多张临时表Dataset<Row> result = sparkSession.sql("select * from table1");

Dataset方法

//打印元数据,以树形结构呈现ds.printSchema();?//展示dataset的数据,默认显示二十条,可以指定显示条数。显示时数据列以ascii排序显示ds.show();?//将dataset转为RDD<Row> JavaRDD<Row> rowRDD = ds.javaRDD();//将JavaRDD<Row>解析为普通RDDJavaRDD<String> map = rowRDD.map(new Function<Row, String>() {private static final long serialVersionUID = 1L;    @Override    public String call(Row row) throws Exception {        //方式1 指定字段名        String id = row.getAs("id");        String name = row.getAs("name");        Integer age = row.getAs("age");        //方式2 指定字段顺序。不常用。在json新构建的dataset会对字段按ASCII排序        id = row.getString(1);        name = row.getString(2);        age = row.getInt(0);        //返回结果        return id + name + age;    }});?

输出Dataset

Dataset的保存策略(SaveMode)包括:Overwrite(覆盖),Append(追加),ErrorIfExists(如果存在就报错),Ignore(如果存在就忽略)

jdbc输出

Dataset<Row> dataset=sparkSession.sql("...");//构建参数,存入用户名和用户密码Properties userinfo = new Properties();properties.setProperty("user", "root");properties.setProperty("password", "root");?//指定写出模式,数据库连接,用户信息result.writer().    mode(SaveMode.Overwrite).    jdbc( "jdbc:mysql://127.0.0.1:3306/spark",  "table", userinfo);

json输出

df.write().mode(SaveMode.Overwrite).format("json").save("data/parquet");df.write().mode(SaveMode.Overwrite).json("data/parquet");

parquet输出

df.write().mode(SaveMode.Overwrite).format("parquet").save("data/parquet");df.write().mode(SaveMode.Overwrite).parquet("data/parquet");

hive输出

在session中预先开启hive支持,在项目文件中导入hive,hdfs的3个xml

//对session执行hive语句,注意:需要指定表空间sparkSession.sql("use database");//移除原hive表sparkSession.sql("DROP TABLE IF EXISTS table01");//将dataset存入hive中       df.write().mode(SaveMode.Overwrite).saveAsTable("table01");

Dataset创建

通过session创建和停止

//创建session对象SparkSession sparkSession = SparkSession                .builder()                .appName("jsonrdd")                .master("local")                .getOrCreate();/* 逻辑代码 */?//关闭sessionsparkSession.stop();

基于json文件

  • 基于json的构建,在json中已经存在了元数据,可以直接构造dataset
  • 不支持嵌套json
//通过session对象读取json文件构建Dataset
Dataset<Row> ds = sparkSession.read().format("json").load("data/json");
//简写方式:Dataset<Row> ds =sparkSession.read.json("data/json");

//将Dataset转为临时表
ds.createOrReplaceTempView("table")

基于RDD< json>

RDD的每条数据都是一个json字符串,一个单行数据,自带了元数据信息

//通过上下文构建RDD
SparkContext sc = sparkSession.sparkContext();
JavaSparkContext jsc = new JavaSparkContext(sc);
JavaRDD<String> RDD = jsc.parallelize(Arrays.asList(
    "{\"name\":\"zs\",\"score\":\"100\"}",
    "{\"name\":\"sl\",\"score\":\"200\"}",
    "{\"name\":\"ww\",\"score\":\"300\"}"
));
//通过RDD构建dataset
Dataset<Row> ds = sparkSession.read().json(RDD);

对象list创建

//构建存储对象的容器
Person person = new Person();
person.setId("1");
person.setAge(18);
person.setName("zs");
Person person2 = new Person();
person2.setId("2");
person2.setAge(20);
person2.setName("ls");
List<Person> people = Arrays.asList(person, person2);

//对bean类进行转码
Encoder<Person> personEncoder = Encoders.bean(Person.class);

//获取person类型的dataset,内部自带了
Dataset<Person> dataset = sparkSession.createDataset(people, personEncoder);

dataset.registerTempTable("person");

反射创建

不建议使用,修改字段较为麻烦

  • 需要预先创建数据的javabean对象,实现序列化
  • 读取数据构建RDD< String>,通过map算子转为RDD< Bean对象 >
  • createDataFrame传入RDD与bean.class
//通过上下文构建RDD
SparkContext sc = sparkSession.sparkContext();
JavaSparkContext jsc = new JavaSparkContext(sc);
JavaRDD<String> lineRDD = jsc.textFile("/person.txt");
//将RDD的范型处理为bean类型
JavaRDD<Person> RDD = lineRDD.map(new Function<String, Person>() {
    //指定序列化版本
    private static final long serialVersionUID = 1L;
    @Override
    public Person call(String line) throws Exception {
        Person p = new Person();
        p.setId(line.split(",")[0]);
        p.setName(line.split(",")[1]);
        p.setAge(Integer.valueOf(line.split(",")[2]));
        return p;
    }
});
//将RDD转为Dataset,传入RDD和反射类
Dataset<Row> dataFrame = sparkSession.createDataFrame(RDD,Person.class);

bean对象

public class Person implements Serializable {
	/*指定序列化版本*/
	private static final long serialVersionUID = 1L;
	private String id ;
	private String name;
	private Integer age;
    //get+set
	public String getId() { return id; } public void setId(String id) {this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Integer getAge() { return age; } public void setAge(Integer age) { this.age = age; }
}

动态创建

  • 读取数据构建RDD< String>
  • 通过map算子将String转为Row,使用RowFactory的create方法存入数据
  • 构建元数据对象schema,构建方法中传入元数据封装容器,内部存了每个字段的属性
  • 通过RDD与schema构建出Dataset
  • 对象中传入每个字段的属性
//通过上下文构建RDD
SparkContext sc = sparkSession.sparkContext();
JavaSparkContext jsc = new JavaSparkContext(sc);
JavaRDD<String> lineRDD = jsc.textFile("/person.txt");

//将一行数据转为Row类型,使用RowFactory的create方法
JavaRDD<Row> rowRDD = lineRDD.map(new Function<String, Row>() {
    private static final long serialVersionUID = 1L;
    @Override
    public Row call(String line) throws Exception {
        //使用RowFactory.create构建row对象
        return RowFactory.create(
                line.split(",")[0],
                line.split(",")[1],
                Integer.valueOf(line.split(",")[2])
            );
    }
});

//构建字段属性容器,可以通过数据库查询后填充容器,实现动态调整
List<StructField> list = Arrays.asList(
    //分别构建每个字段,参数:字段名,数据类型,是否允许空值
    DataTypes.createStructField("id", DataTypes.StringType, true),
    DataTypes.createStructField("name", DataTypes.StringType, true),
    DataTypes.createStructField("age", DataTypes.IntegerType, true)
);
//构建schema对象
StructType schema = sparkSession.createStructType(list);

//构建Dataset
sparkSession.createDataFrame(rowRDD,schema)

JDBC创建

/* 方式1:创建参数容器传入 */
Map<String, String> options = new HashMap<String, String>();
options.put("url", "jdbc:mysql://127.0.0.1:3306/spark");
options.put("driver", "com.mysql.jdbc.Driver");
options.put("user", "root");
options.put("password", "root");
options.put("dbtable", "person");
//加载参数,传入参数
Dataset<Row> result = sparkSession.read().format("jdbc").options(options).load();

/* 方式2:依次传入参数 */
//创建读取器
DataFrameReader reader = sparkSession.read().format("jdbc");
//依次载入数据
reader.option("url", "jdbc:mysql://127.0.0.1:3306/spark");
reader.option("driver", "com.mysql.jdbc.Driver");
reader.option("user", "root");
reader.option("password", "root");
reader.option("dbtable", "score");
//加载数据,构建dataset
Dataset<Row> result = reader.load();

parquet创建

parque格式的文件中,存储了数据结构,因此可以直接构建dataset

//数据载入
Dataset load =sparkSession.read().format(parquet).load("data/parquet");
Dataset load = sparkSession.read().parquet("data/parquet");

Hive创建

  • 在session中启动hive支持
  • 项目中导入hive-site.xml,hdfs-site.xml,core-site.xml配置文件
  • 需要hive中提前启动metastore服务
//开启hive支持,在项目文件中导入hive,hdfs的3个xml
SparkSession sparkSession = SparkSession
	.builder()
    .appName("hive")
    .enableHiveSupport()
    .getOrCreate();
//对session执行hive语句,注意:需要指定表空间
sparkSession.sql("use database");

//将hive_sql结果保存在为dataset
Dataset<Row> goodStudentsDF = sparkSession.sql("xxx");

//Dataset<Row> df = hiveContext.table("student_infos");

hive-site.xml的示例:

<configuration>
<property>
  <name>hive.metastore.uris</name>
  <value>thrift://192.168.78.103:9083</value>
</property>
</configuration>

序列化

  • 序列化与反序列化版本需要一致 private static final long serialVersionUID = 1L
  • 子类实现序列化接口,父类未实现序列化接口,则子类中的父类字段无法序列化

    若父类实现了序列化,子类所有字段也能序列化

  • 只序列化成员变量,不序列化静态变量。反序列化时,静态变量值返回原先值
  • transuent修改的变量不序列化

结论:Java的序列化机制是通过在运行时判断类的serialVersionUID来验证版本一致性的。在进行反序列化时,JVM会把传来的字节流中的serialVersionUID与本地相应实体(类)的serialVersionUID进行比较,如果相同就认为是一致的,可以进行反序列化,否则就会出现序列化版本不一致的异常。当实现java.io.Serializable接口的实体(类)没有显式地定义一个名为serialVersionUID,类型为long的变量时,Java序列化机制会根据编译的class自动生成一个serialVersionUID作序列化版本比较用,这种情况下,只有同一次编译生成的class才会生成相同的serialVersionUID 。如果我们不希望通过编译来强制划分软件版本,即实现序列化接口的实体能够兼容先前版本,未作更改的类,就需要显式地定义一个名为serialVersionUID,类型为long的变量,不修改这个变量值的序列化实体都可以相互进行串行化和反串行化。

自定义函数

UDF

  • 通过sparkSession注册UDF,实现UDF函数(UDF0-UDF22)
  • 注册后的UDF在当前的session中有效
//获取session
SparkSession sparkSession = SparkSession
        .builder()
        .appName("udf")
        .master("local")
        .enableHiveSupport()
        .getOrCreate();
//通过session创建UDF,指定函数名,函数逻辑,返回值类型
sparkSession.udf().register(
        "add",
        new UDF2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }, DataTypes.IntegerType);
sparkSession.sql("select add(a,b) from table")

UDAF

可以理解为自定义组函数

通过sparkSession注册函数,实现UserDefinedAggregateFunction类重写以下方法

  • initialize:初始化中间结果缓存,每个分区的每个组初始化一次,总分区的每个组初始化一次
  • upadte:处理数据,合并一个分区中各组的数据,入参为每个组的缓存和当前记录
  • merge:将各分区的中间结果进行合并
  • evaluate:指定最终结果,从最后一个中间结果中取出,数据类型自己指定
  • dataType:指定结果字段的数据类型
  • inputSchema:指定每个输入字段的的列名,数据类型,是否为空。可以指定多个字段
  • bufferSchema:定义buffer中每个字段的属性(列名,数据类型,是否为空),可以指定多个字段
  • deterministic:确保一致性,一般使用true
//通过session创建UDF
sparkSession.udf().register(
        "add",
        new UDF2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }, DataTypes.IntegerType);

sparkSession.udf().register(
        "udaf", new UserDefinedAggregateFunction() {

            //初始化中间结果缓存,每个分区的每个组初始化一次,总分区的每个组初始化一次
            @Override
            public void initialize(MutableAggregationBuffer buffer) {
                //内部可以存储多个数据
                buffer.update(0,0);
            }

            //处理数据,入参为每个组的缓存和当前记录
            @Override
            public void update(MutableAggregationBuffer buffer, Row input) {

                //数据封装在row中,从row中取值字段的值进行逻辑处理
                Object fieldName = input.getAs("fieldName");

                //对缓存中的数据更新
                buffer.update(0,buffer.getInt(0)+1);
            }

            //将各分区的中间结果进行合并
            //参数1:已经合并分区的中间结果或初始的中间结果,其初始化也使用initialize
            //参数2:当前分区的中间结果
            @Override
            public void merge(MutableAggregationBuffer buffer,  Row newBuffer) {
                //获取数据值
                Integer newData = newBuffer.getAs(0);
                Integer beforeData = buffer.getAs(0);
                //更新合并的中间结果
                buffer.update(0,beforeData+newData);
            }

            //指定最终结果,从最后一个中间结果中取出,数据类型自己指定
            @Override
            public Integer evaluate(Row buffer) {
                return buffer.getAs(0);
            }

            //指定每个输入字段的的列名,数据类型,是否为空。可以指定多个字段。
            @Override
            public StructType inputSchema() {
                return DataTypes.createStructType(
                        Arrays.asList(
                                DataTypes.createStructField(
                                        "name",
                                        DataTypes.StringType,
                                        true)
                        )
                );
            }

            //定义buffer中每个字段的属性(列名,数据类型,是否为空),可以指定多个字段
            @Override
            public StructType bufferSchema() {
                return DataTypes.createStructType(
                        Arrays.asList(
                                DataTypes.createStructField(
                                        "count",
                                        DataTypes.IntegerType,
                                        false),
                                DataTypes.createStructField(
                                        "name",
                                        DataTypes.StringType,
                                        true)
                        )
                );
            }

            //指定结果字段的数据类型
            @Override
            public DataType dataType(){return DataTypes.StringType;}

            //确保一致性,一般使用true
            @Override
            public boolean deterministic() { return true; }
        }
);

//上述UDAF模拟了count的功能
sparkSession.sql("select add(name) from table group by name ");

开窗函数

用于分组排序取最高值,适用于hive,mysql,oracle

row_number() over (partitin by xxx order by yyy) as sss

数据根据xxx列分组,再根据yyy排序,排序结果作为sss列的数据,通过对sss列行过滤,取出最大或最小的若干个值

SELECT
	t.id,
	t.uname,
	t.money
FROM
	(
		SELECT
			id,
			uname,
			money,
			row_number () over (
				PARTITION BY uname
				ORDER BY
					money DESC
			) rank
		FROM
			table
	) t
WHERE
	t.rank <= 3

原文地址:https://www.cnblogs.com/javaxiaobu/p/11775071.html

时间: 2024-11-06 13:49:24

大数据-sparkSQL的相关文章

大数据——sparksql

sparksql:http://www.cnblogs.com/shishanyuan/p/4723604.html?utm_source=tuicool spark on yarn :http://sofar.blog.51cto.com/353572/1352713/ http://database.51cto.com/art/201404/435630.htm spark on yarn 爬坑:http://zengzhaozheng.blog.51cto.com/8219051/1597

SparkSQL大数据实战:揭开Join的神秘面纱

本文来自 网易云社区 . Join操作是数据库和大数据计算中的高级特性,大多数场景都需要进行复杂的Join操作,本文从原理层面介绍了SparkSQL支持的常见Join算法及其适用场景. Join背景介绍 Join是数据库查询永远绕不开的话题,传统查询SQL技术总体可以分为简单操作(过滤操作-where.排序操作-limit等),聚合操作-groupby以及Join操作等.其中Join操作是最复杂.代价最大的操作类型,也是OLAP场景中使用相对较多的操作.因此很有必要对其进行深入研究. 另外,从业

大数据数据仓库-基于大数据体系构建数据仓库(Hive,Flume,Kafka,Azkaban,Oozie,SparkSQL)

背景 接着上个文章数据仓库简述,想写一篇数据仓库常用模型的文章,但是自己对数据仓库模型的理解程度和建设架构并没有下面这个技术专家理解的深刻,并且自己去组织语言,可能会有不准确的地方,怕影响大家对数据仓库建模的理解,数据仓库属于一个工程学科,在设计上要体验出工程严谨性,所以这次向大家推荐这篇文章,毕竟IBM在数据仓库和数据集市方面已经做得很成熟了,已经有成型的商业数据仓库组件,这篇文章写的很好,可以让大家很好的理解数据仓库. 版权 作者 周三保([email protected]) IBM 软件部

SparkRDD解密(DT大数据梦工厂)

第一阶段,彻底精通Spark 第二阶段,从0起步,操作项目 Hadoop是大数据的基础设施,存储等等 Spark是计算核心所在 1.RDD:基于工作集的应用抽象 2.RDD内幕解密 3.RDD思考 不掌握RDD的人,不可能成为Spark的高手 绝对精通RDD,解决问题的能力大大提高 各种框架底层封装的都是RDD,RDD提供了通用框架 RDD是Spark的通用抽象基石 顶级SPark高手, 1.能解决问题.性能调优: 2.Spark高手拿Spark过来就是修改的 ==========基于工作集的应

大数据常见错误

1.用./bin/spark-shell启动spark时遇到异常:java.net.BindException: Can't assign requested address: Service 'sparkDriver' failed after 16 retries! 解决方法:add export SPARK_LOCAL_IP="127.0.0.1" to spark-env.sh 2.java Kafka producer error:ERROR kafka.utils.Util

【互动问答分享】第5期决胜云计算大数据时代Spark亚太研究院公益大讲堂

Spark亚太研究院100期公益大讲堂 [第5期互动问答分享] Q1:spark怎样支持即席,应该不是spark sql吧,是hive on spark么? Spark1.0 以前支持即席查询的技术是Shark; Spark 1.0和 Spark 1.0.1支持的即席查询技术是Spark SQL; 尚未发布的Spark 1.1开始 Spark SQL是即席查询的核心,我们期待Hive on Spark也能够支持即席查询: Q2:现在spark 1.0.0版本是支持hive on spark么,它

探析大数据需求下的分布式数据库

一.前言 大数据技术从诞生到现在,已经经历了十几个年头.市场上早已不断有公司或机构,给广大金融从业者"洗脑"大数据未来的美好前景与趋势.随着用户对大数据理念与技术的不断深入了解,人们已经开始从理论探索转向对场景落地的寻找,让大数据在企业中落地并开花结果. 从大数据的管理和应用方向集中在两个领域.第一,大数据分析相关,针对海量数据的挖掘.复杂的分析计算:第二,在线数据操作,包括传统交易型操作以及海量数据的实时访问.大数据高并发查询操作.用户根据业务场景以及对数据处理结果的期望选择不同的大

老李分享大数据生态圈

老李分享大数据生态圈 大数据本身是个很宽泛的概念,Hadoop生态圈(或者泛生态圈)基本上都是为了处理超过单机尺度的数据处理而诞生的.你可以把它比作一个厨房所以需要的各种工具.锅碗瓢盆,各有各的用处,互相之间又有重合.你可以用汤锅直接当碗吃饭喝汤,你可以用小刀或者刨子去皮.但是每个工具有自己的特性,虽然奇怪的组合也能工作,但是未必是最佳选择.         大数据,首先你要能存的下大数据. 传统的文件系统是单机的,不能横跨不同的机器.HDFS(Hadoop Distributed FileSy

Hadoop! | 大数据百科 | 数据观 | 中国大数据产业观察_大数据门户

你正在使用过时的浏览器,Amaze UI 暂不支持. 请 升级浏览器 以获得更好的体验! 深度好文丨读完此文,就知道Hadoop了! 来源:BiThink 时间:2016-04-12 15:14:39 作者:陈飚 “昔我十年前,与君始相识.” 一瞬间Hadoop也到了要初中择校的年龄了. 十年前还没有Hadoop,几年前国内IT圈里还不知道什么是Hadoop,而现在几乎所有大型企业的IT系统中有已经有了Hadoop的集群在运行了各式各样的任务. 2006年项目成立的一开始,“Hadoop”这个单