实验 4 RDD 编程初级实践

注意:spark的编码格式是utf-8,其他的格式会有乱码,所以文件要使用utf-8编码

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>gao</groupId>
    <artifactId>WordCount</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <spark.version>2.1.0</spark.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>

            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.19</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>

        </plugins>
    </build>

</project>

(1)该系总共有多少学生; 

(2)该系共开设来多少门课程;

(3)Tom 同学的总成绩平均分是多少;

(4)求每名同学的选修的课程门数;

(5)该系 DataBase 课程共有多少人选修;

(6)各门课程的平均分是多少;

(7)使用累加器计算共有多少人选了 DataBase 这门课。

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object one {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.setMaster("local")
      .setAppName("text1")
    val sc = new SparkContext(conf)
    val rdd = sc.textFile("C:\\Users\\Administrator\\Desktop\\Data01.txt")
    //该系总共有多少学生;
    val par=rdd.map( row=>row.split(",")(0))
    var count=par.distinct()
    println("学生总人数:"+count.count())
    //该系共开设来多少门课程;
    val couse=rdd.map( row=>row.split(",")(1))
    println("课程数:"+couse.distinct().count())
   //Tom 同学的总成绩平均分是多少;
    val pare = rdd.filter(row=>row.split(",")(0)=="Tom")
    /*pare.foreach(println)*/
    pare.map(row=>(row.split(",")(0),row.split(",")(2).toInt))
      .mapValues(x=>(x,1))
      .reduceByKey((x,y) => (x._1+y._1,x._2 + y._2))
      .mapValues(x => (x._1 / x._2))
      .collect().foreach(x=>println("Tom的平均成绩:"+x._2))
    //求每名同学的选修的课程门数;
    val pare2 = rdd.map(row=>(row.split(",")(0),row.split(",")(1)))
    pare2.mapValues(x => (x,1)).reduceByKey((x,y) => (" ",x._2 + y._2)).mapValues(x => x._2).foreach(println)
   //该系 DataBase 课程共有多少人选修;
    val pare3 = rdd.filter(row=>row.split(",")(1)=="DataBase")
    println("DataBase的选修人数:"+pare3.count)
    // 各门课程的平均分是多少;
    val pare4 = rdd.map(row=>(row.split(",")(1),row.split(",")(2).toInt))
    pare4.mapValues(x=>(x,1))
      .reduceByKey((x,y) => (x._1+y._1,x._2 + y._2))
      .mapValues(x => (x._1/ x._2))
      .collect().foreach(println)
    //使用累加器计算共有多少人选了 DataBase 这门课。
    val pare5 = rdd.filter(row=>row.split(",")(1)=="DataBase")
      .map(row=>(row.split(",")(1),1))
    val accum = sc.longAccumulator("My Accumulator")
    pare5.values.foreach(x => accum.add(x))
    println("选了 DataBase 这门课的人数:"+accum.value)
  }
}

2.对于两个输入文件 A 和 B,编写 Spark 独立应用程序,对两个文件进行合并,并剔除其 中重复的内容,得到一个新文件 C

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner

object two
{
  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.setMaster("local")
      .setAppName("text2")
    val sc = new SparkContext(conf)
    val dataFile = "C:\\Users\\Administrator\\Desktop\\data"
    val data = sc.textFile(dataFile,2)
    val res = data.filter(_.trim().length>0).map(line=>(line.trim,"\t"))
      .partitionBy(new HashPartitioner(1)).groupByKey().sortByKey().keys
    res.saveAsTextFile("result")
  }
}

3.每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生 名字,第二个是学生的成绩;编写 Spark 独立应用程序求出所有学生的平均成绩,并输出到 一个新文件中。

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.HashPartitioner

object three {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    conf.setMaster("local")
      .setAppName("text3")
    val sc = new SparkContext(conf)
    val dataFile = "C:\\Users\\Administrator\\Desktop\\data1"
    val data = sc.textFile(dataFile,3)
    val res = data.filter(_.trim().length>0)
      .map(line=>(line.split("\t")(0).trim()
        ,line.split("\t")(1).trim().toInt))
      .partitionBy(new HashPartitioner(1))
      .groupByKey().map(x => {
      var n = 0
      var sum = 0.0
      for(i <- x._2){
        sum = sum + i
        n = n +1
      }
      val avg = sum/n
      val format = f"$avg%1.2f".toDouble
      (x._1,format)
    })
    res.saveAsTextFile("result1")
  }
}

原文地址:https://www.cnblogs.com/miria-486/p/10519630.html

时间: 2024-10-07 14:12:55

实验 4 RDD 编程初级实践的相关文章

实验4 RDD编程初级实践

1.spark-shell交互式编程 (1) 该系总共有多少学生 scala> val lines = sc.textFile("file:///usr/local/spark/sparklab/Data01.txt") lines: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/sparklab/Data01.txt MapPartitionsRDD[4] at textFile at <consol

第五周周二练习:实验 5 Spark SQL 编程初级实践

1.题目: 源码: import java.util.Properties import org.apache.spark.sql.types._ import org.apache.spark.sql.Row import org.apache.spark.sql.SparkSession import org.apache.spark.sql.DataFrameReader object TestMySQL { def main(args: Array[String]) { val spar

实验 5 Spark SQL 编程初级实践

Spark SQL基本操作 (1) 查询所有数据: (2) 查询所有数据,并去除重复的数据: (3) 查询所有数据,打印时去除id字段: (4) 筛选出age>30的记录: (5) 将数据按age分组: (6) 将数据按name升序排列: (7) 取出前3行数据: (8) 查询所有记录的name列,并为其取别名为username: (9) 查询年龄age的平均值: (10) 查询年龄age的最小值. 原文地址:https://www.cnblogs.com/flw0322/p/12288397.

SPark SQL编程初级实践

今下午在课上没有将实验做完,课下进行了补充,最终完成.下面附上厦门大学数据库实验室中spark实验官网提供的标准答案,以供参考. 三.实验内容和要求 1.Spark SQL 基本操作 将下列 json 数据复制到你的 ubuntu 系统/usr/local/spark 下,并保存命名为 employee.json. { "id":1 ,"name":" Ella","age":36 } { "id":2,&

Spark SQL 编程初级实践

1.Spark SQL 基本操作 将下列 JSON 格式数据复制到 Linux 系统中,并保存命名为 employee.json. { "id":1 , "name":" Ella" , "age":36 } { "id":2, "name":"Bob","age":29 } { "id":3 , "name"

2020寒假学习01 Scala 编程初级实践

1. 计算级数请用脚本的方式编程计算并输出下列级数的前 n 项之和 Sn,直到 Sn 刚好大于或等于 q为止,其中 q 为大于 0 的整数,其值通过键盘输入. Sn = 2/1+3/2+4/3+......+n+1/n 例如,若 q 的值为 50.0,则输出应为:Sn=50.416695.请将源文件保存为exercise2-1.scala,在REPL模式下测试运行,测试样例:q=1时,Sn=2:q=30时,Sn=30.891459:q=50 时,Sn=50.416695. object test

SIX Spark Streaming 编程初级实践

Flume 官网下载 Flume1.7.0 安装文件,下载地址如下: http://www.apache.org/dyn/closer.lua/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz 下载后,把 Flume1.7.0 安装到 Linux 系统的“/usr/local/flume”目录下, ⑴解压安装包 1.cd ~/下载 2.sudo tar -zxvf apache-flume-1.7.0-bin.tar.gz -C /usr/local # 将 a

20155208徐子涵 实验五 网络编程与安全

20155208徐子涵 实验五 网络编程与安全 实验一 两人一组结对编程: 参考http://www.cnblogs.com/rocedu/p/6766748.html#SECDSA 结对实现中缀表达式转后缀表达式的功能 MyBC.java 结对实现从上面功能中获取的表达式中实现后缀表达式求值的功能,调用MyDC.java 上传测试代码运行结果截图和码云链接 产品代码 1 import java.util.StringTokenizer; 2 import java.util.Stack; 3

20175234 2018-2019-2 实验五 网络编程与安全

目录 20175234 2018-2019-2 实验五 网络编程与安全 任务一 任务二 任务三 任务四 任务五 码云链接 参考资料 20175234 2018-2019-2 实验五 网络编程与安全 任务一 题目 参考http://www.cnblogs.com/rocedu/p/6766748.html#SECDSA 结对实现中缀表达式转后缀表达式的功能 MyBC.java 结对实现从上面功能中获取的表达式中实现后缀表达式求值的功能,调用MyDC.java 实验内容: 1.熟悉栈的应用 Stac