Spark on YARN -- WordCount, TopK
Original article: http://blog.csdn.net/cklsoft/article/details/25568621
1. First, in the Eclipse (Scala) development environment set up per http://dongxicheng.org/framework-on-yarn/spark-eclipse-ide/, write a Scala source file with the following content:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object HdfsWordCount {
  def main(args: Array[String]) {
    val sc = new SparkContext(args(0) /*"yarn-standalone"*/, "myWordCount",
      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
      // List("lib/spark-assembly_2.10-0.9.0-incubating-hadoop1.0.4.jar")
    val logFile = sc.textFile(args(1)) // "hdfs://master:9101/user/root/spam.data" -- should be some file on your system
    // val file = sc.textFile("D:\\test.txt")
    val counts = logFile.flatMap(line => line.split(" "))
                        .map(word => (word, 1))
                        .reduceByKey(_ + _)
    // println(counts)
    counts.saveAsTextFile(args(2) /*"hdfs://master:9101/user/root/out"*/)
  }
}
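Before deploying to YARN, the same word-count logic can be sanity-checked in local mode. The following is only a minimal sketch; the object name LocalWordCount and the path data/sample.txt are placeholders introduced here, not part of the original post:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object LocalWordCount {
  def main(args: Array[String]) {
    // "local" master runs the job in-process; no YARN cluster is needed
    val sc = new SparkContext("local", "localWordCount")
    val counts = sc.textFile("data/sample.txt")   // placeholder input path
                   .flatMap(line => line.split(" "))
                   .map(word => (word, 1))
                   .reduceByKey(_ + _)
    counts.collect().foreach(println)             // print instead of writing to HDFS
  }
}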
2. Use Eclipse's Export Jar File feature to compile the Scala source into class files and package them into sc.jar.
3. Execute the run_wc.sh script:
#!/bin/bash
SPARK_HADOOP_VERSION=2.2.0
SPARK_YARN=true
export SPARK_JAR=$SPARK_HOME/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop2.2.0.jar
export EXEC_JAR=$SPARK_HOME/sc.jar  # examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar
./bin/spark-class org.apache.spark.deploy.yarn.Client \
  --jar $EXEC_JAR \
  --class HdfsWordCount \
  --args yarn-standalone \
  --args hdfs://master:9101/user/root/spam.data \
  --args hdfs://master:9101/user/root/out2 \
  --num-workers 1 \
  --master-memory 512m \
  --worker-memory 512m \
  --worker-cores 1
Appendix:
TopK (select the k most frequent words) code:
package sc

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object TopK {
  def main(args: Array[String]) {
    // example args: yarn-standalone hdfs://master:9101/user/root/spam.data 5
    val sc = new SparkContext(args(0) /*"yarn-standalone"*/, "myWordCount",
      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
      // List("lib/spark-assembly_2.10-0.9.0-incubating-hadoop1.0.4.jar")
    val logFile = sc.textFile(args(1)) // "hdfs://master:9101/user/root/spam.data" -- should be some file on your system
    val counts = logFile.flatMap(line => line.split(" "))
                        .map(word => (word, 1))
                        .reduceByKey(_ + _)
    val sorted = counts.map { case (key, val0) => (val0, key) }.sortByKey(true, 1)
    val topK = sorted.top(args(2).toInt)
    topK.foreach(println)
  }
}
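As an aside, RDD.top already returns the k largest elements under the natural ordering of the (count, word) pairs, so the ascending sortByKey pass above is not strictly required. A shorter variant of the same idea, offered only as a sketch (not the original author's code):

// assumes counts: RDD[(String, Int)] built exactly as above
val topK = counts.map { case (word, cnt) => (cnt, word) }
                 .top(args(2).toInt)   // k largest counts; no full sort pass needed
topK.foreach(println)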
Appendix 2: join operation (see http://dongxicheng.org/framework-on-yarn/spark-scala-writing-application/ for the problem statement):
package sc

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SparkJoinTest {
  def main(args: Array[String]) {
    val sc = new SparkContext(args(0) /*"yarn-standalone"*/, "SparkJoinTest",
      System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass))
      // List("lib/spark-assembly_2.10-0.9.0-incubating-hadoop1.0.4.jar")
    val txtFile = sc.textFile(args(1)) // "hdfs://master:9101/user/root/spam.data" -- should be some file on your system
    val rating = txtFile.map(line => {
      val fileds = line.split("::")
      (fileds(1).toInt, fileds(2).toDouble)
    }) // the value of the block is its last expression
    val movieScores = rating.groupByKey().map(data => {
      val avg = data._2.sum / data._2.size
      // if (avg > 4.0)
      (data._1, avg)
    })
    val moviesFile = sc.textFile(args(2))
    val moviesKey = moviesFile.map(line => {
      val fileds = line.split("::")
      (fileds(0).toInt, fileds(1))
    }).keyBy(tuple => tuple._1) // set the key
    val res = movieScores.keyBy(tuple => tuple._1).join(moviesKey) // (<k,v>, <k,w>) => <k, (v, w)>
      .filter(f => f._2._1._2 > 4.0)
      .map(f => (f._1, f._2._1._2, f._2._2._2))
    res.saveAsTextFile(args(3))
  }
}
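Because movieScores and the (movieId, title) records are already key-value RDDs, the keyBy calls mainly duplicate the key and deepen the tuple nesting. A slightly flatter variant of the same join, offered only as a sketch (the movies name is introduced here for illustration, not from the original code):

// assumes movieScores: RDD[(Int, Double)] and moviesFile built exactly as above
val movies = moviesFile.map(line => {
  val f = line.split("::")
  (f(0).toInt, f(1))                                  // (movieId, title)
})
val res = movieScores.join(movies)                    // (movieId, (avgScore, title))
  .filter { case (_, (avg, _)) => avg > 4.0 }         // keep movies rated above 4.0
  .map { case (id, (avg, title)) => (id, avg, title) }
res.saveAsTextFile(args(3))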