package cn.itcast.bigdata.shsq;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/**
 * MapReduce job that counts how many times each Shuangseqiu (double color
 * ball) lottery number was drawn in each column over the past 10 years.
 *
 * Input data format (one draw per line: six red balls, then one blue ball):
 * ------ red balls ------ blue
 13 16 05 06 19 02 03
04 19 16 27 09 15 10
23 06 31 19 13 17 12
08 10 17 30 20 27 01
12 09 01 18 15 04 05
22 16 17 19 09 26 10
 Output after processing, summarized as number/count per column (the raw
 job output keys have the form ColN_number). Red balls range from 01 to 33,
 while the blue ball in column 7 only ranges from 01 to 16:
 Col1   Col2   Col3   Col4   Col5   Col6   Col7
01/50 01/58 01/42 01/54 01/53 01/58 01/100
02/48 02/42 02/42 02/58 02/40 02/66 02/94
03/49 03/40 03/45 03/46 03/50 03/45 03/89
04/44 04/46 04/58 04/47 04/47 04/35 04/100
05/43 05/50 05/51 05/41 05/49 05/53 05/95
06/59 06/52 06/47 06/50 06/48 06/53 06/106
07/59 07/45 07/53 07/47 07/46 07/47 07/105
08/56 08/53 08/49 08/35 08/53 08/56 08/87
09/47 09/43 09/49 09/49 09/51 09/46 09/105
10/42 10/62 10/36 10/55 10/50 10/45 10/101
11/45 11/48 11/50 11/40 11/53 11/37 11/96
12/42 12/58 12/41 12/61 12/46 12/47 12/113
13/49 13/55 13/49 13/42 13/53 13/50 13/97
14/56 14/52 14/42 14/59 14/48 14/56 14/101
15/46 15/56 15/42 15/38 15/47 15/55 15/99
16/38 16/55 16/47 16/45 16/50 16/46 16/108
17/43 17/37 17/55 17/64 17/60 17/47
18/49 18/51 18/50 18/46 18/57 18/43
19/44 19/52 19/49 19/51 19/47 19/53
20/49 20/47 20/42 20/51 20/55 20/54
21/48 21/46 21/49 21/47 21/35 21/52
22/60 22/52 22/64 22/55 22/49 22/39
23/47 23/42 23/53 23/56 23/40 23/52
24/36 24/50 24/56 24/38 24/49 24/36
25/49 25/56 25/48 25/48 25/42 25/43
26/60 26/43 26/62 26/50 26/42 26/61
27/58 27/38 27/48 27/47 27/44 27/53
28/56 28/53 28/44 28/39 28/46 28/40
29/31 29/44 29/52 29/51 29/61 29/48
30/50 30/44 30/61 30/51 30/39 30/38
31/53 31/45 31/41 31/53 31/36 31/44
32/47 32/42 32/48 32/50 32/55 32/53
33/43 33/39 33/31 33/32 33/55 33/45
*
* Mapper side: emits one (column_number, 1) pair per ball drawn.
* @author songjq
*
*/
public class ShauangsqMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        // StringUtils.split treats runs of separators as one, so extra spaces are harmless
        String[] fields = StringUtils.split(line, " ");
        // Emit one (ColN_number, 1) pair per ball, keyed by column position
        for (int i = 0; i < fields.length; i++) {
            context.write(new Text("Col" + (i + 1) + "_" + fields[i]), new LongWritable(1));
        }
    }
}
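For reference, here is a minimal plain-Java sketch (the class name KeyLayoutDemo is ours, not part of the original post) showing the keys the mapper emits for the first sample draw above:

public class KeyLayoutDemo {
    public static void main(String[] args) {
        String line = "13 16 05 06 19 02 03"; // one draw: 6 red balls + 1 blue ball
        String[] fields = line.split("\\s+");
        for (int i = 0; i < fields.length; i++) {
            // Mirrors the mapper's key format: Col1_13, Col2_16, ..., Col7_03, each with count 1
            System.out.println("Col" + (i + 1) + "_" + fields[i] + "\t1");
        }
    }
}

Encoding the column position into the key is what lets a single job count red balls (columns 1-6) and the blue ball (column 7) in one pass.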
package cn.itcast.bigdata.shsq;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class ShauangsqReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,
            Context ctx) throws IOException, InterruptedException {
        long counter = 0;
        // Sum the partial counts instead of merely counting elements, so the
        // logic stays correct if this class is also used as a combiner
        for (LongWritable value : values) {
            counter += value.get();
        }
        ctx.write(key, new LongWritable(counter));
    }
}
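Because the reducer now performs a pure, order-independent sum, it can also double as a combiner to pre-aggregate map output locally and shrink the shuffle. The following one-line driver addition is a suggestion, not part of the original post:

// In main(), after job.setReducerClass(...):
job.setCombinerClass(ShauangsqReducer.class);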
package cn.itcast.bigdata.shsq;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class ShauangsqJobSubmiter {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(ShauangsqJobSubmiter.class);
        job.setMapperClass(ShauangsqMapper.class);
        job.setReducerClass(ShauangsqReducer.class);
        // Declare the map output types explicitly; here they match the final output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // args[0]: HDFS input directory, args[1]: HDFS output directory
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Propagate the job status as the process exit code
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
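To submit the job, package the three classes into a jar and pass the HDFS input and output directories as the two program arguments; the jar name and paths below are only illustrative:

hadoop jar shsq.jar cn.itcast.bigdata.shsq.ShauangsqJobSubmiter /shsq/input /shsq/output

Note that the output directory must not already exist, or FileOutputFormat will fail the job.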
Original article: http://blog.51cto.com/2951890/2157909