第五周周二练习:实验 5 Spark SQL 编程初级实践

1.题目:

源码:

import java.util.Properties
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrameReader
object TestMySQL {
    def main(args: Array[String]) {
     val spark = SparkSession.builder().appName("RddToDFrame").master("local").getOrCreate()
   import spark.implicits._
        val employeeRDD = spark.sparkContext.parallelize(Array("3 Mary F 26","4 Tom M 23")).map(_.split(" "))
        val  schema  =  StructType(List(StructField("id",  IntegerType,true),StructField("name",  StringType,  true),StructField("gender",  StringType,true),StructField("age", IntegerType, true)))
        val  rowRDD  =  employeeRDD.map(p  =>  Row(p(0).toInt,p(1).trim,p(2).trim,p(3).toInt))
        val employeeDF = spark.createDataFrame(rowRDD, schema)
        val prop = new Properties()
        prop.put("user", "root")
        prop.put("password", "hadoop")
        prop.put("driver","com.mysql.jdbc.Driver")
        employeeDF.write.mode("append").jdbc("jdbc:mysql://localhost:3306/sparktest","sparktest.employee", prop)
        val jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/sparktest").option("driver","com.mysql.jdbc.Driver").option("dbtable","employee").option("user","root").option("password", "hadoop").load()
        jdbcDF.agg("age" -> "max", "age" -> "sum").show()
        print("ok")
    }
}

数据库数据:

结果:

2.编程实现将 RDD  转换为 DataFrame

官网给出两种方法,这里给出一种(使用编程接口,构造一个 schema 并将其应用在已知的 RDD 上。):

源码:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
object RDDtoDF {
def main(args: Array[String]) {
   val spark = SparkSession.builder().appName("RddToDFrame").master("local").getOrCreate()
   import spark.implicits._
val  employeeRDD  =spark.sparkContext.textFile("file:///usr/local/spark/employee.txt")
val schemaString = "id name age"
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName,
StringType, nullable = true))
val schema = StructType(fields)
val  rowRDD  =  employeeRDD.map(_.split(",")).map(attributes  =>
Row(attributes(0).trim, attributes(1), attributes(2).trim))
val employeeDF = spark.createDataFrame(rowRDD, schema)
employeeDF.createOrReplaceTempView("employee")
val results = spark.sql("SELECT id,name,age FROM employee")
results.map(t => "id:"+t(0)+","+"name:"+t(1)+","+"age:"+t(2)).show()
}
}

结果:

原文地址:https://www.cnblogs.com/mm20/p/10603428.html

时间: 2024-07-28 14:04:35

第五周周二练习:实验 5 Spark SQL 编程初级实践的相关文章

实验 5 Spark SQL 编程初级实践

Spark SQL基本操作 (1) 查询所有数据: (2) 查询所有数据,并去除重复的数据: (3) 查询所有数据,打印时去除id字段: (4) 筛选出age>30的记录: (5) 将数据按age分组: (6) 将数据按name升序排列: (7) 取出前3行数据: (8) 查询所有记录的name列,并为其取别名为username: (9) 查询年龄age的平均值: (10) 查询年龄age的最小值. 原文地址:https://www.cnblogs.com/flw0322/p/12288397.

SPark SQL编程初级实践

今下午在课上没有将实验做完,课下进行了补充,最终完成.下面附上厦门大学数据库实验室中spark实验官网提供的标准答案,以供参考. 三.实验内容和要求 1.Spark SQL 基本操作 将下列 json 数据复制到你的 ubuntu 系统/usr/local/spark 下,并保存命名为 employee.json. { "id":1 ,"name":" Ella","age":36 } { "id":2,&

Spark SQL 编程初级实践

1.Spark SQL 基本操作 将下列 JSON 格式数据复制到 Linux 系统中,并保存命名为 employee.json. { "id":1 , "name":" Ella" , "age":36 } { "id":2, "name":"Bob","age":29 } { "id":3 , "name"

SIX Spark Streaming 编程初级实践

Flume 官网下载 Flume1.7.0 安装文件,下载地址如下: http://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz 下载后,把 Flume1.7.0 安装到 Linux 系统的“/usr/local/flume”目录下, ⑴解压安装包 1.cd ~/下载 2.sudo tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /usr/local # 将 a

第五周课程总结&实验报告

第五周课程总结&实验报告 实验三 String类的应用 实验目的 掌握类String类的使用: 学会使用JDK帮助文档: 实验内容 1.已知字符串:"this is a test of java".按要求执行以下操作:(要求源代码.结果截图.) 统计该字符串中字母s出现的次数. 统计该字符串中子串"is"出现的次数. 统计该字符串中单词"is"出现的次数. 实现该字符串的倒序输出. 2.请编写一个程序,使用下述算法加密或解密用户输入的英文

第五周总结及实验三

实验三 String类的应用 实验目的 掌握类String类的使用: 学会使用JDK帮助文档: 实验内容 1.已知字符串:"this is a test of java".按要求执行以下操作:(要求源代码.结果截图.) 统计该字符串中字母s出现的次数. 实验代码: package String; public class Test { public static void main(String[] args) { String s = "this is a test of

Spark SQL 编程

Spark SQL的依赖 Spark SQL的入口:SQLContext 官方网站参考 https://spark.apache.org/docs/1.6.2/sql-programming-guide.html#starting-point-sqlcontext 针对几种不同的语言来写. Spark SQL的入口:HiveContext SQLContext vs HiveContext Spark SQL的作用与使用方式 Spark SQL支持的API 从程序中使用SparkSQL的基本套路

Spark SQL编程指南(Python)

前言 Spark SQL允许我们在Spark环境中使用SQL或者Hive SQL执行关系型查询.它的核心是一个特殊类型的Spark RDD:SchemaRDD. SchemaRDD类似于传统关系型数据库的一张表,由两部分组成: Rows:数据行对象 Schema:数据行模式:列名.列数据类型.列可否为空等 Schema可以通过四种方式被创建: (1)Existing RDD (2)Parquet File (3)JSON Dataset (4)By running Hive SQL 考虑到Par

Spark SQL编程指南(Python)【转】

转自:http://www.cnblogs.com/yurunmiao/p/4685310.html 前言 Spark SQL允许我们在Spark环境中使用SQL或者Hive SQL执行关系型查询.它的核心是一个特殊类型的Spark RDD:SchemaRDD. SchemaRDD类似于传统关系型数据库的一张表,由两部分组成: Rows:数据行对象 Schema:数据行模式:列名.列数据类型.列可否为空等 Schema可以通过四种方式被创建: (1)Existing RDD (2)Parquet