Spark on YARN--WordCount、TopK

原文地址:http://blog.csdn.net/cklsoft/article/details/25568621

1、首先利用http://dongxicheng.org/framework-on-yarn/spark-eclipse-ide/搭建好的Eclipse(Scala)开发平台编写scala文件,内容如下:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
object HdfsWordCount {
  def main(args: Array[String]) {
    val sc = new SparkContext(args(0)/*"yarn-standalone"*/,"myWordCount",System.getenv("SPARK_HOME"),SparkContext.jarOfClass(this.getClass))
                                                        //List("lib/spark-assembly_2.10-0.9.0-incubating-hadoop1.0.4.jar")
    val logFile = sc.textFile(args(1))//"hdfs://master:9101/user/root/spam.data") // Should be some file on your system
  //  val file = sc.textFile("D:\\test.txt")
    val counts = logFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
 //   println(counts)
    counts.saveAsTextFile(args(2)/*"hdfs://master:9101/user/root/out"*/)
  }
}

2、利用Eclipse的Export Jar File功能将Scala源文件编译成class文件并打包成sc.jar

3、执行run_wc.java脚本:

#! /bin/bash
SPARK_HADOOP_VERSION=2.2.0
SPARK_YARN=true
export SPARK_JAR=$SPARK_HOME/assembly/target/scala-2.10/spark-assembly_2.10-0.9.0-incubating-hadoop2.2.0.jar
export EXEC_JAR=$SPARK_HOME/sc.jar
#examples/target/scala-2.10/spark-examples_2.10-assembly-0.9.0-incubating.jar 

./bin/spark-class org.apache.spark.deploy.yarn.Client  --jar $EXEC_JAR  --class HdfsWordCount  --args  yarn-standalone  --args hdfs://master:9101/user/root/spam.data  --args hdfs://master:9101/user/root/out2  --num-workers 1  --master-memory 512m  --worker-memory 512m  --worker-cores 1

附:

TopK(选出出现频率最高的前k个)代码:

package sc
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
object TopK {
  def main(args: Array[String]) {
    //yarn-standalone hdfs://master:9101/user/root/spam.data 5
    val sc = new SparkContext(args(0)/*"yarn-standalone"*/,"myWordCount",System.getenv("SPARK_HOME"),SparkContext.jarOfClass(this.getClass))
                                                        //List("lib/spark-assembly_2.10-0.9.0-incubating-hadoop1.0.4.jar")
    val logFile = sc.textFile(args(1))//"hdfs://master:9101/user/root/spam.data") // Should be some file on your system
    val counts = logFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
    val sorted=counts.map{
      case(key,val0) => (val0,key)
    }.sortByKey(true,1)
    val topK=sorted.top(args(2).toInt)
    topK.foreach(println)
  }
}

附录2 join操作(题意详见:http://dongxicheng.org/framework-on-yarn/spark-scala-writing-application/):

package sc
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
object SparkJoinTest {
  def main(args: Array[String]) {
    val sc = new SparkContext(args(0)/*"yarn-standalone"*/,"SparkJoinTest",System.getenv("SPARK_HOME"),SparkContext.jarOfClass(this.getClass))
                                                        //List("lib/spark-assembly_2.10-0.9.0-incubating-hadoop1.0.4.jar")
    val txtFile = sc.textFile(args(1))//"hdfs://master:9101/user/root/spam.data") // Should be some file on your system
    val rating=txtFile.map(line =>{
    	val fileds=line.split("::")
    	(fileds(1).toInt,fileds(2).toDouble)
    	}
    )//大括号内以最后一个表达式为值
    val movieScores=rating.groupByKey().map(
        data=>{
          val avg=data._2.sum/data._2.size
       //   if (avg>4.0)
            (data._1,avg)
        }
    )

    val moviesFile=sc.textFile(args(2))
    val moviesKey=moviesFile.map(line =>{
      val fileds=line.split("::")
      (fileds(0).toInt,fileds(1))
      }
    ).keyBy(tuple=>tuple._1)//设置健

    val res=movieScores.keyBy(tuple=>tuple._1).join(moviesKey)// (<k,v>,<k,w>=><k,<v,w>>)
    .filter(f=>f._2._1._2>4.0)
    .map(f=>(f._1,f._2._1._2,f._2._2._2))
    res.saveAsTextFile(args(3))
  }
}

Spark on YARN--WordCount、TopK

时间: 2024-10-23 21:08:25

Spark on YARN--WordCount、TopK的相关文章

运维系列:05、Spark on Yarn

Spark 0.6.0开始支持此功能 准备: 运行Spark-on-YARN需要Spark的二进制发布包.参考编译 配置: 环境变量: SPARK_YARN_USER_ENV 用户可以在这个参数中设置Spark on YARN的环境变量,可以省略. 例如:SPARK_YARN_USER_ENV="JAVA_HOME=/jdk64,FOO=bar". // TODO 具体可配置项 SPARK_JAR 设置Spark jar在HDFS的位置. 例如:export SPARK_JAR=hdf

Spark(十二) -- Spark On Yarn &amp; Spark as a Service &amp; Spark On Tachyon

Spark On Yarn: 从0.6.0版本其,就可以在在Yarn上运行Spark 通过Yarn进行统一的资源管理和调度 进而可以实现不止Spark,多种处理框架并存工作的场景 部署Spark On Yarn的方式其实和Standalone是差不多的,区别就是需要在spark-env.sh中添加一些yarn的环境配置,在提交作业的时候会根据这些配置加载yarn的信息,然后将作业提交到yarn上进行管理 首先请确保已经部署了Yarn,相关操作请参考: hadoop2.2.0集群安装和配置 部署完

提交任务到spark(以wordcount为例)

1.首先需要搭建好hadoop+spark环境,并保证服务正常.本文以wordcount为例. 2.创建源文件,即输入源.hello.txt文件,内容如下: tom jerry henry jim suse lusy 注:以空格为分隔符 3.然后执行如下命令: hadoop fs -mkdir -p /Hadoop/Input(在HDFS创建目录) hadoop fs -put hello.txt /Hadoop/Input(将hello.txt文件上传到HDFS) hadoop fs -ls

spark on yarn详解

1.参考文档: spark-1.3.0:http://spark.apache.org/docs/1.3.0/running-on-yarn.html spark-1.6.0:http://spark.apache.org/docs/1.6.0/running-on-yarn.html 备注:从spark-1.6.0开始,spark on yarn命令有略微改变,具体参考官方文档,这里以spark 1.3.0集群为主. 2.前期准备 编译spark,参看文档:http://www.cnblogs

Spark on Yarn年度知识整理

大数据体系结构: Spark简介 Spark是整个BDAS的核心组件,是一个大数据分布式编程框架,不仅实现了MapReduce的算子map 函数和reduce函数及计算模型,还提供更为丰富的算子,如filter.join.groupByKey等.是一个用来实现快速而同用的集群计算的平台. Spark将分布式数据抽象为弹性分布式数据集(RDD),实现了应用任务调度.RPC.序列化和压缩,并为运行在其上的上层组件提供API.其底层采用Scala这种函数式语言书写而成,并且所提供的API深度借鉴Sca

Spark On YARN内存和CPU分配

本篇博客参考:http://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/ 软件版本: CDH:5.7.2,JDK:1.7: 问题描述: 在使用Spark On YARN时(无论是Client模式或者是Cluster模式,当然下面会有这种模式的对比区别),可以添加诸如: --executor-memory 8G --executor-cores 5 --num-executors 20 等等这样的

Spark on Yarn彻底解密(DT大数据梦工厂)

内容: 1.Hadoop Yarn的工作流程解密: 2.Spark on Yarn两种运行模式实战: 3.Spark on Yarn工作流程解密: 4.Spark on Yarn工作内幕解密: 5.Spark on Yarn最佳实践: 资源管理框架Yarn Mesos是分布式集群的资源管理框架,和大数据没关系,但是可以管理大数据的资源 ==========Hadoop Yarn解析============ 1.Yarn是Hadoop推出的资源管理器,是负责分布式(大数据)集群计算的资源管理的,负

spark 在yarn执行job时一直抱0.0.0.0:8030错误

近日新写完的spark任务放到yarn上面执行时,在yarn的slave节点中一直看到报错日志:连接不到0.0.0.0:8030 . 1 The logs are as below: 2 2014-08-11 20:10:59,795 INFO [main] org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8030 3 2014-08-11 20:11:01,838 INFO [ma

Oozie Spark on YARN requirement failed

软件环境: CDH:5.7.3:Oozie:4.1.0-CDH5.7.3 : Spark:1.6.0-cdh5.7.3-hadoop2.6.0-cdh5.7.3 : Hadoop:hadoop2.6.0-cdh5.7.3(HDFS 采用HA方式): 问题描述: 在使用CDH5.7.3版本的时候,发起一个Oozie工作流,该工作流使用Spark On YARN的方式提交一个Spark程序,但是在Oozie中该程序运行失败,同时找到YARN监控中对应的任务,发现出现下面的错误(该Spark任务如果使