spark sql 基本用法

一、通过结构化数据创建DataFrame:

publicstaticvoid main(String[] args) {
   SparkConf conf = new SparkConf()

.setAppName("DataFrameCreate").setMaster("local");

JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);
        
        DataFrame df = sqlContext.read().json("hdfs://spark1:9000/students.json");  //结构化数据直接加载为DataFrame
        
        df.show();  
    }

二、通过RDD创建DataFrame的两种创建方式

 (数据源students.txt的数据截图)

2.1通过已知类型的schema创建DataFrame,代码如下:

public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .setMaster("local")
            .setAppName("RDD2DataFrameReflection");
        JavaSparkContext sc = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sc);

JavaRDD<String> lines = sc.textFile("D://students.txt");
        
        //将lines转换成 JavaRDD<Student>
        JavaRDD<Student> students = lines.map(new Function<String, Student>() {

private static final long serialVersionUID = 1L;

@Override
            public Student call(String line) throws Exception {
                // TODO Auto-generated method stub
                String[] strPlits = line.split(",");
                Student stu = new Student();
                
                stu.setId(Integer.valueOf(strPlits[0]));
                stu.setName(strPlits[1]);
                stu.setAge(Integer.valueOf(strPlits[2]));
                
                return stu;
            }
            
        });
                
        // 使用反射方式,将RDD转换为DataFrame
        // 这里要求,JavaBean必须实现Serializable接口,是可序列化的

//根据student的schema 和 RDD创建DataFrame
        DataFrame studentsDF = sqlContext.createDataFrame(students, Student.class);
        studentsDF.show();
    }

2.2手动创建schema的方式创建DataFrame

public static void main(String[] args) {

//...  省略创建sqlContext的过程

JavaRDD<String> lines = sc.textFile("D://students.txt");
        
        //将普通RDD装换成JavaRDD<Row>
        JavaRDD<Row> rowRDD = lines.map(new Function<String, Row>() {

private static final long serialVersionUID = 1L;

@Override
            public Row call(String line) throws Exception {
                String[] strArray = line.split(",");
                Row row= RowFactory.create(
                        Integer.valueOf(strArray[0]),    //id
                        strArray[1],    //name
                        Integer.valueOf(strArray[2]));    //age
            
                return row;
            }
        });
        
        //第二步 创建元类型, 即创建schema
        List<StructField> structFields = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));  
        structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));  
        structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));  
        StructType structType = DataTypes.createStructType(structFields);

//根据元数据类型将JavaRDD<Row>转化成DataFrame
        DataFrame studentDF = sqlCotnext.createDataFrame(rowRDD, structType);

studentDF.show();
    }

-》DataFrame、RDD、List互转

JavaRDD<Row> rows = studentDF.javaRDD();

List<Row> studentList = rows.collect();

三、DataFrame基本用法

// 打印DataFrame中所有的数据(select * from ...)

df.show();
        // 打印DataFrame的元数据(Schema)
        df.printSchema();
        // 查询某列所有的数据
        df.select("name").show();  
        // 查询某几列所有的数据,并对列进行计算
        df.select(df.col("name"), df.col("age").plus(1)).show();
        // 根据某一列的值进行过滤
        df.filter(df.col("age").gt(18)).show();
        // 根据某一列进行分组,然后进行聚合
        df.groupBy(df.col("age")).count().show();

DataFrame studentDF = sqlCotnext.createDataFrame(rowRDD, structType);
        studentDF.show();

studentDF.registerTempTable("students"); //将DataFrame注册为零时表,取名students
        
        //对students零时表做sql查询
        DataFrame oldStudentDF = sqlCotnext.sql("select * from students where age>18");

oldStudentDF.show();

时间: 2024-11-10 07:51:27

spark sql 基本用法的相关文章

DataFrame编程模型初谈与Spark SQL

Spark SQL在Spark内核基础上提供了对结构化数据的处理,在Spark1.3版本中,Spark SQL不仅可以作为分布式的SQL查询引擎,还引入了新的DataFrame编程模型. 在Spark1.3版本中,Spark SQL不再是Alpha版本,除了提供更好的SQL标准兼容之外,还引进了新的组件DataFrame.同时,Spark SQL数据源API也实现了与新组件DataFrame的交互,允许用户直接通过Hive表.Parquet文件以及一些其他数据源生成DataFrame.用户可以在

Spark SQL实现日志离线批处理

一. 基本的离线数据处理架构: 数据采集   Flume:Web日志写入到HDFS 数据清洗   脏数据 Spark.Hive.MR等计算框架来完成. 清洗完之后再放回HDFS 数据处理   按照需要,进行业务的统计和分析. 也通过计算框架完成 处理结果入库   存放到RDBMS.NoSQL中 数据可视化    通过图形化展示出来.  ECharts.HUE.Zeppelin 处理框图: 1 2 3 4 5 6 7为离线处理,其中5不一定是Hive(还有Spark SQL等) 6不一定是RDBM

Spark SQL的介绍和DataFrame的建立及使用

1. Spark SQL定位处理结构化数据的模块.SparkSQL提供相应的优化机制,并支持不同语言的开发API. java.scala.Python,类SQL的方法调用(DSL) 2. RDD与Spark SQL的比较说明: 使用Spark SQL的优势:a.面向结构化数据:b.优化机制: RDD缺点:a.没有优化机制,如对RDD执行Filter操作: b.RDD类型转换后无法进行模式推断 3. DataFrame/SchemaRDD DataFrame是一个分布式的数据集合,该数据集合以命名

理解Spark SQL(二)—— SQLContext和HiveContext

使用Spark SQL,除了使用之前介绍的方法,实际上还可以使用SQLContext或者HiveContext通过编程的方式实现.前者支持SQL语法解析器(SQL-92语法),后者支持SQL语法解析器和HiveSQL语法解析器,默认为HiveSQL语法解析器,用户可以通过配置切换成SQL语法解析器来运行HiveQL不支持的语法,如:select 1.实际上HiveContext是SQLContext的子类,因此在HiveContext运行过程中除了override的函数和变量,可以使用和SQLC

Spark SQL 之 Join 实现

原文地址:Spark SQL 之 Join 实现 Spark SQL 之 Join 实现 涂小刚 2017-07-19 217标签: spark , 数据库 Join作为SQL中一个重要语法特性,几乎所有稍微复杂一点的数据分析场景都离不开Join,如今Spark SQL(Dataset/DataFrame)已经成为Spark应用程序开发的主流,作为开发者,我们有必要了解Join在Spark中是如何组织运行的. SparkSQL总体流程介绍 在阐述Join实现之前,我们首先简单介绍SparkSQL

Spark sql 在yarn-cluster模式下找不到表

在hive里建一个数据库test,在数据库里建了一张表user,然后在Spark程序中使用Spark sql读取这张表 "select * form test.user" 当部署模式是spark stand模式和yarn-client模式时,程序可以正常运行,但yarn-cluster模式就报了找不到"test.user"表的错误. 解决办法: spark和hive整合,把hive-site.xml加到spark根目录的conf下,所以,要在提交Spark任务的时候

spark SQL概述

Spark SQL是什么? 何为结构化数据 sparkSQL与spark Core的关系 Spark SQL的前世今生:由Shark发展而来 Spark SQL的前世今生:可以追溯到Hive Spark SQL的前世今生:Hive 到Shark(在Hive上做改进) Spark SQL的前世今生:Shark 到Spark SQL(彻底摆脱但是兼容Hive) Spark SQL的前世今生:Hive 到Hive on Spark

Spark SQL数据加载和保存实战

一:前置知识详解: Spark SQL重要是操作DataFrame,DataFrame本身提供了save和load的操作, Load:可以创建DataFrame, Save:把DataFrame中的数据保存到文件或者说与具体的格式来指明我们要读取的文件的类型以及与具体的格式来指出我们要输出的文件是什么类型. 二:Spark SQL读写数据代码实战: import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRD

MySQL &#183; 性能优化 &#183; MySQL常见SQL错误用法

前言 MySQL在2016年仍然保持强劲的数据库流行度增长趋势.越来越多的客户将自己的应用建立在MySQL数据库之上,甚至是从Oracle迁移到MySQL上来.但也存在部分客户在使用MySQL数据库的过程中遇到一些比如响应时间慢,CPU打满等情况.阿里云RDS专家服务团队帮助云上客户解决过很多紧急问题.现将<ApsaraDB专家诊断报告>中出现的部分常见SQL问题总结如下,供大家参考. 常见SQL错误用法 1. LIMIT 语句 分页查询是最常用的场景之一,但也通常也是最容易出问题的地方.比如