Post author: 那伊抹微笑
CSDN blog: http://blog.csdn.net/u012185296
itdog8 link: http://www.itdog8.com/thread-203-1-1.html
Post title: HBase - MapReduce - Example of HBase as an Input Source | 那伊抹微笑
Signature: The greatest distance in the world is not the ends of the earth, but that I stand right in front of you and you cannot feel my presence
Tech stack: Flume+Kafka+Storm+Redis/HBase+Hadoop+Hive+Mahout+Spark ... cloud computing
Repost policy: reposting is allowed, but you must clearly credit the original source, author, and copyright notice with a hyperlink. Thanks for your cooperation!
QQ group: 214293307 (looking forward to learning and improving together)
Reference: http://abloz.com/hbase/book.html#mapreduce.example
1 Official Example Code
Below is an example of a MapReduce job that reads from HBase as its source. Note that it has only a Mapper and no Reducer, and the Mapper emits nothing.
The job is set up as follows...
```java
Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "ExampleRead");
job.setJarByClass(MyReadJob.class);   // class that contains mapper

Scan scan = new Scan();
scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
scan.setCacheBlocks(false);  // don't set to true for MR jobs
// set other scan attrs
...

TableMapReduceUtil.initTableMapperJob(
    tableName,       // input HBase table name
    scan,            // Scan instance to control CF and attribute selection
    MyMapper.class,  // mapper
    null,            // mapper output key
    null,            // mapper output value
    job);
job.setOutputFormatClass(NullOutputFormat.class);  // because we aren't emitting anything from mapper

boolean b = job.waitForCompletion(true);
if (!b) {
    throw new IOException("error with job!");
}
```
...and the mapper must extend TableMapper...
```java
public class MyMapper extends TableMapper<Text, LongWritable> {

    public void map(ImmutableBytesWritable row, Result value, Context context)
            throws InterruptedException, IOException {
        // process data for the row from the Result instance.
    }
}
```
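The official mapper leaves "process data for the row" open. In practice, everything that comes out of a `Result` (row key, cell values via `value.getValue(family, qualifier)`) is a raw `byte[]`, and HBase's `Bytes` utility (`Bytes.toString`, `Bytes.toLong`, ...) does the decoding. A minimal, dependency-free sketch of those two decode steps (the class and method names here are mine, written only to mirror what `Bytes` does):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BytesDecodeDemo {
    // Mirrors HBase Bytes.toString: string cell values are raw UTF-8 bytes
    static String decodeString(byte[] b) {
        return new String(b, StandardCharsets.UTF_8);
    }

    // Mirrors HBase Bytes.toLong: longs are stored as 8 big-endian bytes
    static long decodeLong(byte[] b) {
        return ByteBuffer.wrap(b).getLong();
    }

    public static void main(String[] args) {
        byte[] rowkey = "row-0001".getBytes(StandardCharsets.UTF_8);
        byte[] counter = ByteBuffer.allocate(8).putLong(42L).array();

        System.out.println(decodeString(rowkey)); // row-0001
        System.out.println(decodeLong(counter));  // 42
    }
}
```

Inside the real `map()`, the same decoding is just `Bytes.toString(key.get())` for the row key and `Bytes.toString(value.getValue(family, qualifier))` for a cell.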
2 My Reference Code
```java
package com.itdog8.cloud.hbase.mr.test;

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * TestHBaseAsSourceMapReduceMainClass
 *
 * @author 那伊抹微笑
 * @date 2015-07-30 18:00:21
 */
public class TestHBaseAsSourceMapReduceMainClass {
    private static final Log _log = LogFactory.getLog(TestHBaseAsSourceMapReduceMainClass.class);
    private static final String JOB_NAME = "TestHBaseAsSourceMapReduce";

    private static String tmpPath = "/tmp/com/itdog8/yting/TestHBaseAsSourceMapReduce";
    private static String hbaseInputTable = "itdog8:test_1";

    public static class ExampleSourceMapper extends TableMapper<Text, Text> {
        private Text k = new Text();
        private Text v = new Text();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            super.setup(context);
        }

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            String rowkey = Bytes.toString(key.get());
            // Get familiar with the Result API here; the rest is just business logic
            try {
                // set value
                k.set("望咩望");
                v.set("食屎啦你");

                // write to the reducer
                context.write(k, v);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        @Override
        protected void cleanup(Context context) throws IOException, InterruptedException {
            super.cleanup(context);
        }
    }

    public static void main(String[] args) throws Exception {
        // hbase configuration
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "a234-198.hadoop.com,a234-197.hadoop.com,a234-196.hadoop.com");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        // batch and caching
        Scan scan = new Scan();
        scan.setCaching(10000);
        scan.setCacheBlocks(false);
        scan.setMaxVersions(1);

        // disable hadoop speculative execution
        conf.setBoolean("mapred.map.tasks.speculative.execution", false);
        conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);

        // tmp index path
        tmpPath = args[0];
        Path tmpIndexPath = new Path(tmpPath);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(tmpIndexPath)) {
            // fs.delete(tmpIndexPath, true); // dangerous
            // _log.info("delete tmp index path : " + tmpIndexPath.getName());
            _log.warn("The hdfs path [" + tmpPath + "] existed, please change a path.");
            return;
        }

        // Job && conf
        Job job = new Job(conf, JOB_NAME);
        job.setJarByClass(TestHBaseAsSourceMapReduceMainClass.class);
        TableMapReduceUtil.initTableMapperJob(hbaseInputTable, scan, ExampleSourceMapper.class,
                Text.class, Text.class, job);
        // job.setReducerClass(MyReducer.class); // your own processing logic
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, tmpIndexPath);

        int success = job.waitForCompletion(true) ? 0 : 1;
        System.exit(success);
    }
}
```
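The commented-out `job.setReducerClass(MyReducer.class)` line is where per-key grouping logic would go. Between the mapper's `context.write(k, v)` and the `TextOutputFormat` file, the framework sorts and groups all values by key, then hands each group to the reducer, which writes one `key<TAB>value` line per key. A small pure-Java sketch of that shuffle-and-reduce step (no Hadoop dependency; the count-per-key reduce is my assumption, since the post leaves MyReducer's logic open):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ShuffleReduceDemo {
    // Group mapper output pairs by key, as Hadoop's shuffle phase does
    // (TreeMap also sorts the keys, like the framework's sort phase).
    static Map<String, List<String>> shuffle(String[][] mapped) {
        Map<String, List<String>> groups = new TreeMap<>();
        for (String[] kv : mapped) {
            groups.computeIfAbsent(kv[0], k -> new ArrayList<>()).add(kv[1]);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Pairs as ExampleSourceMapper emits them, one per HBase row
        String[][] mapped = {
            {"望咩望", "食屎啦你"},
            {"望咩望", "食屎啦你"},
        };

        // Reduce: here a simple count per key; a real MyReducer would hold
        // the business logic. TextOutputFormat writes "key<TAB>value" lines.
        for (Map.Entry<String, List<String>> e : shuffle(mapped).entrySet()) {
            System.out.println(e.getKey() + "\t" + e.getValue().size());
        }
    }
}
```

With no reducer configured, as in the code above, each mapper output pair is written straight to the output files instead.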
Copyright notice: this is an original post by the blog author; do not repost without the author's permission.