Why MapReduce on HBase?
HBase itself does not provide a good secondary-index mechanism, so answering a question such as "which students are 20 years old?" by scanning the table directly with the Scan API becomes very slow once the data volume is large.
For this kind of full-table processing, MapReduce is a better fit. HBase ships MapReduce integration classes (in the org.apache.hadoop.hbase.mapreduce package) that plug into Hadoop MapReduce, so a job can read rows from and write rows to HBase tables directly. The demo below uses them to build an age-to-names table (students_age) from the students table.
API reference: http://hbase.apache.org/devapidocs/index.html
Comparison of the HBase and Hadoop MapReduce APIs
The HBase classes mirror the plain Hadoop ones: TableMapper extends Mapper and fixes the input types to an ImmutableBytesWritable row key and a Result row, TableReducer extends Reducer and fixes the output value type to a Mutation (a Put or Delete), and TableInputFormat/TableOutputFormat take the place of the file-based input and output formats.
Related classes
TableMapper
package org.apache.hadoop.hbase.mapreduce;
/**
* Extends the base <code>Mapper</code> class to add the required input key
* and value classes.
*
* @param <KEYOUT> The type of the key.
* @param <VALUEOUT> The type of the value.
* @see org.apache.hadoop.mapreduce.Mapper
*/
public abstract class TableMapper<KEYOUT, VALUEOUT>
    extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
}
TableReducer
package org.apache.hadoop.hbase.mapreduce;
/**
* Extends the basic <code>Reducer</code> class to add the required key and
* value input/output classes. While the input key and value as well as the
* output key can be anything handed in from the previous map phase the output
* value <u>must</u> be either a {@link org.apache.hadoop.hbase.client.Put Put}
* or a {@link org.apache.hadoop.hbase.client.Delete Delete} instance when
* using the {@link TableOutputFormat} class.
* <p>
* This class is extended by {@link IdentityTableReducer} but can also be
* subclassed to implement similar features or any custom code needed. It has
* the advantage to enforce the output value to a specific basic type.
* @param <KEYIN> The type of the input key.
* @param <VALUEIN> The type of the input value.
* @param <KEYOUT> The type of the output key.
* @see org.apache.hadoop.mapreduce.Reducer
*/
public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
    extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation> {
}
TableMapReduceUtil
public class TableMapReduceUtil {
  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table The table name to read from.
   * @param scan The scan instance with the columns, time range etc.
   * @param mapper The mapper class to use.
   * @param outputKeyClass The class of the output key.
   * @param outputValueClass The class of the output value.
   * @param job The current job to adjust. Make sure the passed job is
   *          carrying all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job)
      throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass,
        job, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table The output table.
   * @param reducer The reducer class to use.
   * @param job The current job to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
      Class<? extends TableReducer> reducer, Job job)
      throws IOException {
    initTableReducerJob(table, reducer, job, null);
  }
}
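Put together, a driver typically builds a Scan that is as narrow as possible and hands it to these two helpers. Below is a sketch only; the table and column names match the demo that follows, and setCacheBlocks(false) is the usual recommendation for one-off full-table MapReduce scans so they do not churn the region servers' block cache:

Scan scan = new Scan();
scan.setCaching(500);       // rows fetched per scanner RPC
scan.setCacheBlocks(false); // don't fill the block cache from a one-off full scan
scan.addColumn(Bytes.toBytes("basicInfo"), Bytes.toBytes("age")); // ship only the column the mapper reads

TableMapReduceUtil.initTableMapperJob(
        "students",      // input table
        scan,            // controls what the mappers see
        MyMapper.class,  // mapper
        Text.class,      // mapper output key class
        Text.class,      // mapper output value class
        job);
TableMapReduceUtil.initTableReducerJob(
        "students_age",  // output table
        MyReducer.class, // reducer that emits Put objects
        job);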
Demo
package MapReduceHbase;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
public class HbaseMR {

    private String rootDir;
    private String zkServer;
    private String port;
    private Configuration conf;
    private HConnection hConn = null;

    private HbaseMR(String rootDir, String zkServer, String port) throws IOException {
        this.rootDir = rootDir;
        this.zkServer = zkServer;
        this.port = port;

        conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", rootDir);
        conf.set("hbase.zookeeper.quorum", zkServer);
        conf.set("hbase.zookeeper.property.clientPort", port);
        // The connection itself is not used by the MapReduce job; the job only needs the Configuration.
        hConn = HConnectionManager.createConnection(conf);
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String rootDir = "hdfs://hadoop1:8020/hbase";
        String zkServer = "hadoop1";
        String port = "2181";

        HbaseMR conn = new HbaseMR(rootDir, zkServer, port);

        Job job = new Job(conn.conf, "MapReduce on HBase"); // on Hadoop 2.x, Job.getInstance(conf, name) is preferred
        job.setJarByClass(HbaseMR.class);

        Scan scan = new Scan();
        scan.setCaching(1000); // number of rows each scanner RPC fetches ahead of time

        // read from the students table with MyMapper ...
        TableMapReduceUtil.initTableMapperJob("students", scan,
                MyMapper.class, Text.class, Text.class, job);
        // ... and write Puts to the students_age table with MyReducer
        TableMapReduceUtil.initTableReducerJob("students_age", MyReducer.class, job);

        job.waitForCompletion(true);
        conn.hConn.close();
    }
}
class MyMapper extends TableMapper<Text, Text> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // key is the row key (the student name), value is the whole row
        Text k = new Text(Bytes.toString(key.get()));
        byte[] age = value.getValue(Bytes.toBytes("basicInfo"), Bytes.toBytes("age"));
        if (age == null) {
            return; // skip rows that have no basicInfo:age column
        }
        Text v = new Text(age);
        // emit (age, name) so all names with the same age meet in one reduce call
        context.write(v, k);
    }
}
class MyReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {
    @Override
    protected void reduce(Text k2, Iterable<Text> v2s, Context context)
            throws IOException, InterruptedException {
        // the age becomes the row key of the output table
        Put put = new Put(Bytes.toBytes(k2.toString()));
        for (Text v2 : v2s) { // every value is one student name with this age
            // column family f1, qualifier = name, value = name
            put.add(Bytes.toBytes("f1"), Bytes.toBytes(v2.toString()),
                    Bytes.toBytes(v2.toString()));
        }
        // TableOutputFormat ignores the key, so null is fine; the Put carries the row key
        context.write(null, put);
    }
}
Before running the job, create the output table students_age with column family f1; the input table students (column family basicInfo, with an age column) must also exist and contain data.
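A minimal sketch of that setup, using the same 0.9x-era client API as the demo (it additionally needs HBaseAdmin, HTableDescriptor, HColumnDescriptor, TableName and HTable on the import list; the sample names and ages are made up for illustration):

Configuration conf = HBaseConfiguration.create(); // same ZooKeeper settings as in HbaseMR
HBaseAdmin admin = new HBaseAdmin(conf);

// input table: students, column family basicInfo
HTableDescriptor students = new HTableDescriptor(TableName.valueOf("students"));
students.addFamily(new HColumnDescriptor("basicInfo"));
admin.createTable(students);

// output table: students_age, column family f1
HTableDescriptor studentsAge = new HTableDescriptor(TableName.valueOf("students_age"));
studentsAge.addFamily(new HColumnDescriptor("f1"));
admin.createTable(studentsAge);

// a couple of illustrative rows for the input table
HTable table = new HTable(conf, "students");
Put tom = new Put(Bytes.toBytes("Tom"));
tom.add(Bytes.toBytes("basicInfo"), Bytes.toBytes("age"), Bytes.toBytes("20"));
table.put(tom);
Put jerry = new Put(Bytes.toBytes("Jerry"));
jerry.add(Bytes.toBytes("basicInfo"), Bytes.toBytes("age"), Bytes.toBytes("20"));
table.put(jerry);
table.close();
admin.close();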
Run result: after the job finishes, students_age contains one row per age value, and in family f1 one column per student of that age (both the qualifier and the cell value are the student name).
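To check the output without the HBase shell, a quick scan of students_age can be run with the same configuration; a sketch, reusing the conf object from the setup snippet above:

HTable out = new HTable(conf, "students_age");
ResultScanner results = out.getScanner(new Scan());
for (Result row : results) {
    String age = Bytes.toString(row.getRow());
    // every qualifier in family f1 is one student name with this age
    for (byte[] name : row.getFamilyMap(Bytes.toBytes("f1")).keySet()) {
        System.out.println("age=" + age + "  name=" + Bytes.toString(name));
    }
}
results.close();
out.close();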