Interactive Analysis with the Spark Shell
Upload a file to the HDFS directory /user/hdfs/:
-bash-4.1$ hadoop fs -put README.md /user/hdfs/
-bash-4.1$ hadoop fs -ls /user/hdfs
Found 3 items
drwxr-xr-x   - hdfs supergroup          0 2016-08-21 15:34 /user/hdfs/.sparkStaging
drwx------   - hdfs supergroup          0 2016-08-20 13:12 /user/hdfs/.staging
-rw-r--r--   3 hdfs supergroup       3233 2016-08-21 15:36 /user/hdfs/README.md
Then start the spark-shell interactive session:
[[email protected] ~]# su - hdfs
-bash-4.1$ spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.5.0-cdh5.5.4
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_79)
Type in expressions to have them evaluated.
Type :help for more information.
16/08/21 15:34:25 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
Spark context available as sc (master = yarn-client, app id = application_1471668978254_0006).
SQL context available as sqlContext.
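Once the shell is up, the Spark context is already bound to sc and the SQL context to sqlContext, as the startup messages above show. As an optional sanity check (a minimal sketch, not part of the original session), you can query them directly:

// Optional sanity checks inside spark-shell (not from the original transcript)
sc.version          // Spark version string, e.g. "1.5.0-cdh5.5.4"
sc.master           // cluster manager in use, here "yarn-client"
sc.appName          // application name assigned to this shell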
Count the number of lines in the file:
scala> val textFile = sc.textFile("README.md")
textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:21

scala> textFile.count()
res1: Long = 99
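Note that the relative path "README.md" resolves against the current user's HDFS home directory, /user/hdfs here. A fully qualified path works the same way, and if the RDD will be read several times it can be cached in memory. A small sketch (the explicit path is assumed to match the upload above, not taken from the original session):

// Equivalent read with an explicit HDFS path
val readme = sc.textFile("/user/hdfs/README.md")

// Keep the RDD in memory so repeated actions (count, filter, ...) avoid re-reading HDFS
readme.cache()
readme.count()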
Print the first line of the file:
scala> textFile.first()
res2: String = # Highlight.js
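first() returns only a single line; for a slightly larger peek, take(n) returns the first n lines as an array. A sketch (output not shown):

// Grab the first three lines instead of only one
textFile.take(3).foreach(println)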
Count the lines that contain a given string:
scala> textFile.filter(line => line.contains("##")).count
res3: Long = 5
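Transformations like this can also be chained further. For example, a common follow-up from the Spark quick start is to find the largest number of words on any single line; a sketch along the same lines (not from the original session):

// Map each line to its word count, then reduce to the maximum
textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)

// The same thing written with Math.max
textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))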
Next, create a file to use for a word count. The file contents are as follows:
-bash-4.1$ cat test.txt
张三 张四 张三 张五 李三 李三 李四 李四 李四 王二 老王 老王
Upload the file:
-bash-4.1$ hadoop fs -put test.txt /user/hdfs/
Run the word count. First load the file:
scala> val wc = sc.textFile("test.txt")
wc: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[6] at textFile at <console>:21
Then apply the map and reduce steps:
scala> val result = wc.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
result: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:23
scala> result.collect()
res5: Array[(String, Int)] = Array((张五,1), (老王,2), (张三,2), (张四,1), (王二,1), (李四,3), (李三,2))
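collect() brings the pairs back to the driver in no particular order. If you want the counts sorted by frequency, or written back to HDFS instead of printed, something like the following should work (a sketch; the output directory is a hypothetical example and must not already exist):

// Sort by count, highest first, and pull the result back to the driver
val sorted = result.sortBy(_._2, ascending = false)
sorted.collect().foreach(println)

// Or persist the word counts to HDFS (hypothetical output path)
sorted.saveAsTextFile("/user/hdfs/wordcount_result")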