这篇文章是看了HBase权威指南之后,根据上面的讲解搬下来的例子,但是稍微有些不一样。
HBase与mapreduce的集成无非就是mapreduce作业以HBase表作为输入,或者作为输出,也或者作为mapreduce作业之间共享数据的介质。
这篇文章将讲解两个例子:
1、读取存储在hdfs上的txt文本数据,简单地以json字符串的形式存储到HBase表中。
2、将第一步存储的HBase表中的json字符串读取出来,解析存储到新的HBase表中,可以进行查询。
本文详细给出了源码以及如何运行,旨在加深HBase与mapreduce集成的学习。
如果你还不知道怎么搭建基于HDFS的HBase单机环境,以及如何运行mapreduce任务,那么请先参考我这两篇文章:
(1) HBase环境搭建(一)Ubuntu下基于Hadoop文件系统的单机模式
(2) Hadoop基础学习(一)分析、编写并运行WordCount词频统计程序
1、读取存储在hdfs上的txt文本数据,简单地以json字符串的形式存储到HBase表中。
源码:
/** * @author 季义钦 * @date 2014-6 * @reference HBase权威指南 chapter7 * */ import java.io.IOException; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.cli.PosixParser; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableOutputFormat; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class HdfsToHBase { private static final Log LOG = LogFactory.getLog(HdfsToHBase.class); public static final String NAME = "ImportFromFile"; public enum Counters { LINES } /** * Map类 * */ static class ImportMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> { private byte[] family = null; private byte[] qualifier = null; @Override protected void setup(Context context) throws IOException, InterruptedException { //获取通过Configuration传过来的列名 String columns = context.getConfiguration().get("conf.column"); //解析出列族和列的名称 byte[][] columnsBytes = KeyValue.parseColumn(Bytes.toBytes(columns)); family = columnsBytes[0]; qualifier = columnsBytes[1]; LOG.info("family:"+family.toString()+"qualifiers:"+qualifier); } @Override public void map(LongWritable offset, Text line, Context context) throws IOException { try { String lineStr = line.toString(); byte[] rowkey = DigestUtils.md5(lineStr); //构造Put对象 Put put = new Put(rowkey); put.add(family, qualifier, Bytes.toBytes(lineStr)); //发射Put对象 context.write(new ImmutableBytesWritable(rowkey), put); context.getCounter(Counters.LINES).increment(1); }catch(Exception e) { e.printStackTrace(); } } } /** * 将命令行参数解析为HBase的CommandLine对象 * @param args * @return * @throws ParseException */ private static CommandLine parseArgs(String[] args) throws ParseException { Options options = new Options(); Option o = new Option("t", "table", true, "table to import into (must exist)"); o.setArgName("table-name"); o.setRequired(true); options.addOption(o); o = new Option("c", "column", true, "column to store row data into (must exist)"); o.setArgName("family:qualifier"); o.setRequired(true); options.addOption(o); o = new Option("i", "input", true, "the directory or file to read from"); o.setArgName("path-in-HDFS"); o.setRequired(true); options.addOption(o); CommandLineParser parser = new PosixParser(); CommandLine cmd = null; try { cmd = parser.parse(options, args); } catch (Exception e) { System.err.println("ERROR: " + e.getMessage() + "\n"); HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(NAME + " ", options, true); System.exit(-1); } return cmd; } /** * 主函数 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { //将输入参数解析为CommandLine对象 Configuration conf = HBaseConfiguration.create(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); CommandLine cmd = parseArgs(otherArgs); //取出各项参数 String tableName = cmd.getOptionValue("t"); String inputFileName = cmd.getOptionValue("i"); String columnName = cmd.getOptionValue("c"); conf.set("conf.column", columnName); Job job = new Job(conf, "Import from file " + inputFileName + " into table " + tableName); job.setJarByClass(HdfsToHBase.class); //设置map和reduce类 job.setMapperClass(ImportMapper.class); job.setNumReduceTasks(0); //设置map阶段输出的键值对类型 job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(Writable.class); //设置job输入输出格式 job.setOutputFormatClass(TableOutputFormat.class); job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, tableName); //设置输入输出路径 FileInputFormat.addInputPath(job, new Path(inputFileName)); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
引入的jar文件包括:
这是在eclispe中开发的,放在默认的包下面,导出为普通的jar文件。
然后利用命令start-all.sh和start-hbase.sh分别启动hadoop和HBase。
(1)首先登陆HBase shell,创建一个只包含一个列族的表:
(2)然后将txt数据上传到HDFS上面(数据在HBase权威指南随书的源码包中有)。
(3)然后执行job:
其中指定了main函数所在的类名,然后就分别是habse 表名,hdfs文件名,hbase表的列名。
作业执行完成之后可以到:http://localhost:50030/jobtracker.jsp 查看作业执行状态。
然后可以登陆hbase shell查看article表中有多少行数据,也可以用scan全部打印出来看。
2、将第一步存储的HBase表中的json字符串读取出来,解析存储到新的HBase表中,可以进行查询。
源码:
/** * @author 季义钦 * @date 2014-6 * @reference HBase权威指南 chapter7 * */ import java.io.IOException; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.commons.cli.PosixParser; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer; import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.mapreduce.TableMapper; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.GenericOptionsParser; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; public class HBaseToHBase { private static final Log LOG = LogFactory.getLog(HBaseToHBase.class); public static final String NAME = "HBaseToHBase"; public enum Counters { ROWS, COLS, ERROR, VALID } /** * Map类 * 以HBase表作为输入,所以继承自TableMapper * */ static class ParseMapper extends TableMapper<ImmutableBytesWritable, Writable> { private JSONParser parser = new JSONParser(); private byte[] family = null; @Override protected void setup(Context context) throws IOException, InterruptedException { family = Bytes.toBytes(context.getConfiguration().get("conf.family")); } @Override public void map(ImmutableBytesWritable rowKey, Result columns, Context context) throws IOException { String value = null; try { String author = "null"; Put put = new Put(rowKey.get()); //循环取得每一列(这里实际上只有一列存储json字符串) for(KeyValue kv:columns.list()) { context.getCounter(Counters.COLS).increment(1); value = Bytes.toStringBinary(kv.getValue()); //解析获取的json字符串 JSONObject json = (JSONObject)parser.parse(value); for(Object key : json.keySet()) { Object val = json.get(key); if(key.equals("author")) { author = val.toString(); } put.add(family, Bytes.toBytes(key.toString()), Bytes.toBytes(val.toString())); } } //以解析到的author作为行键发射出去 context.write(new ImmutableBytesWritable(Bytes.toBytes(author)), put); context.getCounter(Counters.VALID).increment(1); LOG.info("存储作者 "+author+"的数据完成!"); }catch(Exception e) { e.printStackTrace(); System.err.println("Error: " + e.getMessage() + ", Row: " + Bytes.toStringBinary(rowKey.get()) + ", JSON: " + value); context.getCounter(Counters.ERROR).increment(1); } } } /** * 解析命令行参数 * @param args * @return * @throws ParseException */ private static CommandLine parseArgs(String[] args) throws ParseException { Options options = new Options(); Option o = new Option("i", "input", true, "table to read from (must exist)"); o.setArgName("input-table-name"); o.setRequired(true); options.addOption(o); o = new Option("ic", "column", true, "column to read data from (must exist)"); o.setArgName("family:qualifier"); o.setRequired(true); options.addOption(o); o = new Option("o", "output", true, "table to write to (must exist)"); o.setArgName("output-table-name"); o.setRequired(true); options.addOption(o); o = new Option("oc", "family", true, "cf to write data to (must exist)"); o.setArgName("family"); o.setRequired(true); options.addOption(o); CommandLineParser parser = new PosixParser(); CommandLine cmd = null; try { cmd = parser.parse(options, args); } catch (Exception e) { System.err.println("ERROR: " + e.getMessage() + "\n"); HelpFormatter formatter = new HelpFormatter(); formatter.printHelp(NAME + " ", options, true); System.exit(-1); } return cmd; } /** * 主函数 * @param args */ public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); CommandLine cmd = parseArgs(otherArgs); String inputTable = cmd.getOptionValue("i"); //HBase源表 String outputTable = cmd.getOptionValue("o"); //HBase目标表 String inputColumn = cmd.getOptionValue("ic"); //HBase源表的列名 String outputColumnFamily = cmd.getOptionValue("oc"); //HBase目标表的列族名 conf.set("conf.family", outputColumnFamily); //提供Scan实例指定要扫描的列 Scan scan = new Scan(); byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(inputColumn)); scan.addColumn(colkey[0], colkey[1]); Job job = new Job(conf, "Parse data in " + inputTable + ", write to " + outputTable); job.setJarByClass(HBaseToHBase.class); //快速配置作业以HBase作为输入源和输出源 TableMapReduceUtil.initTableMapperJob(inputTable, scan, ParseMapper.class, ImmutableBytesWritable.class, Put.class, job); TableMapReduceUtil.initTableReducerJob(outputTable, IdentityTableReducer.class, job); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
注意:
(1)以HBase表作为mapreduce作业的输入时,一方面要继承字TableMapper类,一方面需要提供一个scan实例,指定要扫描来作为输入的记录。
(2)其中配置的Reduce是IdentityTableReducer,其作用和IdentityTableMapper一样,只是简单地将键值对传递到下一个阶段而已,没有什么实质性作用,它对于数据存储到HBase表中不是必须的,完全可以用另外一句话替代,即: setNumReduceTasks(0).
实际上作业执行的时候你应该也可以看到reduce一直是0%。
引入的jar文件包括:
(1)创建HBase表:
(2)导出jar包:
注意:里面引入了一个第三方的jar包,即simple json的jar包,用于解析json字符串。
simple json jar文件在这里下载:http://www.java2s.com/Code/Jar/j/Downloadjsonsimple111jar.htm
之前在一个网站下了一个山寨的,结果没有parse(string)这个接口,只有parse(Reader)这个接口,将String转换成StringReader传进去结果作业老是报错,坑死了。
引入第三方jar包执行Mapreduce作业的时候会报出classnotFound的异常,解决方法有以下几种:
1.把要依赖的包部署到每台tasktracker上面
这个方法最简单,但是要部署到每台tasktracker,而且可能引起包污染的问题。比如应用A和应用B都用到同一个libray,但是版本不同,就会出现冲突的问题。
2.把依赖的包和直接合并到mapreducejob的包
这个方法的问题是合并后的包可能非常大,也不利于的包的升级
3.使用DistributedCache
这个方法就是先把这些包上传到HDFS,可以在程序启动的时候做一次。然后在submitjob的时候把hdfspath加到classpath里面。
示例:
$bin/hadoop fs -copyFromLocal ib/protobuf-java-2.0.3.jar/myapp/protobuf-java-2.0.3.jar //Setup the application‘s JobConf:JobConf job = new JobConf(); DistributedCache.addFileToClassPath(newPath("/myapp/protobuf-java-2.0.3.jar"),
job);
4,还有一种情况是扩展包特别多的情况下用3就不爽了,参考一下:
《Hadoop权威指南》中也有关于jar打包的处理措施,查找之
【任何非独立的JAR文件都必须打包到JAR文件的lib目录中。(这与Java的webapplication
archive或WAR文件类似,不同的是,后者的JAR文件放在WEB-INF/lib子目录下的WAR文件中)】
我采用的是第四种方法,在工程下面创建一个lib文件夹,将json-simple-1.1.1.jar放进去:
然后export:
(3)执行job:
OK了,下面就可以用hbase shell登陆,并用scan ‘authorTable’查看解析进去的数据了。
HBase概念学习(七)HBase与Mapreduce集成,布布扣,bubuko.com