前面2篇文章知道了HDFS的存储原理,知道了上传和下载文件的过程,同样也知晓了MR任务的执行过程,以及部分代码也已经看到,那么下一步就是程序员最关注的关于MR的业务代码(这里不说太简单的):
一、关于MapTask的排序
mapTask正常情况,按照key的hashcode进行从小到大的排序操作,形成map输出,交给reduce,(据某篇博文说,hashcode排序使用的是快排,这个无从考证),这里说明一下如何使用POJO类作为key,使其进行排序。
1)POJO类实现WritableComparable<T>接口;
2)重写compareTo(T t)方法,在此方法中返回为int,使用当前对象的排序对象,减去传入对象的排序字段,便是倒序排序(按照想要的方式)。示例为按照电影热度值倒序排序。
@Override public int compareTo(Movie o) { return this.hot-o.hot; }
二、关于地址复用
1)注意,在reduce中,关于reduce方法中的values的迭代器,一旦遍历过后,迭代器中值将不再存在;
2)这里是因为reduceTask在反射调用reduce方法时,为节省内存空间,使用了地址复用技术;
3)所以如果想让对象保存下来,那么必须将对象完全克隆,这里建议,在使用POJO时候,最好实现clone方法,以便方便保存迭代器中的对象;示例代码是重写clone方法,以方便克隆;
public Item clone(){ Item o=null; try { o = (Item) super.clone(); } catch (CloneNotSupportedException e) { // TODO Auto-generated catch block e.printStackTrace(); } return o; }
三、关于多文件join
多文件join操作,map任务也是支持的,只需将多文件放在指定的hdfs输入目录即可,那么在map方法中,只需要将关联字段提为key,其他封装入对象,然后reduce中你想要的结果就是join后的,至于保留哪些字段,去掉冗余字段,那就全凭自己操作了。
四、关于combiner
根据前文,combine操作,即使将每一个maptask的输出结果,进行合并排序操作,如果程序员使用MR的人,指定了conbine操作的存在,那么maptask会根据spill内存缓冲溢出文件的数量进行判断是否确实需要combine操作,因为combine操作也会浪费资源,默认值中,假如spill文件的数量小于3,那么便不会进行combine操作,否则先进行combine操作,combine操作只针对于每一个maptask小任务,然后根据shuffle的原理,这些combine后的输出文件会被reduce的复制进行拿走。combine的启用方式:
job.setCombinerClass(InventReducer.class);
此段代码运行在MRDriver中,指定一个combiner的reduce类,和reduce的思路以及代码方式都一样:
public class InventReducer extends Reducer<Text, Text, Text, Text>{ @Override protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException { int count=0; for(Text t:values){ count++; } String[] sss=key.toString().split(" "); context.write(new Text(sss[0]), new Text(sss[1]+" "+count)); } }
五、关于分区
指定分区类,分区操作是在maptask中进行,当maptask输出文件结束后,maptask会根据spill文件进行排序和分区操作。
Driver中指定分区类:
job.setPartitionerClass(AuthPartitioner.class);
分区类代码:
分区类中,key为maptask的输出key,int numPartitions为分区编号,有几个分区编号,将会分几个区。
public class AuthPartitioner extends Partitioner<IntWritable, IntWritable>{ @Override public int getPartition(IntWritable key, IntWritable value, int numPartitions) { String num=String.valueOf(key.get()); if(num.matches("[0-9][0-9]")||num.matches("[0-9]")){ return 0; } if(num.matches("[0-9][0-9][0-9]")){ return 1; }else{ return 2; } } }
分区的目的在于指定的多个reduceTask可以分别处理自己分区的数据,以便让数据均匀地落盘。
六、关于Job链
这就是在Java代码中让多个Job串起来,实现很简单:代码写在Driver中
//判断上一个Job的完成情况 if (job.waitForCompletion(true)){ //执行第二个Job代码 }
七、自定义格式输入(Map的输入)
默认map方法的输入为:间隔字符数量long型、本行数据text类型;
假如想改变这些输入,假如将第一个key输入变为intWritable
需要一个Format类和Reader类:
public class AuthFormat extends FileInputFormat<IntWritable, Text>{ @Override public RecordReader<IntWritable, Text> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { return new AuthReader(); } }
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.util.LineReader; public class AuthReader extends RecordReader<IntWritable, Text>{ private FileSplit fs ; private LineReader lineReader ; private IntWritable key ; private Text value ; //--定义一个计数器,记录本次读取到了多少行 int count = 0; @Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { this.fs = (FileSplit) split; //--获取文件路径 Path path = fs.getPath(); //--获取文件系统 Configuration conf = context.getConfiguration(); FileSystem fileSystem = path.getFileSystem(conf); //--通过文件系统读取文件得到流 FSDataInputStream in = fileSystem.open(path); //--将流包装为LineReader方便按行读取 lineReader = new LineReader(in); } @Override public boolean nextKeyValue() throws IOException, InterruptedException { //--要返回的键,本次读取的行数 key = new IntWritable(); //--要返回的值,本次读取到的内容 value = new Text(); //--定义一个temp临时记录内容 Text temp = new Text(); int len = lineReader.readLine(temp); //--判断是否读取到了数据 if(len == 0){ //--表示没有行数据可读,则不再执行 nextKeyValue()方法 return false; }else{ //--读到了数据,将数据追加到value中 //可以这样写:value=tmp; //也可以像下面这样写 value.append(temp.getBytes(), 0, temp.getLength()); //--计数器加1,表明读取到了一行内容 count++; key.set(count); return true; } } @Override public IntWritable getCurrentKey() throws IOException, InterruptedException { return key; } @Override public Text getCurrentValue() throws IOException, InterruptedException { return value; } @Override public float getProgress() throws IOException, InterruptedException { return 0; } @Override public void close() throws IOException { if(lineReader != null)lineReader.close(); } }
在上文的源码解析中,知道了Reader的作用在于一行一行地读取源文件给maptask任务。
这里相当于子类重写了父类的方法,在调用时,会直接调用子类的方法。
而在Driver中需要增加:
job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(Text.class); job.setInputFormatClass(AuthFormat.class);
八、自定义格式输出
自定义格式输出的目的在于规范输出格式重量,简单来说就是可以输出想要的任意输出格式:
public class AuthOutputFormat<K,V> extends FileOutputFormat<K,V>{ @Override public RecordWriter<K,V> gtetRecordWrier(TaskAttemptContext job) throws IOException, InterruptedException { //Get the default path and filename for the output format. //第二个参数:extension an extension to add to the filename Path path=super.getDefaultWorkFile(job, ""); Configuration conf=job.getConfiguration(); FileSystem fs=path.getFileSystem(conf); FSDataOutputStream out=fs.create(path); return new AuthWriter<K,V>(out,"|","\r\n"); } } public class AuthWriter<K,V> extends RecordWriter<K,V>{ private FSDataOutputStream out; private String keyValueSeparator; private String lineSeparator; public AuthWriter(FSDataOutputStream out, String keyValueSeparator, String lineSeparator) { this.out=out; this.keyValueSeparator=keyValueSeparator; this.lineSeparator=lineSeparator; } @Override public void write(K key, V value) throws IOException, InterruptedException { out.write(key.toString().getBytes()); out.write(keyValueSeparator.getBytes()); out.write(value.toString().getBytes()); out.write(lineSeparator.getBytes()); } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { if(out!=null)out.close(); } }
同样一个format和一个writer,然后在Driver中指定即可
//有了这句话,不用再写原来的输出语句 job.setOutputFormatClass(AuthOutputFormat.class);
九、关于多输入源
在Driver中指定:
public class ScoreDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "JobName"); job.setJarByClass(cn.gjm.hadoop.ScoreDriver.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setInputFormatClass(AuthInputFormat.class); //需要注意,如果一个Mapper代码不能通用的解决,则需要分别指定。此时,就不能去设置 //setMapperClass()了 MultipleInputs.addInputPath(job, new Path("hdfs://192.168.234.21:9000/formatscore/format-score.txt"),AuthInputFormat.class,ScoreMapper.class); MultipleInputs.addInputPath(job, new Path("hdfs://192.168.234.21:9000/formatscore/format-score-1.txt"),TextInputFormat.class,ScoreMapper2.class); FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.234.21:9000/formatscore/result")); if (!job.waitForCompletion(true)) return; }
十、多输出源
public class ScoreReducer extends Reducer<Text, Text, Text, Text>{ private MultipleOutputs<Text, Text> mos; @Override protected void reduce(Text name, Iterable<Text> scores, Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException { for(Text score:scores){ if(name.toString().equals("jary")){ mos.write("jary",name,score); } if(name.toString().equals("rose")){ mos.write("rose",name,score); } if(name.toString().equals("tom")){ mos.write("tom", name,score); } } } @Override protected void setup(Reducer<Text, Text, Text, Text>.Context context) throws IOException, InterruptedException { mos=new MultipleOutputs<>(context); } } ScoreDriver代码: public class ScoreDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf, "JobName"); job.setJarByClass(cn.tarena.hadoop.ScoreDriver.class); // TODO: specify a reducer job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setReducerClass(ScoreReducer.class); job.setInputFormatClass(AuthInputFormat.class); MultipleInputs.addInputPath(job, new Path("hdfs://192.168.234.21:9000/formatscore/format-score.txt"),AuthInputFormat.class,ScoreMapper.class); MultipleInputs.addInputPath(job, new Path("hdfs://192.168.234.21:9000/formatscore/format-score-1.txt"),TextInputFormat.class,ScoreMapper2.class); MultipleOutputs.addNamedOutput(job, "jary", AuthOutputFormat.class, Text.class, Text.class); MultipleOutputs.addNamedOutput(job, "tom", AuthOutputFormat.class, Text.class, Text.class); MultipleOutputs.addNamedOutput(job, "rose", AuthOutputFormat.class, Text.class, Text.class); FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.234.21:9000/formatscore/result")); if (!job.waitForCompletion(true)) return; } }
十一,多次排序
//只需在compare方法中指定多字段排序即可 @Override public int compareTo(Profit o) { int result=this.month-o.month; if(result!=0){ return result; }else{ return o.profit-this.profit; } }
原文地址:https://www.cnblogs.com/qfxydtk/p/11173625.html