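The previous article only covered how to read from HBase with MapReduce. This one covers a job that both reads and writes. As we have seen, map is effectively the read phase and reduce the write phase, but a mapper can also write, so a map-only job can handle both reading and writing. The implementation is shown below: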
(1) The map implementation
package com.datacenter.HbaseMapReduce.ReadWrite;

import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;

public class ReadWriteHbaseMap extends TableMapper<ImmutableBytesWritable, Put> {

    @Override
    protected void map(ImmutableBytesWritable row, Result value, Context context)
            throws IOException, InterruptedException {
        // Emit the row key together with a Put that carries every cell of the input row.
        context.write(row, resultToPut(row, value));
    }

    // Copy all KeyValues of the scanned row into a Put for the same row key.
    private static Put resultToPut(ImmutableBytesWritable key, Result result)
            throws IOException {
        Put put = new Put(key.get());
        for (KeyValue kv : result.raw()) {
            put.add(kv);
        }
        return put;
    }
}
(2) The main class and its main method
package com.datacenter.HbaseMapReduce.ReadWrite;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;

public class ReadWriteHbase {

    static String rootdir = "hdfs://hadoop3:8020/hbase";
    static String zkServer = "hadoop3";
    static String port = "2181";

    private static Configuration conf;
    private static HConnection hConn = null; // not referenced by the job below

    public static void HbaseUtil(String rootDir, String zkServer, String port) {
        conf = HBaseConfiguration.create(); // load the default configuration
        conf.set("hbase.rootdir", rootDir);
        conf.set("hbase.zookeeper.quorum", zkServer);
        conf.set("hbase.zookeeper.property.clientPort", port);

        try {
            hConn = HConnectionManager.createConnection(conf);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        HbaseUtil(rootdir, zkServer, port);

        Job job = new Job(conf, "ExampleReadWrite");
        job.setJarByClass(ReadWriteHbase.class); // class that contains mapper

        Scan scan = new Scan();
        scan.setCaching(500);       // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false); // don't set to true for MR jobs
        // set other scan attrs

        TableMapReduceUtil.initTableMapperJob(
                "score",                 // input table
                scan,                    // Scan instance to control CF and attribute selection
                ReadWriteHbaseMap.class, // mapper class
                null,                    // mapper output key
                null,                    // mapper output value
                job);

        TableMapReduceUtil.initTableReducerJob(
                "liujiyu", // output table
                null,      // reducer class
                job);
        job.setNumReduceTasks(0); // map-only job

        boolean b = job.waitForCompletion(true);
        if (!b) {
            throw new IOException("error with job!");
        }
    }
}
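Note: although TableMapReduceUtil.initTableReducerJob is used above to set up the output table, the job runs with zero reducers (job.setNumReduceTasks(0)). The call still configures TableOutputFormat for the table liujiyu, so every Put emitted by the mapper is written directly to that table.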
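To make the data flow of a map-only job easier to picture, here is a minimal in-memory sketch in plain Java. It is not HBase code: ToyTable, rowToPut, runMapOnly and the column names such as course:math are hypothetical stand-ins invented for illustration, and only the table names score and liujiyu come from the job above. The point it tries to show is that, with no reduce phase, each value produced by the map step goes straight into the output table.

```java
import java.util.Map;
import java.util.TreeMap;

/** Toy in-memory model of a map-only copy job. None of these types are HBase classes. */
final class MapOnlyCopySketch {

    /** Hypothetical stand-in for a table: row key -> (column -> value). */
    static final class ToyTable {
        final TreeMap<String, TreeMap<String, String>> rows = new TreeMap<>();

        /** Merge the given cells into the row, creating the row if needed. */
        void put(String rowKey, Map<String, String> cells) {
            rows.computeIfAbsent(rowKey, k -> new TreeMap<>()).putAll(cells);
        }
    }

    /** Plays the role of resultToPut: copy every cell of the input row. */
    static Map<String, String> rowToPut(Map<String, String> row) {
        return new TreeMap<>(row);
    }

    /**
     * Plays the role of the job with zero reducers: each map output is
     * written to the sink immediately, with no shuffle or reduce step.
     * Returns the number of rows written.
     */
    static int runMapOnly(ToyTable source, ToyTable sink) {
        int written = 0;
        for (Map.Entry<String, TreeMap<String, String>> e : source.rows.entrySet()) {
            // Corresponds to context.write(row, put) in the real mapper.
            sink.put(e.getKey(), rowToPut(e.getValue()));
            written++;
        }
        return written;
    }

    public static void main(String[] args) {
        ToyTable score = new ToyTable();
        TreeMap<String, String> cells = new TreeMap<>();
        cells.put("course:math", "90"); // made-up column and value
        score.put("row1", cells);
        score.put("row2", cells);

        ToyTable liujiyu = new ToyTable();
        int n = runMapOnly(score, liujiyu);
        System.out.println(n + " rows copied, tables equal: "
                + score.rows.equals(liujiyu.rows));
    }
}
```

In the real job, the write inside the loop is done by the output format that initTableReducerJob configures; the sketch only mirrors the order of operations, not the HBase client API.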
Time: 2024-10-15 19:25:50