测试数据:
datas
1001 lilei 17 13800001111 1002 lily 16 13800001112 1003 lucy 16 13800001113 1004 meimei 16 13800001114
数据批量导入使用mr,先生成HFile文件然后在用completebulkload工具导入。
1、需要先在hbase 创建表名:
hbase> create ‘student‘, {NAME => ‘info‘}
maven pom.xml配置文件如下:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
<!-- hbase --> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>1.0.0</version> </dependency>
编写MapReduce代码如下:
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /** * @author 作者 E-mail: * @version 创建时间:2016年3月2日 下午4:15:57 * 类说明 */ public class CreateHfileByMapReduce { public static class MyBulkMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>{ @Override protected void setup( Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>.Context context ) throws IOException, InterruptedException { super.setup( context ); } @Override protected void map( LongWritable key, Text value, Context context ) throws IOException, InterruptedException { String[] split = value.toString().split("\t"); // 根据实际情况修改 if (split.length == 4){ byte[] rowkey = split[0].getBytes(); ImmutableBytesWritable imrowkey = new ImmutableBytesWritable( rowkey ); context.write(imrowkey, new KeyValue(rowkey, Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(split[1]))); context.write(imrowkey, new KeyValue(rowkey, Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(split[2]))); context.write(imrowkey, new KeyValue(rowkey, Bytes.toBytes("info"), Bytes.toBytes("phone"), Bytes.toBytes(split[3]))); } } } @SuppressWarnings( "deprecation" ) public static void main( String[] args ) { if (args.length != 4){ System.err.println("Usage: CreateHfileByMapReduce <table_name><data_input_path><hfile_output_path> "); System.exit(2); } String tableName = args[0]; String inputPath = args[1]; String outputPath = args[2]; /* String tableName = "student"; String inputPath = "hdfs://node2:9000/datas"; String outputPath = "hdfs://node2:9000/user/output";*/ HTable hTable = null; Configuration conf = HBaseConfiguration.create(); try { hTable = new HTable(conf, tableName); Job job = Job.getInstance( conf, "CreateHfileByMapReduce"); job.setJarByClass( CreateHfileByMapReduce.class ); job.setMapperClass(MyBulkMapper.class); job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class); // HFileOutputFormat.configureIncrementalLoad(job, hTable); FileInputFormat.addInputPath( job, new Path(inputPath) ); FileOutputFormat.setOutputPath( job, new Path(outputPath) ); System.exit( job.waitForCompletion(true)? 0: 1 ); } catch ( Exception e ) { e.printStackTrace(); } } }
注: 借助maven的assembly插件, 生成胖jar包(就是把依赖的zookeeper和hbase jar包都打到该MapReduce包中), 否则的话, 就需要用户静态配置, 在Hadoop的class中添加zookeeper和hbase的配置文件和相关jar包.
最终的jar包为 bulk.jar, 主类名为cn.bd.batch.mr.CreateHfileByMapReduce, 生成HFile, 增量热载入hbase
sudo -u hdfs hadoop jar <xxoo>.jar <MainClass> <table_name> <data_input_path> <hfile_output_path>
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <hfile_output_path> <table_name>
hadoop jar bulk.jar cn.bd.batch.mr.CreateHfileByMapReduce student /datas /user/output
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /user/output student
本文参考地址:http://www.cnblogs.com/mumuxinfei/p/3823367.html