Spark中RDD转换成DataFrame的两种方式(分别用Java和Scala实现)

一:准备数据源

    在项目下新建一个student.txt文件,里面的内容为:

1,zhangsan,20
2,lisi,21
3,wanger,19
4,fangliu,18

二:实现

Java版:

1.首先新建一个student的Bean对象,实现序列化和toString()方法,具体代码如下:

import java.io.Serializable;  

@SuppressWarnings("serial")
public class Student implements Serializable {  

    String sid;
    String sname;
    int sage;
    public String getSid() {
        return sid;
    }
    public void setSid(String sid) {
        this.sid = sid;
    }
    public String getSname() {
        return sname;
    }
    public void setSname(String sname) {
        this.sname = sname;
    }
    public int getSage() {
        return sage;
    }
    public void setSage(int sage) {
        this.sage = sage;
    }
    @Override
    public String toString() {
        return "Student [sid=" + sid + ", sname=" + sname + ", sage=" + sage + "]";
    }
}  

2.转换,具体代码如下

import java.util.ArrayList;  

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;  

public class TxtToParquetDemo {  

    public static void main(String[] args) {  

        SparkConf conf = new SparkConf().setAppName("TxtToParquet").setMaster("local");
        SparkSession spark = SparkSession.builder().config(conf).getOrCreate();  

        reflectTransform(spark);//Java反射
        dynamicTransform(spark);//动态转换
    }  

    /**
     * 通过Java反射转换
     * @param spark
     */
    private static void reflectTransform(SparkSession spark)
    {
        JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();  

        JavaRDD<Student> rowRDD = source.map(line -> {
            String parts[] = line.split(",");  

            Student stu = new Student();
            stu.setSid(parts[0]);
            stu.setSname(parts[1]);
            stu.setSage(Integer.valueOf(parts[2]));
            return stu;
        });  

        Dataset<Row> df = spark.createDataFrame(rowRDD, Student.class);
        df.select("sid", "sname", "sage").
        coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res");
    }
    /**
     * 动态转换
     * @param spark
     */
    private static void dynamicTransform(SparkSession spark)
    {
        JavaRDD<String> source = spark.read().textFile("stuInfo.txt").javaRDD();  

        JavaRDD<Row> rowRDD = source.map( line -> {
            String[] parts = line.split(",");
            String sid = parts[0];
            String sname = parts[1];
            int sage = Integer.parseInt(parts[2]);  

            return RowFactory.create(
                    sid,
                    sname,
                    sage
                    );
        });  

        ArrayList<StructField> fields = new ArrayList<StructField>();
        StructField field = null;
        field = DataTypes.createStructField("sid", DataTypes.StringType, true);
        fields.add(field);
        field = DataTypes.createStructField("sname", DataTypes.StringType, true);
        fields.add(field);
        field = DataTypes.createStructField("sage", DataTypes.IntegerType, true);
        fields.add(field);  

        StructType schema = DataTypes.createStructType(fields);  

        Dataset<Row> df = spark.createDataFrame(rowRDD, schema);
        df.coalesce(1).write().mode(SaveMode.Append).parquet("parquet.res1");
    }
}  

scala版本:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.IntegerType  

object RDD2Dataset {  

  case class Student(id:Int,name:String,age:Int)
  def main(args:Array[String])
  {  

    val spark=SparkSession.builder().master("local").appName("RDD2Dataset").getOrCreate()
    import spark.implicits._
    reflectCreate(spark)
    dynamicCreate(spark)
  }  

 /**
     * 通过Java反射转换
     * @param spark
     */
  private def reflectCreate(spark:SparkSession):Unit={
    import spark.implicits._
    val stuRDD=spark.sparkContext.textFile("student2.txt")
    //toDF()为隐式转换
    val stuDf=stuRDD.map(_.split(",")).map(parts?Student(parts(0).trim.toInt,parts(1),parts(2).trim.toInt)).toDF()
    //stuDf.select("id","name","age").write.text("result") //对写入文件指定列名
    stuDf.printSchema()
    stuDf.createOrReplaceTempView("student")
    val nameDf=spark.sql("select name from student where age<20")
    //nameDf.write.text("result") //将查询结果写入一个文件
    nameDf.show()
  }  

  /**
     * 动态转换
     * @param spark
     */
  private def dynamicCreate(spark:SparkSession):Unit={
    val stuRDD=spark.sparkContext.textFile("student.txt")
    import spark.implicits._
    val schemaString="id,name,age"
    val fields=schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema=StructType(fields)
    val rowRDD=stuRDD.map(_.split(",")).map(parts?Row(parts(0),parts(1),parts(2)))
    val stuDf=spark.createDataFrame(rowRDD, schema)
        stuDf.printSchema()
    val tmpView=stuDf.createOrReplaceTempView("student")
    val nameDf=spark.sql("select name from student where age<20")
    //nameDf.write.text("result") //将查询结果写入一个文件
    nameDf.show()
  }
}  

注:1.上面代码全都已经测试通过,测试的环境为spark2.1.0,jdk1.8。

2.此代码不适用于spark2.0以前的版本。

原文地址:https://www.cnblogs.com/itboys/p/9172780.html

时间: 2024-08-28 13:02:23

Spark中RDD转换成DataFrame的两种方式(分别用Java和Scala实现)的相关文章

Spark中将RDD转换成DataFrame的两种方法

总结下Spark中将RDD转换成DataFrame的两种方法, 代码如下: 方法一: 使用createDataFrame方法 ```java //StructType and convert RDD to DataFrame val schema = StructType( Seq( StructField("name",StringType,true) ,StructField("age",IntegerType,true) ) ) val rowRDD = sp

Spark RDD转换成DataFrame的两种方式

Spark SQL支持两种方式将现有RDD转换为DataFrame.第一种方法使用反射来推断RDD的schema并创建DataSet然后将其转化为DataFrame.这种基于反射方法十分简便,但是前提是在您编写Spark应用程序时就已经知道RDD的schema类型.第二种方法是通过编程接口,使用您构建的StructType,然后将其应用于现有RDD.虽然此方法很麻烦,但它允许您在运行之前并不知道列及其类型的情况下构建DataSet 方法如下 1.将RDD转换成Rows 2.按照第一步Rows的结

将当前view转换成image的两种方式

一  实例方法(将当前view转换成一张图片) - (UIImage *)convertViewToImage { UIGraphicsBeginImageContext(self.bounds.size); [self drawViewHierarchyInRect:self.bounds afterScreenUpdates:YES]; UIImage *screenshot = UIGraphicsGetImageFromCurrentImageContext(); UIGraphicsE

字符串转换成金额的两种方式

'获取暂支金额费用        Dim TemporaryAmount        TemporaryAmount = Browser("SAP").Page("SAP").Frame("表单").WebEdit("暂支金额").GetROProperty("value")        MsgBox CCur(TemporaryAmount)        '去除金额中间的,号        Dim

Spark SQL初始化和创建DataFrame的几种方式

一.前述       1.SparkSQL介绍 Hive是Shark的前身,Shark是SparkSQL的前身,SparkSQL产生的根本原因是其完全脱离了Hive的限制. SparkSQL支持查询原生的RDD. RDD是Spark平台的核心概念,是Spark能够高效的处理大数据的各种场景的基础. 能够在Scala中写SQL语句.支持简单的SQL语法检查,能够在Scala中写Hive语句访问Hive数据,并将结果取回作为RDD使用.     2.Spark on Hive和Hive on Spa

将html页改成jsp的两种方式

将html页改成jsp的两种方式 作者: 字体:[增加 减小] 类型:转载 时间:2013-08-13 将html页改成jsp有两种方法,第一种是直接修改html文件,另一种是新建jsp文件.下面为大家详细介绍下具体实现,感兴趣的朋友可以参考下 一般情况,将html页改成jsp有两种方法,第一种是直接修改html文件,另一种是新建jsp文件.下面具体说一下这两种方式. 假设我们要将testPage.html文件修改为testPage.jsp文件.原testPage.html文件内容为: 复制代码

C语言中存储多个字符串的两种方式

C语言中存储多个字符串的两种方式 方式一    二维字符串数组 声明: char name[4][10] = { "Justinian", "Momo", "Becky", "Bush" }; 在内存中的存储: J u s t i n i a n \0 M o m o \0 \0 \0 \0 \0 \0 B e c k y \0 \0 \0 \0 \0 B u s h \0 \0 \0 \0 \0 \0 这种方式会造成内存空间

在基于MVC的Web项目中使用Web API和直接连接两种方式混合式接入

在我之前介绍的混合式开发框架中,其界面是基于Winform的实现方式,后台使用Web API.WCF服务以及直接连接数据库的几种方式混合式接入,在Web项目中我们也可以采用这种方式实现混合式的接入方式,虽然Web API或者WCF方式的调用,相对直接连接数据库方式,响应效率上略差一些,不过扩展性强,也可以调动更多的设备接入,包括移动应用接入,网站接入,Winfrom客户端接入,这样可以使得服务逻辑相对独立,负责提供接口即可.这种方式中最有代表性的就是当前Web API的广泛应用,促进了各个接入端

js中将字符串转换成json的三种方式

1,eval方式解析,恐怕这是最早的解析方式了.如下: function strToJson(str){ var json = eval('(' + str + ')'); return json; } 记得别忘了str两旁的小括号. 2,new Function形式,比较怪异哦.如下 function strToJson(str){ var json = (new Function("return " + str))(); return json; } 3,使用全局的JSON对象,如