一:背景
求平均数是MapReduce比较常见的算法,求平均数的算法也比较简单,一种思路是Map端读取数据,Reduce端汇总并且统计记录数,然后作商即可。
二:技术实现
#需求:现有成绩单如下,求出每个同学的平均成绩
[java] view plaincopy
- 小民 语文 80
- 小民 数学 98
- 小民 英语 89
- 小芳 语文 88
- 小芳 数学 99
- 小芳 英语 90
实现代码:
[java] view plaincopy
- public class AverageTest {
- // 定义输入路径
- private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/average_file";
- // 定义输出路径
- private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";
- public static void main(String[] args) {
- try {
- // 创建配置信息
- Configuration conf = new Configuration();
- // 创建文件系统
- FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
- // 如果输出目录存在,我们就删除
- if (fileSystem.exists(new Path(OUT_PATH))) {
- fileSystem.delete(new Path(OUT_PATH), true);
- }
- // 创建任务
- Job job = new Job(conf, AverageTest.class.getName());
- //1.1 设置输入目录和设置输入数据格式化的类
- FileInputFormat.setInputPaths(job, INPUT_PATH);
- job.setInputFormatClass(TextInputFormat.class);
- //1.2 设置自定义Mapper类和设置map函数输出数据的key和value的类型
- job.setMapperClass(AverageMapper.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(Text.class);
- //1.3 设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)
- job.setPartitionerClass(HashPartitioner.class);
- job.setNumReduceTasks(1);
- //1.4 排序
- //1.5 归约
- //2.1 Shuffle把数据从Map端拷贝到Reduce端。
- //2.2 指定Reducer类和输出key和value的类型
- job.setReducerClass(AverageReducer.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(FloatWritable.class);
- //2.3 指定输出的路径和设置输出的格式化类
- FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
- job.setOutputFormatClass(TextOutputFormat.class);
- // 提交作业 退出
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- public static class AverageMapper extends Mapper<LongWritable, Text, Text, Text>{
- //设置输出的key和value
- private Text outKey = new Text();
- private Text outValue = new Text();
- @Override
- protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context) throws IOException, InterruptedException {
- //获取输入的行
- String line = value.toString();
- //取出无效记录
- if (line == null || line.equals("")){
- return ;
- }
- //对数据进行切分
- String[] splits = line.split("\t");
- //截取姓名和成绩
- String name = splits[0];
- String score = splits[2];
- //设置输出的Key和value
- outKey.set(name);
- outValue.set(score);
- //将结果写出去
- context.write(outKey, outValue);
- }
- }
- public static class AverageReducer extends Reducer<Text, Text, Text, FloatWritable>{
- //定义写出去的Key和value
- private Text name = new Text();
- private FloatWritable avg = new FloatWritable();
- @Override
- protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, FloatWritable>.Context context) throws IOException, InterruptedException {
- //定义科目数量
- int courseCount = 0;
- //定义中成绩
- int sum = 0;
- //定义平均分
- float average = 0;
- //遍历集合求总成绩
- for (Text val : value){
- sum += Integer.parseInt(val.toString());
- courseCount ++;
- }
- //求平均成绩
- average = sum / courseCount;
- //设置写出去的名字和成绩
- name.set(key);
- avg.set(average);
- //把结果写出去
- context.write(name, avg);
- }
- }
- }
程序运行的结果:
时间: 2024-10-12 02:10:31