// 简单4个分区。(Simple example with 4 fixed partitions.)
package com.rocky.mr.partition;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rocky.util.TimeUtils;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
/**
* Created by Administrator on 2016/4/11.
*/
public class MyPartition {

    /** Marker string emitted by the logging aspect; identifies statistics lines. */
    public static final String clazz = "com.spring.aop.StorageManagerStatAspect";
    /** Fully-qualified method name logged for a download request. */
    public static final String m_download = "com.systoon.scloud.master.controller.ImageController.download";
    /** Fully-qualified method name logged for an upload request. */
    public static final String m_upload = "com.systoon.scloud.master.controller.DirectUploadFile.directUploadFile";

    // Partition keys. NOTE(review): these shared static Text instances work only
    // because each map/reduce task runs single-threaded in its own JVM.
    // `word` is kept for backward compatibility but is no longer used as scratch state.
    public static Text word = new Text();
    public static Text partitionDownload = new Text("download"); // partition 0
    public static Text partitionUpload = new Text("upload");     // partition 1
    public static Text partitionOther = new Text("others");      // partition 3
    public static Text partitionCount = new Text("count");       // partition 2

    /**
     * Classifies each log line as download / upload / other, and additionally
     * emits one "count" record per line so the total line count ends up in its
     * own partition.
     */
    public static class PMapper extends Mapper<LongWritable, Text, Text, Text> {

        /** Constant "1" marker written as the value for every emitted record. */
        private static final Text ONE = new Text("1");

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString();
            // Every input line contributes to the total line count.
            context.write(partitionCount, ONE);
            if (line.contains(clazz)) {
                if (line.contains(m_download)) {
                    // Line mentions the download method; confirm via the JSON payload.
                    // If the parsed method differs, the line is dropped (original behavior).
                    if (m_download.equals(extractMethod(line))) {
                        context.write(partitionDownload, ONE);
                    }
                } else if (line.contains(m_upload)) {
                    if (m_upload.equals(extractMethod(line))) {
                        context.write(partitionUpload, ONE);
                    }
                } else {
                    context.write(partitionOther, ONE);
                }
            } else {
                context.write(partitionOther, ONE);
            }
        }

        /**
         * Extracts the "method" field from the JSON payload that follows the
         * aspect marker in the log line, or returns null if the field is absent.
         * Pattern.quote is required: {@code clazz} contains regex metacharacters
         * ('.') and must be matched literally, not as a pattern.
         */
        private static String extractMethod(String line) {
            String tail = line.split(Pattern.quote(clazz))[1];
            // Skip the single separator character between the marker and the JSON.
            String json = tail.substring(1);
            return JSON.parseObject(json).getString("method");
        }
    }

    /**
     * Sums the "1" markers per key, producing one (category, count) line per
     * partition.
     */
    public static class PReduce extends Reducer<Text, Text, Text, Text> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            long count = 0L;
            for (Text ignored : values) {
                count++;
            }
            // Use a local Text rather than shared static state.
            context.write(key, new Text(Long.toString(count)));
        }
    }

    /**
     * Routes each category to a fixed reducer:
     * download -> 0, upload -> 1, count -> 2, everything else ("others") -> 3.
     * The job must therefore run with (at least) 4 reduce tasks.
     */
    public static class CustomizationPartition extends HashPartitioner<Text, Text>
            implements Configurable {

        private Configuration conf = null;

        public CustomizationPartition() {
        }

        @Override
        public Configuration getConf() {
            return conf;
        }

        @Override
        public void setConf(Configuration conf) {
            this.conf = conf;
        }

        @Override
        public int getPartition(Text key, Text value, int numReduceTasks) {
            switch (key.toString()) {
                case "download":
                    return 0;
                case "upload":
                    return 1;
                case "count":
                    return 2;
                default:
                    return 3; // "others"
            }
        }
    }

    /**
     * Configures and submits the job: deletes any stale output directory,
     * wires up mapper/reducer/partitioner, and exits non-zero on failure.
     */
    public static void main(String[] args)
            throws URISyntaxException, IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        String outPath = "/test/mapReduce/partition" + TimeUtils.getStringDate();

        // Remove a pre-existing output path, otherwise the job fails on startup.
        FileSystem fileSystem = FileSystem.get(new URI(outPath), conf);
        if (fileSystem.exists(new Path(outPath))) {
            fileSystem.delete(new Path(outPath), true);
        }

        // Job.getInstance replaces the deprecated new Job(conf, name) constructor.
        Job job = Job.getInstance(conf, "rocky_partition");
        job.setJarByClass(MyPartition.class);
        job.setPartitionerClass(CustomizationPartition.class);
        job.setNumReduceTasks(4); // must match the 4 fixed partitions above
        job.setMapperClass(PMapper.class);
        job.setReducerClass(PReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path("/test/mapReduce/source/statistics.log.2016-03-01"));
        // FileInputFormat.addInputPath(job, new Path("/test/mapReduce/source/statistics.log.2016-03-02"));
        FileOutputFormat.setOutputPath(job, new Path(outPath));
        System.exit(job.waitForCompletion(true) ? 0 : 1); // non-zero exit on job failure
    }
}
// 时间 (timestamp): 2024-10-03 15:53:34