HBase概念学习(七)HBase与Mapreduce集成

这篇文章是看了HBase权威指南之后,根据上面的讲解搬下来的例子,但是稍微有些不一样。

HBase与mapreduce的集成无非就是mapreduce作业以HBase表作为输入,或者作为输出,也或者作为mapreduce作业之间共享数据的介质。

这篇文章将讲解两个例子:

1、读取存储在hdfs上的txt文本数据,简单地以json字符串的形式存储到HBase表中。

2、将第一步存储的HBase表中的json字符串读取出来,解析存储到新的HBase表中,可以进行查询。

本文详细给出了源码以及如何运行,旨在加深HBase与mapreduce集成的学习。

如果你还不知道怎么搭建基于HDFS的HBase单机环境,以及如何运行mapreduce任务,那么请先参考我这两篇文章:

(1) HBase环境搭建(一)Ubuntu下基于Hadoop文件系统的单机模式

(2) Hadoop基础学习(一)分析、编写并运行WordCount词频统计程序

1、读取存储在hdfs上的txt文本数据,简单地以json字符串的形式存储到HBase表中。

源码:

/**
 * @author 季义钦
 * @date 2014-6
 * @reference HBase权威指南 chapter7
 *
 */

import java.io.IOException;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class HdfsToHBase
{
	private static final Log LOG = LogFactory.getLog(HdfsToHBase.class);
	public static final String NAME = "ImportFromFile";
	public enum Counters { LINES }

	/**
	 * Map类
	 *
	 */
	static class ImportMapper
	extends Mapper<LongWritable, Text, ImmutableBytesWritable, Writable>
	{
		private byte[] family = null;
		private byte[] qualifier = null;

		@Override
		protected void setup(Context context) throws IOException, InterruptedException
		{
			//获取通过Configuration传过来的列名
			String columns = context.getConfiguration().get("conf.column");

			//解析出列族和列的名称
			byte[][] columnsBytes = KeyValue.parseColumn(Bytes.toBytes(columns));
			family = columnsBytes[0];
			qualifier = columnsBytes[1];

			LOG.info("family:"+family.toString()+"qualifiers:"+qualifier);
		}

		@Override
		public void map(LongWritable offset, Text line, Context context) throws IOException
		{
			try
			{
				String lineStr = line.toString();
				byte[] rowkey = DigestUtils.md5(lineStr);

				//构造Put对象
				Put put = new Put(rowkey);
				put.add(family, qualifier, Bytes.toBytes(lineStr));

				//发射Put对象
				context.write(new ImmutableBytesWritable(rowkey), put);
				context.getCounter(Counters.LINES).increment(1);

			}catch(Exception e)
			{
				e.printStackTrace();
			}
		}

	}

	/**
	 * 将命令行参数解析为HBase的CommandLine对象
	 * @param args
	 * @return
	 * @throws ParseException
	 */
	private static CommandLine parseArgs(String[] args) throws ParseException
	{
		Options options = new Options();
	    Option o = new Option("t", "table", true, "table to import into (must exist)");
	    o.setArgName("table-name");
	    o.setRequired(true);
	    options.addOption(o);

	    o = new Option("c", "column", true, "column to store row data into (must exist)");
	    o.setArgName("family:qualifier");
	    o.setRequired(true);
	    options.addOption(o);

	    o = new Option("i", "input", true, "the directory or file to read from");
	    o.setArgName("path-in-HDFS");
	    o.setRequired(true);
	    options.addOption(o);

	    CommandLineParser parser = new PosixParser();
	    CommandLine cmd = null;

	    try
	    {
	        cmd = parser.parse(options, args);
	    } catch (Exception e) {
	        System.err.println("ERROR: " + e.getMessage() + "\n");
	        HelpFormatter formatter = new HelpFormatter();
	        formatter.printHelp(NAME + " ", options, true);
	        System.exit(-1);
	    }

	    return cmd;
	}

	/**
	 * 主函数
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception
	{
		//将输入参数解析为CommandLine对象
		Configuration conf = HBaseConfiguration.create();
	    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
	    CommandLine cmd = parseArgs(otherArgs);

	    //取出各项参数
	    String tableName = cmd.getOptionValue("t");
	    String inputFileName = cmd.getOptionValue("i");
	    String columnName = cmd.getOptionValue("c");
	    conf.set("conf.column", columnName);

	    Job job = new Job(conf, "Import from file " + inputFileName + " into table " + tableName);
	    job.setJarByClass(HdfsToHBase.class);

	    //设置map和reduce类
	    job.setMapperClass(ImportMapper.class);
	    job.setNumReduceTasks(0);

	    //设置map阶段输出的键值对类型
	    job.setOutputKeyClass(ImmutableBytesWritable.class);
	    job.setOutputValueClass(Writable.class);

	    //设置job输入输出格式
	    job.setOutputFormatClass(TableOutputFormat.class);
	    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, tableName);	    

	    //设置输入输出路径
	    FileInputFormat.addInputPath(job, new Path(inputFileName));

	    System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

引入的jar文件包括:

这是在eclispe中开发的,放在默认的包下面,导出为普通的jar文件。

然后利用命令start-all.sh和start-hbase.sh分别启动hadoop和HBase。

(1)首先登陆HBase shell,创建一个只包含一个列族的表

(2)然后将txt数据上传到HDFS上面(数据在HBase权威指南随书的源码包中有)。

(3)然后执行job:

其中指定了main函数所在的类名,然后就分别是habse 表名,hdfs文件名,hbase表的列名。

作业执行完成之后可以到:http://localhost:50030/jobtracker.jsp 查看作业执行状态。

然后可以登陆hbase shell查看article表中有多少行数据,也可以用scan全部打印出来看。

2、将第一步存储的HBase表中的json字符串读取出来,解析存储到新的HBase表中,可以进行查询。

源码:

/**
 * @author 季义钦
 * @date 2014-6
 * @reference HBase权威指南 chapter7
 *
 */
import java.io.IOException;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.IdentityTableReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;

public class HBaseToHBase
{
	private static final Log LOG = LogFactory.getLog(HBaseToHBase.class);
	public static final String NAME = "HBaseToHBase";
	public enum Counters { ROWS, COLS, ERROR, VALID }

	/**
	 * Map类
	 * 以HBase表作为输入,所以继承自TableMapper
	 *
	 */
	static class ParseMapper
	extends TableMapper<ImmutableBytesWritable, Writable>
	{
		private JSONParser parser = new JSONParser();
		private byte[] family = null;

		@Override
		protected void setup(Context context) throws IOException, InterruptedException
		{
			family = Bytes.toBytes(context.getConfiguration().get("conf.family"));
		}

		@Override
		public void map(ImmutableBytesWritable rowKey, Result columns, Context context) throws IOException
		{
			String value = null;		

			try
			{
				String author = "null";
				Put put = new Put(rowKey.get());

				//循环取得每一列(这里实际上只有一列存储json字符串)
				for(KeyValue kv:columns.list())
				{
					context.getCounter(Counters.COLS).increment(1);
					value = Bytes.toStringBinary(kv.getValue());

					//解析获取的json字符串
					JSONObject json = (JSONObject)parser.parse(value);
					for(Object key : json.keySet())
					{
						Object val = json.get(key);
						if(key.equals("author"))
						{
							author = val.toString();
						}

						put.add(family, Bytes.toBytes(key.toString()), Bytes.toBytes(val.toString()));
					}
				}

				//以解析到的author作为行键发射出去
				context.write(new ImmutableBytesWritable(Bytes.toBytes(author)), put);
				context.getCounter(Counters.VALID).increment(1);
				LOG.info("存储作者 "+author+"的数据完成!");
			}catch(Exception e)
			{
				e.printStackTrace();
				System.err.println("Error: " + e.getMessage() + ", Row: " +
				          Bytes.toStringBinary(rowKey.get()) + ", JSON: " + value);
				        context.getCounter(Counters.ERROR).increment(1);
			}
		}
	}

	/**
	 * 解析命令行参数
	 * @param args
	 * @return
	 * @throws ParseException
	 */
	private static CommandLine parseArgs(String[] args) throws ParseException
	{
		Options options = new Options();

		Option o = new Option("i", "input", true, "table to read from (must exist)");
		o.setArgName("input-table-name");
		o.setRequired(true);
		options.addOption(o);

	    o = new Option("ic", "column", true, "column to read data from (must exist)");
	    o.setArgName("family:qualifier");
	    o.setRequired(true);
	    options.addOption(o);

	  	o = new Option("o", "output", true, "table to write to (must exist)");
	  	o.setArgName("output-table-name");
	  	o.setRequired(true);
	  	options.addOption(o);

	    o = new Option("oc", "family", true, "cf to write data to (must exist)");
	    o.setArgName("family");
	    o.setRequired(true);
	    options.addOption(o);

	    CommandLineParser parser = new PosixParser();
	    CommandLine cmd = null;
	    try
	    {
	      cmd = parser.parse(options, args);
	    }
	    catch (Exception e)
	    {
	      System.err.println("ERROR: " + e.getMessage() + "\n");
	      HelpFormatter formatter = new HelpFormatter();
	      formatter.printHelp(NAME + " ", options, true);
	      System.exit(-1);
	    }
	    return cmd;
	  }

	/**
	 * 主函数
	 * @param args
	 */
	public static void main(String[] args) throws Exception
	{
	    Configuration conf = HBaseConfiguration.create();
	    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
	    CommandLine cmd = parseArgs(otherArgs);

	    String inputTable = cmd.getOptionValue("i");	//HBase源表
	    String outputTable = cmd.getOptionValue("o");	//HBase目标表
	    String inputColumn = cmd.getOptionValue("ic");	//HBase源表的列名
	    String outputColumnFamily = cmd.getOptionValue("oc");	//HBase目标表的列族名
	    conf.set("conf.family", outputColumnFamily);    

	    //提供Scan实例指定要扫描的列
	    Scan scan = new Scan();
	    byte[][] colkey = KeyValue.parseColumn(Bytes.toBytes(inputColumn));
	    scan.addColumn(colkey[0], colkey[1]);	        

	    Job job = new Job(conf, "Parse data in " + inputTable + ", write to " + outputTable);
	    job.setJarByClass(HBaseToHBase.class);  

	    //快速配置作业以HBase作为输入源和输出源
	    TableMapReduceUtil.initTableMapperJob(inputTable, scan, ParseMapper.class, ImmutableBytesWritable.class, Put.class, job);
	    TableMapReduceUtil.initTableReducerJob(outputTable, IdentityTableReducer.class, job);

	    System.exit(job.waitForCompletion(true) ? 0 : 1);
	}

}

注意:

(1)以HBase表作为mapreduce作业的输入时,一方面要继承字TableMapper类,一方面需要提供一个scan实例,指定要扫描来作为输入的记录。

(2)其中配置的Reduce是IdentityTableReducer,其作用和IdentityTableMapper一样,只是简单地将键值对传递到下一个阶段而已,没有什么实质性作用,它对于数据存储到HBase表中不是必须的,完全可以用另外一句话替代,即: setNumReduceTasks(0).

实际上作业执行的时候你应该也可以看到reduce一直是0%。

引入的jar文件包括:

(1)创建HBase表:

(2)导出jar包:

注意:里面引入了一个第三方的jar包,即simple json的jar包,用于解析json字符串。

simple json jar文件在这里下载:http://www.java2s.com/Code/Jar/j/Downloadjsonsimple111jar.htm

之前在一个网站下了一个山寨的,结果没有parse(string)这个接口,只有parse(Reader)这个接口,将String转换成StringReader传进去结果作业老是报错,坑死了。

引入第三方jar包执行Mapreduce作业的时候会报出classnotFound的异常,解决方法有以下几种:

1.把要依赖的包部署到每台tasktracker上面

这个方法最简单,但是要部署到每台tasktracker,而且可能引起包污染的问题。比如应用A和应用B都用到同一个libray,但是版本不同,就会出现冲突的问题。

2.把依赖的包和直接合并到mapreducejob的包

这个方法的问题是合并后的包可能非常大,也不利于的包的升级

3.使用DistributedCache

这个方法就是先把这些包上传到HDFS,可以在程序启动的时候做一次。然后在submitjob的时候把hdfspath加到classpath里面。

示例:

$bin/hadoop fs -copyFromLocal ib/protobuf-java-2.0.3.jar/myapp/protobuf-java-2.0.3.jar //Setup the application‘s JobConf:JobConf job = new JobConf(); DistributedCache.addFileToClassPath(newPath("/myapp/protobuf-java-2.0.3.jar"),
job);

4,还有一种情况是扩展包特别多的情况下用3就不爽了,参考一下:

《Hadoop权威指南》中也有关于jar打包的处理措施,查找之

【任何非独立的JAR文件都必须打包到JAR文件的lib目录中。(这与Java的webapplication
archive或WAR文件类似,不同的是,后者的JAR文件放在WEB-INF/lib子目录下的WAR文件中)】

我采用的是第四种方法,在工程下面创建一个lib文件夹,将json-simple-1.1.1.jar放进去:

然后export:

(3)执行job:

OK了,下面就可以用hbase shell登陆,并用scan ‘authorTable’查看解析进去的数据了。

HBase概念学习(七)HBase与Mapreduce集成,布布扣,bubuko.com

时间: 2025-01-02 01:11:34

HBase概念学习(七)HBase与Mapreduce集成的相关文章

HBase概念学习(八)开发一个类twitter系统之表设计

这边文章先将可能的需求分析一下,设计出HBase表,下一步再开始编写客户端代码. TwiBase系统 1.背景 为了加深HBase基本概念的学习,参考HBase实战这本书实际动手做了这个例子. 2.需求 这是一个用户推特系统,用户登陆到系统,需要维护用户的基本信息,然后用户可以发帖和其他用户进行互动.用户之间可以相互关注,用户可以浏览关注用户的推文等等. 这是一个比较简单的推特系统,不考虑用户之间的私信,用户评论推特等功能. 3.概要设计 3.1表设计 首先需要设计三个表:用户表,推特表以及用户

Hbase框架原理及相关的知识点理解、Hbase访问MapReduce、Hbase访问Java API、Hbase shell及Hbase性能优化总结

转自:http://blog.csdn.net/zhongwen7710/article/details/39577431 本blog的内容包含: 第一部分:Hbase框架原理理解 第二部分:Hbase调用MapReduce函数使用理解 第三部分:Hbase调用Java API使用理解 第四部分:Hbase Shell操作 第五部分:Hbase建表.读写操作方式性能优化总结 第一部分:Hbase框架原理理解 概述 HBase是一个构建在HDFS上的分布式列存储系统:HBase是基于Google

HBase概念学习(九)HTablePool为何弃用?

转载请注明出处:jiq?钦's technical Blog 我们先看HConnection的getTable方法描述: getTable HTableInterface getTable(String tableName) throws IOException Retrieve an HTableInterface implementation for access to a table. The returned HTableInterface is not thread safe, a n

HBase概念学习(三)Java API之扫描和过滤器

HBase基本的CRUD操作就不多介绍了,无非就是Put,Get,Delete三个类的运用. 本文相当于是阅读HBase权威指南的总结. 一.扫描(Scan) 现在看一下扫描技术,这种技术类似于关系型数据库的游标(cursor),并利用到了HBase底层顺序存储的特性. 使用扫描的一般步骤是: 1.创建Scan实例 2.为Scan实例增加扫描的限制条件 3.调用HTable的getScanner()方法获取ResultScanner对象 4.迭代ResultScanner对象中的Result对象

HBase 实战(1)--HBase的数据导入方式

前言: 作为Hadoop生态系统中重要的一员, HBase作为分布式列式存储, 在线实时处理的特性, 备受瞩目, 将来能在很多应用场景, 取代传统关系型数据库的江湖地位. 本篇博文重点讲解HBase的数据导入, 描述三种方式, Client API, Bulkload, 以及Hive Over HBase. *). Client API实现借助HBase的Client API来导入, 是最简易学的方式. Configuration config = HBaseConfiguration.crea

java开发:分享一下百度ueditor和七牛的图片集成上传

做网站时,如果上传的图片量很大,现在不少人会选用七牛图片服务器.那么,今天就来说说如何把网站的图片上传与七牛的sdk集成的问题. jsp页面,实现图片上传的方式也很多,今天就来说下百度的编辑器:ueditor 首先要到官网去下载它,后面我也会附上源代码,需要的朋友可以下载. 我们新建一个项目:qndemo,然后将ueditor放到webroot目录下,截图如下: 另外,我们还要引入jar包: 前台页面,我们需要引用相关js,默认配置下,会加载出编辑效果,如下图: 这时候,我们上传的图片会保存在本

[转]HBase hbck——检察HBase集群的一致性

Hbase提供了hbck命令来检查各种不一致问题.hbck的名字仿效了HDFS的fsck命令,后者是一个用于检查HDFS中不一致问题的工具.下面这段非常易懂的介绍出自于hbck的源程序. 检查数据在Master及RegionServer的内存中状态与数据在HDFS中的状态之间的一致性. HBase的hbck不仅能够检查不一致问题,而且还能够修复不一致问题. 在生产环境中,应当经常运行hbck,以便及早发现不一致问题并更容易地解决问题. 一.问题 首先,在HBase上创建一张表usertable.

HBase案例:HBase 在人工智能场景的使用

近几年来,人工智能逐渐火热起来,特别是和大数据一起结合使用.人工智能的主要场景又包括图像能力.语音能力.自然语言处理能力和用户画像能力等等.这些场景我们都需要处理海量的数据,处理完的数据一般都需要存储起来,这些数据的特点主要有如下几点: 大:数据量越大,对我们后面建模越会有好处: 稀疏:每行数据可能拥有不同的属性,比如用户画像数据,每个人拥有属性相差很大,可能用户A拥有这个属性,但是用户B没有这个属性:那么我们希望存储的系统能够处理这种情况,没有的属性在底层不占用空间,这样可以节约大量的空间使用

HBase概念学习(四)Java API之扫描和过滤器

HBase主要的CRUD操作就不多介绍了,无非就是Put,Get.Delete三个类的运用. 本文相当于是阅读HBase权威指南的总结. 一.扫描(Scan) 如今看一下扫描技术,这样的技术类似于关系型数据库的游标(cursor),并利用到了HBase底层顺序存储的特性. 使用扫描的一般步骤是: 1.创建Scan实例 2.为Scan实例添加扫描的限制条件 3.调用HTable的getScanner()方法获取ResultScanner对象,假设通过HTablePool的方式,则是调用HTable