Step 1: Import the dataset into HDFS
Use the command line to access the dataset that was just uploaded to HDFS.
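The sample records in the unit tests below appear to follow the fixed-width NCDC ISD-Lite layout: year, month, day and hour, then eight right-aligned six-character fields, with -9999 marking a missing value and the air temperature stored in tenths of a degree Celsius. The article does not name the dataset, so treat that layout as an assumption. Under it, this plain-Java sketch splits one record into its twelve fields; the class name IsdLiteRecord is hypothetical.

```java
// Hypothetical helper: splits one fixed-width record, ASSUMING the NCDC ISD-Lite layout
final class IsdLiteRecord {
    static final int MISSING = -9999; // sentinel for a missing observation

    // 0-based [start, end) column ranges for each field, as passed to String.substring
    private static final int[][] COLUMNS = {
        {0, 4},   // year
        {5, 7},   // month
        {8, 10},  // day
        {11, 13}, // hour
        {13, 19}, // air temperature (tenths of a degree Celsius)
        {19, 25}, // dew point temperature
        {25, 31}, // sea level pressure
        {31, 37}, // wind direction
        {37, 43}, // wind speed
        {43, 49}, // sky condition
        {49, 55}, // 1-hour precipitation
        {55, 61}  // 6-hour precipitation
    };

    // Parses all twelve fields of a record into integers
    static int[] parse(String line) {
        int[] fields = new int[COLUMNS.length];
        for (int i = 0; i < COLUMNS.length; i++) {
            fields[i] = Integer.parseInt(line.substring(COLUMNS[i][0], COLUMNS[i][1]).trim());
        }
        return fields;
    }
}
```

The offsets only work if the original spacing is preserved: if runs of spaces are collapsed, as can happen when records are pasted into a web page, the temperature and every later field shift and parsing fails.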
Step 1: In the Map phase, extract the weather station and the temperature from each record.
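// Nested inside com.hadoop.base.Temperature; needs java.io.IOException,
// org.apache.hadoop.io.{IntWritable, LongWritable, Text} and org.apache.hadoop.mapreduce.Mapper
public static class TemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString(); // read one line of input
        int temperature = Integer.parseInt(line.substring(14, 19).trim()); // air temperature column
        if (temperature != -9999) { // filter out invalid (missing) readings
            // The real weather station id would be taken from the file name (e.g. via
            // ((FileSplit) context.getInputSplit()).getPath().getName()); to keep the unit
            // tests simple, the key is set to the constant "weatherStationId" here
            String weatherStationId = "weatherStationId";
            context.write(new Text(weatherStationId), new IntWritable(temperature));
        }
    }
}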
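The map step can be checked without a cluster. Below is a hypothetical plain-Java stand-in for TemperatureMapper.map (the names MapStepSketch and map are illustrative, and Map.Entry replaces Hadoop's Text/IntWritable output pair): it reads the same substring(14, 19) columns, drops the -9999 sentinel and pairs each remaining reading with the constant key.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java stand-in for TemperatureMapper.map (no Hadoop types)
final class MapStepSketch {
    static final String KEY = "weatherStationId"; // same constant key as the Mapper

    // Emits one (key, temperature) pair per line whose temperature is not missing
    static List<Map.Entry<String, Integer>> map(List<String> lines) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String line : lines) {
            int temperature = Integer.parseInt(line.substring(14, 19).trim()); // same columns as the Mapper
            if (temperature != -9999) { // filter invalid readings
                out.add(new SimpleEntry<>(KEY, temperature));
            }
        }
        return out;
    }
}
```

Feeding it one valid record and one record whose temperature is -9999 yields a single pair, ("weatherStationId", 200).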
Step 2: In the Reduce phase, compute the average temperature for each weather station.
public static class TemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable(); // output value, reused across calls

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;   // sum of all temperatures for this station
        int count = 0; // number of readings for this station
        for (IntWritable val : values) {
            sum += val.get();
            count++;
        }
        result.set(sum / count); // integer average (truncated toward zero)
        context.write(key, result);
    }
}
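Two properties of this reducer are worth checking by hand. sum / count is integer division, so the average is truncated toward zero: readings of -15 and -20 average to -17, not -17.5. And because an average of partial averages is not the overall average, the reducer should not simply be reused as a combiner. A small plain-Java sketch (the class AverageSketch is hypothetical) reproduces the reducer's arithmetic and the combiner pitfall:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java version of TemperatureReducer's arithmetic
final class AverageSketch {
    // Same computation as the reducer: integer sum divided by integer count
    static int average(List<Integer> values) {
        int sum = 0;
        int count = 0;
        for (int v : values) {
            sum += v;
            count++;
        }
        return sum / count; // integer division truncates toward zero
    }

    // What the final result would be if the same reducer also ran as a combiner on two map outputs
    static int averageOfAverages(List<Integer> split1, List<Integer> split2) {
        return average(Arrays.asList(average(split1), average(split2)));
    }
}
```

For example, averageOfAverages on [200, 100] and [300] gives 225, while the true mean of the three readings is 200. If a combiner is wanted, a common approach is to have it emit partial (sum, count) pairs and divide only in the final reduce.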
Step 3: Unit-test and debug the code with MRUnit
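import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit test for the Mapper
 */
public class TemperatureMapperTest {
    private Mapper<LongWritable, Text, Text, IntWritable> mapper; // the Mapper under test
    private MapDriver<LongWritable, Text, Text, IntWritable> driver; // MRUnit driver for the Mapper

    @Before
    public void init() {
        mapper = new Temperature.TemperatureMapper(); // the TemperatureMapper nested in Temperature
        driver = new MapDriver<>(mapper);
    }

    @Test
    public void test() throws IOException {
        // One fixed-width test record; spacing matters because the Mapper reads substring(14, 19)
        String line = "1985 07 31 02   200    94 10137   220    26     1     0 -9999";
        driver.withInput(new LongWritable(), new Text(line)) // matches TemperatureMapper's input types
              .withOutput(new Text("weatherStationId"), new IntWritable(200)) // matches its output types
              .runTest();
    }
}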
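import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit test for the Reducer
 */
public class TemperatureReduceTest {
    private Reducer<Text, IntWritable, Text, IntWritable> reducer; // the Reducer under test
    private ReduceDriver<Text, IntWritable, Text, IntWritable> driver; // MRUnit driver for the Reducer

    @Before
    public void init() {
        reducer = new Temperature.TemperatureReducer(); // the TemperatureReducer nested in Temperature
        driver = new ReduceDriver<>(reducer);
    }

    @Test
    public void test() throws IOException {
        String key = "weatherStationId"; // key shared by all values
        List<IntWritable> values = new ArrayList<>();
        values.add(new IntWritable(200)); // first value
        values.add(new IntWritable(100)); // second value
        driver.withInput(new Text(key), values) // matches TemperatureReducer's input types
              .withOutput(new Text(key), new IntWritable(150)) // (200 + 100) / 2, matches its output types
              .runTest();
    }
}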
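import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver;
import org.junit.Before;
import org.junit.Test;

/**
 * Integration test running the Mapper and Reducer together
 */
public class TemperatureTest {
    private Mapper<LongWritable, Text, Text, IntWritable> mapper; // the Mapper under test
    private Reducer<Text, IntWritable, Text, IntWritable> reducer; // the Reducer under test
    private MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> driver; // MRUnit map+reduce driver

    @Before
    public void init() {
        mapper = new Temperature.TemperatureMapper(); // the TemperatureMapper nested in Temperature
        reducer = new Temperature.TemperatureReducer(); // the TemperatureReducer nested in Temperature
        driver = new MapReduceDriver<>(mapper, reducer);
    }

    @Test
    public void test() throws IOException {
        // Two lines of fixed-width test input (temperatures 200 and 100)
        String line = "1985 07 31 02   200    94 10137   220    26     1     0 -9999";
        String line2 = "1985 07 31 11   100    56 -9999    50     5 -9999     0 -9999";
        driver.withInput(new LongWritable(), new Text(line)) // matches TemperatureMapper's input types
              .withInput(new LongWritable(), new Text(line2))
              .withOutput(new Text("weatherStationId"), new IntWritable(150)) // matches TemperatureReducer's output types
              .runTest();
    }
}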
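MapReduceDriver in the integration test runs the mapper on every input, sorts and groups the intermediate pairs by key (the shuffle), and calls the reducer once per key. The same flow can be imitated in plain Java, with a TreeMap standing in for the sort-and-group step. This is only a sketch: the class PipelineSketch and the second station key "otherStation" are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory imitation of map output -> shuffle (sort + group by key) -> reduce
final class PipelineSketch {
    // Shuffle: group temperatures by station key; TreeMap keeps keys sorted, as the reducer sees them
    static Map<String, List<Integer>> shuffle(List<Map.Entry<String, Integer>> mapped) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (Map.Entry<String, Integer> pair : mapped) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }

    // Reduce: one pass per key, computing the truncated integer average like TemperatureReducer
    static Map<String, Integer> reduce(Map<String, List<Integer>> grouped) {
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int sum = 0;
            for (int v : e.getValue()) {
                sum += v;
            }
            result.put(e.getKey(), sum / e.getValue().size());
        }
        return result;
    }
}
```

With the two readings from the test (200 and 100) under "weatherStationId" plus one reading of 50 under "otherStation", the result maps "otherStation" to 50 and "weatherStationId" to 150, matching the expected output of the integration test.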
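Step 4: Compile the project and package it as Temperature.jar, then use a file-transfer client to upload Temperature.jar to the /home/hadoop/data directory on the Hadoop machine's local file system.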
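Step 5: Change into that directory with cd /home/hadoop/data, then run the job from the command line. HADOOP_CLASSPATH adds the jar to the classpath so that the hadoop command can find the main class:
export HADOOP_CLASSPATH=/home/hadoop/data/Temperature.jar
hadoop com.hadoop.base.Temperature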
Step 6: The job writes its final result to HDFS; view it with hadoop fs -cat /weather/output/part-r-00000.
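The driver class is not shown in the article, so this assumes it keeps Hadoop's default TextOutputFormat, which writes each result as the key and the value separated by a tab. Under that assumption, and assuming ISD-Lite-style temperatures in tenths of a degree Celsius, a hypothetical helper (ResultLine) for turning one line of part-r-00000 into a reading in degrees might look like this:

```java
// Hypothetical helper for reading one line of the job's text output (key<TAB>value)
final class ResultLine {
    final String station;
    final double celsius;

    private ResultLine(String station, double celsius) {
        this.station = station;
        this.celsius = celsius;
    }

    // Splits "key<TAB>value" and converts tenths of a degree to degrees Celsius (ASSUMED unit)
    static ResultLine parse(String line) {
        int tab = line.indexOf('\t');
        if (tab < 0) {
            throw new IllegalArgumentException("expected key<TAB>value: " + line);
        }
        String station = line.substring(0, tab);
        int tenths = Integer.parseInt(line.substring(tab + 1).trim());
        return new ResultLine(station, tenths / 10.0);
    }
}
```

For the integration-test data, the output line would be "weatherStationId", a tab, then 150, which this helper reads as 15.0 °C.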
Original article: https://www.cnblogs.com/zhoupp/p/10914302.html
Time: 2024-11-07 19:13:02