上一篇文章是实现统计hbase单元值出现的个数,并将结果存放到hbase的表中,本文是将结果存放到hdfs上。其中的map实现与前文一直,连接:http://www.cnblogs.com/ljy2013/p/4820056.html,下面主要介绍一下reduce的实现:
(1)reduce的实现
package com.datacenter.HbaseMapReduce.SummaryToFile; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class SummaryToFileReducer extends Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub int i = 0; for (IntWritable val : values) { i += val.get(); } context.write(key, new IntWritable(i)); } }
(2)主类的实现也有些不同
package com.datacenter.HbaseMapReduce.SummaryToFile; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class SummaryToFilemain { static String rootdir = "hdfs://hadoop3:8020/hbase"; static String zkServer = "hadoop3"; static String port = "2181"; private static Configuration conf; private static HConnection hConn = null; public static void HbaseUtil(String rootDir, String zkServer, String port) { conf = HBaseConfiguration.create();// 获取默认配置信息 conf.set("hbase.rootdir", rootDir); conf.set("hbase.zookeeper.quorum", zkServer); conf.set("hbase.zookeeper.property.clientPort", port); try { hConn = HConnectionManager.createConnection(conf); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } public static void main(String[] args) throws Exception{ // TODO Auto-generated method stub HbaseUtil(rootdir, zkServer, port); Job job = new Job(conf,"ExampleSummaryToFile"); job.setJarByClass(SummaryToFilemain.class); // class that contains mapper and reducer Scan scan = new Scan(); scan.setCaching(500); // 1 is the default in Scan, which will be bad for MapReduce jobs scan.setCacheBlocks(false); // don‘t set to true for MR jobs // set other scan attrs TableMapReduceUtil.initTableMapperJob( "test", // input table scan, // Scan instance to control CF and attribute selection SummaryMapper.class, // mapper class Text.class, // mapper output key IntWritable.class, // mapper output value job); job.setReducerClass(SummaryToFileReducer.class); // reducer class job.setNumReduceTasks(1); // at least one, adjust as required FileOutputFormat.setOutputPath(job, new Path("hdfs://hadoop3:8020/user/liujiyu/score-test")); // adjust directories as required boolean b = job.waitForCompletion(true); if (!b) { throw new IOException("error with job!"); } } }
时间: 2024-10-15 21:50:50