用一个MapReduce job实现去重,多目录输出功能

总结之前工作中遇到的一个问题。

背景:

运维用scribe从apache服务器推送过来的日志有重复记录,所以这边的ETL处理要去重,还有个需求是要按业务类型多目录输出,方便挂分区,后面的使用。

这两个需求单独处理都没有问题,但要在一个mapreduce里完成,需要一点技巧。

1、map输入数据,经过一系列处理,输出时:

 if(ttype.equals("other")){
        	file = (result.toString().hashCode() & 0x7FFFFFFF)%400;
        }else if(ttype.equals("client")){
        	file = (result.toString().hashCode() & 0x7FFFFFFF)%260;
        }else{
        	file = (result.toString().hashCode()& 0x7FFFFFFF)%60;
        }
        tp = new TextPair(ttype+"_"+file, result.toString());

        context.write(tp, valuet);

valuet是空的,什么都没有。

我这里有三个类型,other,client,wap,分别代表日志来源平台,要按他们分目录输出。

result就是整条记录。file得到的是最终输出文件名,hash,位操作,取模是为了输出均衡。

map的输出结构<key,value> =(ttype+"_"+file,result.toString())

这样做的目的是:保证相同的记录得到相同的key,同时还要保存类型。partition要按textPair的left,也就是这个key,

保证了后面要写到同一个输出文件的所有记录都到同一个reduce里去,一个reduce可以写多个输出文件,但是一个输出文件不能来自多个reduce,原因很明了。

这样的话大概400+260+60=720个输出文件,每个文件数据量大概差不多,job的reduce数我这里设置的240,这个数连同取模400,260,60都是根据我的数据量来定的,来尽量避免reduce的数据倾斜。

2、reduce方法去重:

 public void reduce(TextPair key, Iterable<Text> values, Context context) throws IOException, InterruptedException
    {

        rcfileCols = getRcfileCols(key.getSecond().toString().split("\001"));
        context.write(key.getFirst(), rcfileCols);

    }

不用迭代,对相同的key组,只输出一次。注意这里job用到的比较器,一定不能是FirstComparator,而是整个textpair对的比较。(先比较left,再比较right)

我的程序里输出文件格式是rcfile。

3、多目录输出:

 job.setOutputFormatClass(WapApacheMutiOutputFormat.class);

public class WapApacheMutiOutputFormat extends RCFileMultipleOutputFormat<Text, BytesRefArrayWritable> {
	Random r = new Random();
	protected String generateFileNameForKeyValue(Text key, BytesRefArrayWritable value,
			Configuration conf) {

		    String typedir = key.toString().split("_")[0];

			return typedir+"/"+key.toString();

	}
}

这里的RCFileMultipleOutputFormat是自己继承自FileOutputFormat 自己写的,主要实现了recordWriter。

最终输出去重的,分目录的数据文件。

理解的关键主要是partition key的设计,reduce的原理。

用一个MapReduce job实现去重,多目录输出功能,布布扣,bubuko.com

时间: 2024-11-03 01:16:33

用一个MapReduce job实现去重,多目录输出功能的相关文章

利用MapReduce实现数据去重

数据去重主要是为了利用并行化的思想对数据进行有意义的筛选. 统计大数据集上的数据种类个数.从网站日志中计算访问地等这些看似庞杂的任务都会涉及数据去重. 示例文件内容: 此处应有示例文件 设计思路 数据去重的最终目标是让原始数据中出现次数超过一次的数据在输出文件中只出现一次. 自然就想到将同一数据的所有记录都交给一台reduce机器,无路这个数据出现多少次,只要在最终结果中输出一次就可以了. 具体就是reduce的输入应该以数据作为key,而对value-list没有要求. 当reduce收到一个

【源】从零自学Hadoop(08):第一个MapReduce

阅读目录 序 数据准备 wordcount Yarn 新建MapReduce 示例下载 系列索引 本文版权归mephisto和博客园共有,欢迎转载,但须保留此段声明,并给出原文链接,谢谢合作. 文章是哥(mephisto)写的,SourceLink 序 上一篇,我们的Eclipse插件搞定,那开始我们的MapReduce之旅. 在这里,我们先调用官方的wordcount例子,然后再手动创建个例子,这样可以更好的理解Job. 数据准备 一:说明 wordcount这个类是对不同的word进行统计个

hadoop在实现kmeans算法——一个mapreduce实施

写mapreduce程序实现kmeans算法.我们的想法可能是 1. 次迭代后的质心 2. map里.计算每一个质心与样本之间的距离,得到与样本距离最短的质心,以这个质心作为key,样本作为value,输出 3. reduce里,输入的key是质心,value是其它的样本,这时又一次计算聚类中心,将聚类中心put到一个所有变量t中. 4. 在main里比較前一次的质心和本次的质心是否发生变化,假设变化,则继续迭代,否则退出. 本文的思路基本上是依照上面的步骤来做的,仅仅只是有几个问题须要解决 1

HDFS设计思路,HDFS使用,查看集群状态,HDFS,HDFS上传文件,HDFS下载文件,yarn web管理界面信息查看,运行一个mapreduce程序,mapreduce的demo

26 集群使用初步 HDFS的设计思路 l 设计思想 分而治之:将大文件.大批量文件,分布式存放在大量服务器上,以便于采取分而治之的方式对海量数据进行运算分析: l 在大数据系统中作用: 为各类分布式运算框架(如:mapreduce,spark,tez,--)提供数据存储服务 l 重点概念:文件切块,副本存放,元数据 26.1 HDFS使用 1.查看集群状态 命令:   hdfs  dfsadmin –report 可以看出,集群共有3个datanode可用 也可打开web控制台查看HDFS集群

Hadoop学习---第三篇Hadoop的第一个Mapreduce程序

Mapreducer程序写了好几个了,但是之前一直都没有仔细的测试过本地运行和集群上运行的区别,今天写了一个Mapreduce程序,在此记录下来. 本地运行注意事项有以下几点: 1.本地必须配置好Hadoop的开发环境 2.在src里不加入配置文件运行,或者如果本地的src里有mapred-site.xml和yarn-site.xml配置文件,那么mapreduce.framework.name=local以及yarn.resourcemanager.hostname=local 测试说明:sr

未能加载文件或程序集“System.Web.Razor”或它的某一个依赖项。文件或目录损坏且无法读取。

“/”应用程序中的服务器错误. 未能加载文件或程序集“System.Web.Razor”或它的某一个依赖项.文件或目录损坏且无法读取. (异常来自 HRESULT:0x80070570) 说明: 执行当前 Web 请求期间,出现未经处理的异常.请检查堆栈跟踪信息,以了解有关该错误以及代码中导致错误的出处的详细信息. 异常详细信息: System.BadImageFormatException: 未能加载文件或程序集“System.Web.Razor”或它的某一个依赖项.文件或目录损坏且无法读取.

Linux rm(删除一个目录中的一个或多个文件或目录或删除非空目录)

rm命令.rm是常用的命令,该命令的功能为删除一个目录中的一个或多个文件或目录,它也可以将某个目录及其下的所有文件及子目录均删除.对于链接文件,只是删除了链接,原有文件均保持不变. rm是一个危险的命令,使用的时候要特别当心,尤其对于新手,否则整个系统就会毁在这个命令(比如在/(根目录)下执行rm * -rf).所以,我们在执行rm之前最好先确认一下在哪个目录,到底要删除什么东西,操作时保持高度清醒的头脑. 命令格式: rm [选项] 文件- 命令功能: 删除一个目录中的一个或多个文件或目录,如

一个分区挂载到两个目录如何删除另一个

有时候我们会不小心挂载一个分区到两个目录下面,我们可以通过一下方式删除: sudo vi /etc/fstab ,删除/dev/xvdb1 /mnt ext3 defaults 0 0 这行就行 不会出现什么问题的,留着也没事 一个分区挂载到两个目录如何删除另一个,布布扣,bubuko.com

RHadoop教程翻译系列 _Mapreduce(1)_第一个Mapreduce任务

如果单从概念上来说,Mapreduce和R中的函数lapply, tapply并无差别,它们都是把元素转化成列,然后计算索引(Mapreduce中的键),最后合并成一个定义好的组合.首先,让我们看一个简单的lappy的例子. small.ints = 1:1000 sapply(small.ints, function(x) x^2) 这个例子比较简单,只是计算了前1000个整数的平方,不过我们可以从这个例子中对lappy这个函数有个基本的认知,接下来关于这个函数还有更多有意思的例子.现在让我们