Scala实现Mapreduce程序2-----Top5

输入n个数,返回TOP5的数字

scala实现,以各个数字为key,""为空,按照key进行排序,取出前5个

object Top5 {  def main(args: Array[String]): Unit = {    val conf = new SparkConf().setMaster("")    val sc = new SparkContext(conf)    val one = sc.textFile("/spark/test")    var index=0    val text=one.filter(x=>(x.trim.length>0)&&(x.split(",").length==4)).map(_.split(",")(2).toInt).      map(x=>(x,"")).sortByKey(false).map(x=>x._1).take(5).foreach(x=>{      index=index+1      println("top index:"+index+"\t"+x)    })

}}

Mapreduce实现,(key,"") =>(index+"",key)

MapReduce中的IntWritable默认是按照降序排列的,要实现升序排序,自己实现MyIntWritabel
public class MyIntWritable implements WritableComparable<MyIntWritable> {    private  Integer num;

public MyIntWritable(Integer num){        this.num=num;    }

public MyIntWritable(){}

public void write(DataOutput output) throws IOException {        output.writeInt(num);    }

public void  readFields(DataInput input) throws IOException {        this.num=input.readInt();    }

public int compareTo(MyIntWritable o){        int minux=this.num-o.num;        return minux*(-1);    }

@Override    public int hashCode() {        return this.num.hashCode();    }

public String toSting(){        return this.num+"";    }

public boolean equals(Object obj) {        if (obj instanceof MyIntWritable) {            return false;        }        MyIntWritable ok2 = (MyIntWritable) obj;        return (this.num == ok2.num);    }}
package HadoopvsSpark;

import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/** * Created by Administrator on 2017/5/26. */public class TopN {    public static class TopNMapper extends Mapper<LongWritable,Text,MyIntWritable,Text>{        public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException {            String line=value.toString();            if(line.trim().length()>0){                String str[]=line.split( "," );                if(str.length==4){                    context.write( new MyIntWritable( Integer.parseInt( str[2] ) ),new Text( "" ) );                }            }        }    }

public static class TopNReducer extends Reducer<MyIntWritable,Text,Text,MyIntWritable>{        private int index=0;        public void reduce(MyIntWritable key,Iterable<Text> values,Context context) throws IOException, InterruptedException {              index++;              if(index<=5){                  context.write( new Text( index+" " ),key );              }        }    }

public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

org.apache.hadoop.conf.Configuration conf=new org.apache.hadoop.conf.Configuration();        Job job=new Job(conf,"topn");        job.setJarByClass( TopN.class );

job.setMapperClass( TopNMapper.class );        job.setMapOutputKeyClass( MyIntWritable.class );        job.setMapOutputValueClass( Text.class );

job.setReducerClass( TopNReducer.class );        job.setOutputKeyClass( Text.class);        job.setOutputValueClass( MyIntWritable.class );

FileInputFormat.addInputPath( job,new Path( args[0] ) );        Path outputdir=new Path( args[1] );        FileSystem fs=FileSystem.get( conf ); //判断输出目录是否存在        if(fs.exists( outputdir )){            fs.delete( outputdir,true );        }        FileOutputFormat.setOutputPath( job,outputdir ) ;        System.out.println(job.waitForCompletion( true )?1:0);    }}
时间: 2024-12-20 11:44:26

Scala实现Mapreduce程序2-----Top5的相关文章

Scala实现Mapreduce程序4-----数据去重

数据去重,key只输出一次 scala实现:先groupByKey(),然后SortByKey(),然后输出keys object Reduplicate { def main(args: Array[String]): Unit = { val conf=new SparkConf().setMaster("local").setAppName("remove duplication"); val sc=new SparkContext(conf); val li

Scala实现Mapreduce程序3----数据排序

输入n个数字,输出每一个数字以及其排名例如: 4 2 3 1 输出: 1 1 2 2 3 3 4 4 scala实现 package HadoopvsSpark.ScalaMap import org.apache.spark.{HashPartitioner, SparkConf, SparkContext} /** * Created by Administrator on 2017/5/25. */object Sorted { def main(args: Array[String]):

Hadoop之MapReduce程序应用一

摘要:MapReduce程序处理专利数据集. 关键词:MapReduce程序   专利数据集 数据源:专利引用数据集cite75_99.txt.(该数据集可以从网址http://www.nber.org/patents/下载) 问题描述: 读取专利引用数据集并对它进行倒排.对于每一个专利,找到那些引用它的专利并进行合并.top5输出结果如下: 1                                3964859, 4647229 10000                      

使用Eclipse运行Hadoop 2.x MapReduce程序常见问题

1. 当我们编写好MapReduce程序,点击Run on Hadoop的时候,Eclipse控制台输出如下内容: 这个信息告诉我们没有找到log4j.properties文件.如果没有这个文件,程序运行出错的时候,就没有打印日志,因此我们会很难调试. 解决方法:复制$HADOOP_HOME/etc/hadoop/目录下的log4j.properties文件到MapReduce项目 src文件夹下. 2.当执行MapReduce程序的时候,Eclipse可能会报告堆益处的错误. 此时,MapRe

Win7下Eclipse中运行远程MapReduce程序

1.hadoop插件的参数配置 2.运行时的参数 3.运行结果 Win7下Eclipse中运行远程MapReduce程序,布布扣,bubuko.com

Mapreduce程序运行的多模式

Mapreduce程序可在多种模式下运行: 本地模式: 1)         本地文件,本地处理:将MR的输入输出路径设置为本地路径: 2)         集群文件,本地处理:将MR的输入输出设置为HDFS的路径,job在本地进行处理; 2.集群模式:集群文件,集群处理:将MR的输入输出设置为HDFS的路径,并将Job提交到集群里面(Yarn)处理:其中以集群模式运行的时候还可通过以下几种方式对Job作业进行提交(前提是在集群里面已经启动HDFS以及Yarn): 1)         在Ecl

在Hadoop上运行基于RMM中文分词算法的MapReduce程序

原文:http://xiaoxia.org/2011/12/18/map-reduce-program-of-rmm-word-count-on-hadoop/ 在Hadoop上运行基于RMM中文分词算法的MapReduce程序 23条回复 我知道这个文章标题很“学术”化,很俗,让人看起来是一篇很牛B或者很装逼的论文!其实不然,只是一份普通的实验报告,同时本文也不对RMM中文分词算法进行研究.这个实验报告是我做高性能计算课程的实验里提交的.所以,下面的内容是从我的实验报告里摘录出来的,当作是我学

mapreduce程序编写(WordCount)

折腾了半天.终于编写成功了第一个自己的mapreduce程序,并通过打jar包的方式运行起来了. 运行环境: windows 64bit eclipse 64bit jdk6.0 64bit 一.工程准备 1.新建java project 2.导入jar包 新建一个user library 把hadoop文件夹里的hadoop-core和lib包里的所有包都导入进来,以免出错. 二.编码 1.主要是计算单词的小程序,测试用 package com.hirra; import java.io.IO

Hadoop学习---第三篇Hadoop的第一个Mapreduce程序

Mapreducer程序写了好几个了,但是之前一直都没有仔细的测试过本地运行和集群上运行的区别,今天写了一个Mapreduce程序,在此记录下来. 本地运行注意事项有以下几点: 1.本地必须配置好Hadoop的开发环境 2.在src里不加入配置文件运行,或者如果本地的src里有mapred-site.xml和yarn-site.xml配置文件,那么mapreduce.framework.name=local以及yarn.resourcemanager.hostname=local 测试说明:sr