package com.hollysys.spark import java.util import org.apache.spark.sql.types._ import org.apache.spark.sql.{Row, SQLContext, SparkSession} /** * Created by shirukai on 2018/7/17 * 创建DataFrame的几种方式 */ object CreateDataFrameTest { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .appName(this.getClass.getSimpleName).master("local") .getOrCreate() //第一种:通过Seq生成 val df = spark.createDataFrame(Seq( ("ming", 20, 15552211521L), ("hong", 19, 13287994007L), ("zhi", 21, 15552211523L) )) toDF("name", "age", "phone") df.show() //第二种:通过读取Json文件生成 val dfJson = spark.read.format("json").load("/Users/shirukai/Desktop/HollySys/Repository/sparkLearn/data/student.json") dfJson.show() //第三种:通过读取Csv文件生成 val dfCsv = spark.read.format("csv").option("header", true).load("/Users/shirukai/Desktop/HollySys/Repository/sparkLearn/data/students.csv") dfCsv.show() //第四种:通过Json格式的RDD生成(弃用) val sc = spark.sparkContext import spark.implicits._ val jsonRDD = sc.makeRDD(Array( "{\"name\":\"ming\",\"age\":20,\"phone\":15552211521}", "{\"name\":\"hong\", \"age\":19,\"phone\":13287994007}", "{\"name\":\"zhi\", \"age\":21,\"phone\":15552211523}" )) val jsonRddDf = spark.read.json(jsonRDD) jsonRddDf.show() //第五种:通过Json格式的DataSet生成 val jsonDataSet = spark.createDataset(Array( "{\"name\":\"ming\",\"age\":20,\"phone\":15552211521}", "{\"name\":\"hong\", \"age\":19,\"phone\":13287994007}", "{\"name\":\"zhi\", \"age\":21,\"phone\":15552211523}" )) val jsonDataSetDf = spark.read.json(jsonDataSet) jsonDataSetDf.show() //第六种: 通过csv格式的DataSet生成 val scvDataSet = spark.createDataset(Array( "ming,20,15552211521", "hong,19,13287994007", "zhi,21,15552211523" )) spark.read.csv(scvDataSet).toDF("name","age","phone").show() //第七种:动态创建schema val schema = StructType(List( StructField("name", StringType, true), StructField("age", IntegerType, true), StructField("phone", LongType, true) )) val dataList = new util.ArrayList[Row]() dataList.add(Row("ming",20,15552211521L)) dataList.add(Row("hong",19,13287994007L)) dataList.add(Row("zhi",21,15552211523L)) spark.createDataFrame(dataList,schema).show() //第八种:读取数据库(mysql) val options = new util.HashMap[String,String]() options.put("url", "jdbc:mysql://localhost:3306/spark") options.put("driver","com.mysql.jdbc.Driver") options.put("user","root") options.put("password","hollysys") options.put("dbtable","user") spark.read.format("jdbc").options(options).load().show() } }
转载:https://blog.csdn.net/shirukai/article/details/81085642
原文地址:https://www.cnblogs.com/123456www/p/12315473.html
时间: 2024-11-14 08:22:17