万法归宗之Hadoop编程无界限

记录下,散仙今天的工作以及遇到的问题和解决方案,俗话说,好记性不如烂笔头,写出来文章,供大家参考,学习和点评,进步,才是王道 ,废话不多说,下面切入主题:

先介绍下需求:

散仙要处理多个类似表的txt数据,当然只有值,列名什么的全部在xml里配置了,然后加工这些每个表的每一行数据,生成特定的格式基于ASCII码1和ASCII码2作为分隔符的一行数据,ASCII2作为字段名和字段值的分隔符,ASCII1作为字段和字段之间的分隔符,每解析一个txt文件时,都要获取文件名,然后与xml中的schema信息映射并找到对应位置的值,它的列名,前提是,这些的txt的内容位置,是固定的,然后我们知道它每一行属于哪个表结构的映射,因为这些映射信息是提前配置在xml中的,如下图:

当然类似这样的结构有20个左右的表文件,到时候,我们的数据方,会给我们提供这些txt文件,然后散仙需要加工成特定的格式,然后写入HDFS,由我们的索引系统使用MapReduce批量建索引使用。

本来想直接用java写个单机程序,串行处理,然后写入HDFS,后来一想假如数据量比较大,串行程序还得改成多线程并行执行,这样改来改去,倒不如直接使用MapReduce来的方便

ok,说干就干,测试环境已经有一套CDH5.3的hadoop2.5集群,直接就在eclipse进行开发和MapReduce程序的调试,反正也好久也没手写MapReduce了,前段时间,一直在用Apache Pig分析数据,这次处理的逻辑也不复杂,就再写下练练手 , CDH的集群在远程的服务器上,散仙本机的hadoop是Apache Hadoop2.2的版本,在使用eclipse进行开发时,也没来得及换版本,理论上最好各个版本,不同发行版,之间对应起来开发比较好,这样一般不会存在兼容性问题,但散仙这次就懒的换了,因为CDH5.x之后的版本,是基于社区版的Apache Hadoop2.2之上改造的,接口应该大部分都一致,当然这只是散仙猜想的。

(1)首先,散仙要搞定的事,就是解析xml了,在程序启动之前需要把xml解析,加载到一个Map中,这样在处理每种txt时,会根据文件名来去Map中找到对应的schma信息,解析xml,散仙直接使用的jsoup,具体为啥,请点击散仙这篇 
http://qindongliang.iteye.com/blog/2162519文章,在这期间遇到了一个比较蛋疼的问题,简直是一个bug,最早散仙定义的xml是每个表,一个table标签,然后它下面有各个property的映射定义,但是在用jsoup的cssQuery语法解析时,发现总是解析不出来东西,按照以前的做法,是没任何问题的,这次简直是开玩笑了,后来就是各种搜索,测试,最后才发现,将table标签,换成其他的任何标签都无任何问题,具体原因,散仙还没来得及细看jsoup的源码,猜测table标签应该是一个关键词什么的标签,在解析时会和html的table冲突,所以在xml解析中失效了,花了接近2个小时,求证,检验,终于搞定了这个小bug。

(2)搞定了这个问题,散仙就开始开发调试MapReduce版的处理程序,这下面临的又一个问题,就是如何使用Jsoup解析存放在HDFS上的xml文件,有过Hadoop编程经验的人,应该都知道,HDFS是一套分布式的文件系统,与我们本地的磁盘的存储方式是不一样的,比如你在正常的JAVA程序上解析在C:\file\t.tx或者在linux上/home/user/t.txt,所编写的程序在Hadoop上是无法使用的,你得使用Hadoop提供的编程接口获取它的文件信息,然后转成字符串之后,再给jsoup解析。

(3)ok,第二个问题搞定之后,你得编写你的MR程序,处理对应的txt文本,而且保证不同的txt里面的数据格式,所获取的scheaml是正确的,所以在map方法里,你要获取当然处理文件的路径,然后做相应判断,在做对应处理。

(4)很好,第三个问题搞定之后,你的MR的程序,基本编写的差不多了,下一步就改考虑如何提交到Hadoop的集群上,来调试程序了,由于散仙是在Win上的eclipse开发的,所以这一步可能遇到的问题会很多,而且加上,hadoop的版本不一致与发行商也不一致,出问题也纯属正常。

这里多写一点,一般建议大家不要在win上调试hadoop程序,这里的坑非常多,如果可以,还是建议大家在linux上直接玩,下面说下,散仙今天又踩的坑,关于在windows上调试eclipse开发, 运行Yarn的MR程序,散仙以前也记录了文章,感兴趣者,可以点击这个链接 
http://qindongliang.iteye.com/blog/2078452地址

(5)提交前,是需要使用ant或maven或者java自带的导出工具,将项目打成一个jar包提交的,这一点大家需要注意下,最后测试得出,Apache的hadoop2.2编写的MR程序,是可以直接向CDH Hadoop2.5提交作业的,但是由于hadoop2.5中,使用google的guice作为了一个内嵌的MVC轻量级的框架,所以在windows上打包提交时,需要引入额外的guice的几个包,截图如下:

上面几步搞定后,打包整个项目,然后运行成功,过程如下:

Java代码  

  1. 输出路径存在,已删除!
  2. 2015-04-08 19:35:18,001 INFO  [main] client.RMProxy (RMProxy.java:createRMProxy(56)) - Connecting to ResourceManager at /172.26.150.18:8032
  3. 2015-04-08 19:35:18,170 WARN  [main] mapreduce.JobSubmitter (JobSubmitter.java:copyAndConfigureFiles(149)) - Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
  4. 2015-04-08 19:35:21,156 INFO  [main] input.FileInputFormat (FileInputFormat.java:listStatus(287)) - Total input paths to process : 2
  5. 2015-04-08 19:35:21,219 INFO  [main] mapreduce.JobSubmitter (JobSubmitter.java:submitJobInternal(394)) - number of splits:2
  6. 2015-04-08 19:35:21,228 INFO  [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - user.name is deprecated. Instead, use mapreduce.job.user.name
  7. 2015-04-08 19:35:21,228 INFO  [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - mapred.jar is deprecated. Instead, use mapreduce.job.jar
  8. 2015-04-08 19:35:21,228 INFO  [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - fs.default.name is deprecated. Instead, use fs.defaultFS
  9. 2015-04-08 19:35:21,229 INFO  [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
  10. 2015-04-08 19:35:21,229 INFO  [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - mapred.mapoutput.value.class is deprecated. Instead, use mapreduce.map.output.value.class
  11. 2015-04-08 19:35:21,230 INFO  [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - mapreduce.map.class is deprecated. Instead, use mapreduce.job.map.class
  12. 2015-04-08 19:35:21,230 INFO  [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - mapred.job.name is deprecated. Instead, use mapreduce.job.name
  13. 2015-04-08 19:35:21,230 INFO  [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - mapreduce.inputformat.class is deprecated. Instead, use mapreduce.job.inputformat.class
  14. 2015-04-08 19:35:21,230 INFO  [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
  15. 2015-04-08 19:35:21,230 INFO  [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
  16. 2015-04-08 19:35:21,230 INFO  [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - mapreduce.outputformat.class is deprecated. Instead, use mapreduce.job.outputformat.class
  17. 2015-04-08 19:35:21,231 INFO  [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
  18. 2015-04-08 19:35:21,233 INFO  [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - mapred.mapoutput.key.class is deprecated. Instead, use mapreduce.map.output.key.class
  19. 2015-04-08 19:35:21,233 INFO  [main] Configuration.deprecation (Configuration.java:warnOnceIfDeprecated(840)) - mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir
  20. 2015-04-08 19:35:21,331 INFO  [main] mapreduce.JobSubmitter (JobSubmitter.java:printTokens(477)) - Submitting tokens for job: job_1419419533357_5012
  21. 2015-04-08 19:35:21,481 INFO  [main] impl.YarnClientImpl (YarnClientImpl.java:submitApplication(174)) - Submitted application application_1419419533357_5012 to ResourceManager at /172.21.50.108:8032
  22. 2015-04-08 19:35:21,506 INFO  [main] mapreduce.Job (Job.java:submit(1272)) - The url to track the job: http://http://dnode1:8088/proxy/application_1419419533357_5012/
  23. 2015-04-08 19:35:21,506 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1317)) - Running job: job_1419419533357_5012
  24. 2015-04-08 19:35:33,777 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1338)) - Job job_1419419533357_5012 running in uber mode : false
  25. 2015-04-08 19:35:33,779 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1345)) -  map 0% reduce 0%
  26. 2015-04-08 19:35:43,885 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1345)) -  map 100% reduce 0%
  27. 2015-04-08 19:35:43,902 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1356)) - Job job_1419419533357_5012 completed successfully
  28. 2015-04-08 19:35:44,011 INFO  [main] mapreduce.Job (Job.java:monitorAndPrintJob(1363)) - Counters: 27
  29. File System Counters
  30. FILE: Number of bytes read=0
  31. FILE: Number of bytes written=166572
  32. FILE: Number of read operations=0
  33. FILE: Number of large read operations=0
  34. FILE: Number of write operations=0
  35. HDFS: Number of bytes read=47795
  36. HDFS: Number of bytes written=594
  37. HDFS: Number of read operations=12
  38. HDFS: Number of large read operations=0
  39. HDFS: Number of write operations=4
  40. Job Counters
  41. Launched map tasks=2
  42. Data-local map tasks=2
  43. Total time spent by all maps in occupied slots (ms)=9617
  44. Total time spent by all reduces in occupied slots (ms)=0
  45. Map-Reduce Framework
  46. Map input records=11
  47. Map output records=5
  48. Input split bytes=252
  49. Spilled Records=0
  50. Failed Shuffles=0
  51. Merged Map outputs=0
  52. GC time elapsed (ms)=53
  53. CPU time spent (ms)=2910
  54. Physical memory (bytes) snapshot=327467008
  55. Virtual memory (bytes) snapshot=1905754112
  56. Total committed heap usage (bytes)=402653184
  57. File Input Format Counters
  58. Bytes Read=541
  59. File Output Format Counters
  60. Bytes Written=594
  61. true

最后附上核心代码,以作备忘: 
(1)Map Only作业的代码:

Java代码  

  1. package com.dhgate.search.rate.convert;
  2. import java.io.File;
  3. import java.io.FileInputStream;
  4. import java.io.FileNotFoundException;
  5. import java.io.FilenameFilter;
  6. import java.io.IOException;
  7. import java.util.Map;
  8. import org.apache.hadoop.conf.Configuration;
  9. import org.apache.hadoop.fs.FileSystem;
  10. import org.apache.hadoop.fs.Path;
  11. import org.apache.hadoop.io.LongWritable;
  12. import org.apache.hadoop.io.NullWritable;
  13. import org.apache.hadoop.io.Text;
  14. import org.apache.hadoop.mapreduce.Job;
  15. import org.apache.hadoop.mapreduce.Mapper;
  16. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  17. import org.apache.hadoop.mapreduce.lib.input.FileSplit;
  18. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  19. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  20. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  21. import org.slf4j.Logger;
  22. import org.slf4j.LoggerFactory;
  23. import com.dhgate.parse.xml.tools.HDFSParseXmlTools;
  24. import com.sun.xml.bind.v2.schemagen.xmlschema.Import;
  25. /**
  26. * 加工处理数据格式
  27. *
  28. * @author qindongliang 2015年04月07日
  29. *
  30. * **/
  31. public class StoreConvert {
  32. //log4j记录
  33. static Logger log=LoggerFactory.getLogger(StoreConvert.class);
  34. /**
  35. * 转换支持的格式
  36. *
  37. * **/
  38. private static class FormatMapper extends Mapper<LongWritable, Text, NullWritable, Text>{
  39. @Override
  40. protected void map(LongWritable key, Text value,Context context)throws IOException, InterruptedException {
  41. String filename = ((FileSplit) context.getInputSplit()).getPath().getName().split("\\.")[0];
  42. //System.out.println("文件名是: "+filename);
  43. //log.info("读取的文件名是: "+filename);
  44. String vs[]=value.toString().split(",");
  45. if(HDFSParseXmlTools.map.get(filename)!=null){
  46. Map<String, String> m=HDFSParseXmlTools.map.get(filename);
  47. StringBuffer sb=new StringBuffer();
  48. for(int i=0;i<vs.length;i++){
  49. //字段\2值
  50. if(i==vs.length-1){
  51. sb.append(m.get(i+"")).append("\2").append(vs[i]);
  52. }else{
  53. sb.append(m.get(i+"")).append("\2").append(vs[i]).append("\1");
  54. }
  55. }
  56. context.write(NullWritable.get(), new Text(filename+" ==  "+sb.toString()));
  57. }
  58. }
  59. }
  60. public static void main(String[] args) throws Exception {
  61. //      System.setProperty("HADOOP_USER_NAME", "root");
  62. Configuration conf=new Configuration();
  63. // getConf(conf);
  64. conf.set("mapreduce.job.jar", "searchrate.jar");
  65. conf.set("fs.defaultFS","hdfs://172.21.50.108:8020");
  66. conf.set("mapreduce.framework.name", "yarn");
  67. conf.set("mapred.remote.os", "Linux");
  68. conf.set("yarn.resourcemanager.scheduler.address", "172.21.50.108:8030");
  69. conf.set("yarn.resourcemanager.address", "172.21.50.108:8032");
  70. // System.exit(0);
  71. Job job=Job.getInstance(conf, "formatdata");
  72. job.setJarByClass(StoreConvert.class);
  73. //          System.out.println("模式:  "+conf.get("mapreduce.jobtracker.address"));;
  74. job.setMapperClass(FormatMapper.class);
  75. job.setMapOutputKeyClass(Text.class);
  76. job.setMapOutputValueClass(Text.class);
  77. job.setInputFormatClass(TextInputFormat.class);
  78. job.setOutputFormatClass(TextOutputFormat.class);
  79. job.setNumReduceTasks(0);//Map Only作业
  80. String path = "/tmp/qin/out";
  81. FileSystem fs = FileSystem.get(conf);
  82. Path p = new Path(path);
  83. if (fs.exists(p)) {
  84. fs.delete(p, true);
  85. System.out.println("输出路径存在,已删除!");
  86. }
  87. FileInputFormat.setInputPaths(job, "/tmp/qin/testfile/");
  88. FileOutputFormat.setOutputPath(job, p);
  89. System.out.println(job.waitForCompletion(true));
  90. }
  91. }

使用解析HDFS上xml文件的代码:

Java代码  

  1. package com.dhgate.parse.xml.tools;
  2. import java.io.BufferedReader;
  3. import java.io.InputStreamReader;
  4. import java.util.HashMap;
  5. import java.util.HashSet;
  6. import java.util.List;
  7. import java.util.Map;
  8. import java.util.Map.Entry;
  9. import java.util.Set;
  10. import java.util.TreeMap;
  11. import org.apache.hadoop.conf.Configuration;
  12. import org.apache.hadoop.fs.FileSystem;
  13. import org.apache.hadoop.fs.Path;
  14. import org.jsoup.Jsoup;
  15. import org.jsoup.nodes.Document;
  16. import org.jsoup.nodes.Element;
  17. import org.slf4j.Logger;
  18. import org.slf4j.LoggerFactory;
  19. /**
  20. * Created by qindongliang on 15-4-6.
  21. * 大数据交流群:415886155
  22. */
  23. public class HDFSParseXmlTools {
  24. private final static Logger log= LoggerFactory.getLogger(HDFSParseXmlTools.class);
  25. //存储元数据信息
  26. public static Map<String, Map<String, String>> map=new HashMap<String, Map<String,String>>();
  27. static Configuration conf=new  Configuration();
  28. static FileSystem fs=null;
  29. static{
  30. log.info("初始化加载mapping.xml开始.......");
  31. try{
  32. conf.set("fs.defaultFS","hdfs://172.21.50.108:8020/");
  33. fs=FileSystem.get(conf);//获取conf对象
  34. Path xml =new Path("/tmp/qin/mapping.xml");//读取HDFS的xml文件
  35. BufferedReader br=new BufferedReader(new InputStreamReader(fs.open(xml)));//获取输入流
  36. StringBuffer sb=new StringBuffer();//声明一个buffer对象用来存储xml文件内容
  37. String line;
  38. line=br.readLine();//读取第一行
  39. sb.append(line);//追加到StringBuffer中
  40. while (line != null){
  41. line=br.readLine();//循环读取
  42. sb.append(line);//循环追加
  43. }
  44. //           System.out.println(sb.toString());
  45. br.close();//释放资源
  46. Document d=Jsoup.parse(sb.toString(),"UTF-8");//解析xml
  47. Set<String> set=new HashSet<String>();//排除,不需要解析的文件
  48. List<Element> excludes=d.select("exclude");
  49. for(Element ee:excludes){
  50. set.add(ee.text().trim());
  51. }
  52. List<Element> tables=d.select("type");
  53. for(Element t:tables){
  54. String num=t.attr("num");
  55. String name=t.attr("name");
  56. String indexname=t.attr("indexname");
  57. if(set.contains(name)){
  58. log.info("跳过的表名:"+name);
  59. continue;
  60. }
  61. //                System.out.println(" 序号: "+num+" 表名:  "+name+"   索引名:  "+indexname);
  62. Map<String, String> data=new TreeMap<String, String>();
  63. for(Element s:t.select("map")){
  64. //                   System.out.println("----------------------"+s.attr("pos")+"  "+s.attr("field")+"  "+s.attr(""));
  65. String pos=s.attr("pos");//位置信息
  66. String field=s.attr("field");//索引字段名
  67. data.put(pos, field);
  68. }
  69. map.put(name, data);//将此表名对应的映射信息存储到map里
  70. }
  71. }catch(Exception e){
  72. //e.printStackTrace();
  73. log.error("加载映射文件异常!",e);
  74. }
  75. }
  76. public static void parseXml()throws Exception{
  77. }
  78. public static void main(String[] args) throws  Exception{
  79. System.out.println();
  80. for(Entry<String, Map<String, String>> m:map.entrySet()){
  81. System.out.println("表名:"+m.getKey());
  82. //          for(Entry<String, String> me:m.getValue().entrySet()){
  83. //              System.out.println(me.getKey()+"         "+me.getValue());
  84. //          }
  85. //          System.out.println("==================================================");
  86. }
  87. }
  88. }

项目结构如下图:

Ant的打包脚本如下:

Java代码  

  1. <project name="${component.name}" basedir="." default="jar">
  2. <property environment="env"/>
  3. <!-- <property name="hadoop.home" value="${env.HADOOP_HOME}"/>  -->
  4. <property name="hadoop.home" value="E:/hadooplib"/>
  5. <!-- 指定jar包的名字 -->
  6. <property name="jar.name" value="searchrate.jar"/>
  7. <path id="project.classpath">
  8. <fileset dir="lib">
  9. <include name="*.jar" />
  10. </fileset>
  11. <fileset dir="${hadoop.home}">
  12. <include name="**/*.jar" />
  13. </fileset>
  14. </path>
  15. <target name="clean" >
  16. <delete dir="bin" failonerror="false" />
  17. <mkdir dir="bin"/>
  18. </target>
  19. <target name="build" depends="clean">
  20. <echo message="${ant.project.name}: ${ant.file}"/>
  21. <javac destdir="bin" encoding="utf-8" debug="true" includeantruntime="false" debuglevel="lines,vars,source">
  22. <src path="src"/>
  23. <exclude name="**/.svn" />
  24. <classpath refid="project.classpath"/>
  25. </javac>
  26. <copy todir="bin">
  27. <fileset dir="src">
  28. <include name="*config*"/>
  29. </fileset>
  30. </copy>
  31. </target>
  32. <target name="jar" depends="build">
  33. <copy todir="bin/lib">
  34. <fileset dir="lib">
  35. <include name="**/*.*"/>
  36. </fileset>
  37. </copy>
  38. <copy todir="bin/lib">
  39. <fileset dir="${hadoop.home}">
  40. <include name="**/*.*"/>
  41. </fileset>
  42. </copy>
  43. <path id="lib-classpath">
  44. <fileset dir="lib" includes="**/*.jar" />
  45. </path>
  46. <pathconvert property="my.classpath" pathsep=" " >
  47. <mapper>
  48. <chainedmapper>
  49. <!-- 移除绝对路径 -->
  50. <flattenmapper />
  51. <!-- 加上lib前缀 -->
  52. <globmapper from="*" to="lib/*" />
  53. </chainedmapper>
  54. </mapper>
  55. <path refid="lib-classpath" />
  56. </pathconvert>
  57. <jar basedir="bin" destfile="${jar.name}" >
  58. <include name="**/*"/>
  59. <!-- define MANIFEST.MF -->
  60. <manifest>
  61. <attribute name="Class-Path" value="${my.classpath}" />
  62. </manifest>
  63. </jar>
  64. </target>
  65. </project>

至此,我们以及完成了,这个小项目的开发,最终回归当生产环境上,我们是需要打成jar包,在linux上定时执行的,直接使用linux环境来开发调试hadoop,遇到的问题会更少,虽然不推荐使用win直接开发hadoop程序,但是了解一些基本的方法和技巧,对我们来说也是一件不错的事情。

时间: 2024-08-29 09:18:19

万法归宗之Hadoop编程无界限的相关文章

Windows下Hadoop编程环境配置指南

刘勇    Email: [email protected] 本博客记录作者在工作与研究中所经历的点滴,一方面给自己的工作与生活留下印记,另一方面若是能对大家有所帮助,则幸甚至哉矣! 简介 鉴于最近在研究Hadoop编程时,为考虑编程的方便,在Windows本地编译源程序,然后直接访问Hadoop集群,这样给广大编程人员提供了极大的便利.在这个过程中积累了一些实际经验,并针对在该过程中(初级阶段)可能会遇到的问题,提供一些解决方案,希望对大家有所帮助. 环境介绍 Hadoop 集群:hadoop

hadoop编程小技巧(9)---二次排序(值排序)

代码测试环境:Hadoop2.4 应用场景:在Reducer端一般是key排序,而没有value排序,如果想对value进行排序,则可以使用此技巧. 应用实例描述: 比如针对下面的数据: a,5 b,7 c,2 c,9 a,3 a,1 b,10 b,3 c,1 如果使用一般的MR的话,其输出可能是这样的: a 1 a 3 a 5 b 3 b 10 b 7 c 1 c 9 c 2 从数据中可以看到其键是排序的,但是其值不是.通过此篇介绍的技巧可以做到下面的输出: a 1 a 3 a 5 b 3 b

hadoop编程小技巧(4)---全局key排序类TotalOrderPartitioner

Hadoop代码测试版本:Hadoop2.4 原理:在进行MR程序之前对输入数据进行随机提取样本,把样本排序,然后在MR的中间过程Partition的时候使用这个样本排序的值进行分组数据,这样就可以达到全局排序的目的了. 难点:如果使用Hadoop提供的方法来实现全局排序,那么要求Mapper的输入.输出的key不变才可以,因为在源码InputSampler中提供的随机抽取的数据是输入数据最原始的key,如下代码(line:225): for (int i = 0; i < splitsToSa

hadoop编程小技巧(2)---计数器Counter

Hadoop代码测试版本:2.4 应用场景:在Hadoop编程的时候,有时我们在进行我们算法逻辑的时候想附带了解下数据的一些特性,比如全部数据的记录数有多少,map的输出有多少等等信息(这些是在算法运行完毕后,直接有的),就可以使用计数器Counter. 如果是针对很特定的数据的一些统计,比如统计以1开头的所有记录数等等信息,这时就需要自定义Counter.自定义Counter有两种方式,第一种,定义枚举类型,类似: public enum MyCounters{ ALL_RECORDS,ONE

hadoop编程技巧(6)---处理大量的小型数据文件CombineFileInputFormat申请书

代码测试环境:Hadoop2.4 应用场景:当需要处理非常多的小数据文件,这种技术的目的,可以被应用到实现高效的数据处理. 原理:申请书CombineFileInputFormat,能够进行切片合并的时候把多个小的数据文件.因为每个切片将有一个Mapper,当一个Mapper处理的数据比較小的时候,其效率较低.而一般使用Hadoop处理数据时.即默认方式,会把一个输入数据文件当做一个分片.这样当输入文件较小时就会出现效率低下的情况. 实例: 參考前篇blog:hadoop编程小技巧(5)---自

hadoop编程小技巧(1)---map端聚合

测试hadoop版本:2.4 Map端聚合的应用场景:当我们只关心所有数据中的部分数据时,并且数据可以放入内存中. 使用的好处:可以大大减小网络数据的传输量,提高效率: 一般编程思路:在Mapper的map函数中读入所有数据,然后添加到一个List(队列)中,然后在cleanup函数中对list进行处理,输出我们关系的少量数据. 实例: 在map函数中使用空格分隔每行数据,然后把每个单词添加到一个堆栈中,在cleanup函数中输出堆栈中单词次数比较多的单词以及次数: package fz.inm

hadoop编程小技巧(3)---自定义分区类Partitioner

Hadoop代码测试环境:Hadoop2.4 原理:在Hadoop的MapReduce过程中,Mapper读取处理完成数据后,会把数据发送到Partitioner,由Partitioner来决定每条记录应该送往哪个reducer节点,默认使用的是HashPartitioner,其核心代码如下: /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K2 key, V2 value, int numRe

hadoop编程小技巧(6)---处理大量小数据文件CombineFileInputFormat应用

代码测试环境:Hadoop2.4 应用场景:当需要处理很多小数据文件的时候,可以应用此技巧来达到高效处理数据的目的. 原理:应用CombineFileInputFormat,可以把多个小数据文件在进行分片的时候合并.由于每个分片会产生一个Mapper,当一个Mapper处理的数据比较小的时候,其效率较低.而一般使用Hadoop处理数据时,即默认方式,会把一个输入数据文件当做一个分片,这样当输入文件较小时就会出现效率低下的情况. 实例: 参考前篇blog:hadoop编程小技巧(5)---自定义输

hadoop编程小技巧(7)---自定义输出文件格式以及输出到不同目录

代码测试环境:Hadoop2.4 应用场景:当需要定制输出数据格式时可以采用此技巧,包括定制输出数据的展现形式,输出路径,输出文件名称等. Hadoop内置的输出文件格式有: 1)FileOutputFormat<K,V>  常用的父类: 2)TextOutputFormat<K,V> 默认输出字符串输出格式: 3)SequenceFileOutputFormat<K,V> 序列化文件输出: 4)MultipleOutputs<K,V> 可以把输出数据输送到