HBase with MapReduce (Summary)

我们知道,hbase没有像关系型的数据库拥有强大的查询功能和统计功能,本文实现了如何利用mapreduce来统计hbase中单元值出现的个数,并将结果携带目标的表中,

(1)mapper的实现

package com.datacenter.HbaseMapReduce.Summary;

import java.io.IOException;
import java.util.NavigableMap;
import java.util.Map.Entry;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SummaryMapper extends TableMapper<Text, IntWritable> { // 这里是指定map中context输出的类型

	public static final byte[] CF = "cf".getBytes();
	public static final byte[] ATTR1 = "attr1".getBytes();

	private final IntWritable ONE = new IntWritable(1);
	private Text text = new Text();

	@Override
	protected void map(ImmutableBytesWritable key, Result value, Context context)
			throws IOException, InterruptedException {
		// TODO Auto-generated method stub

/*		byte[] ss = value.getValue(CF, ATTR1); // 这里是只是获取特定的列族,特定列的值的个数,也可以根据实际的情况修改
		String val = new String(ss);
		text.set(val); // we can only emit Writables..
		context.write(text, ONE);*/

		//统计所有的列族和列的值的个数
		try {
			DealResult( value , context);
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	// 统计所有列族和列的值的个数
	public void DealResult(Result rs ,Context context) throws Exception {

		if (rs.isEmpty()) {
			System.out.println("result is empty!");
			return;
		}

		NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> tableResulrt = rs
				.getMap();
		String rowkey = Bytes.toString(rs.getRow()); // actain rowkey
		///System.out.println("rowkey->" + rowkey);
		for (Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> familyResult : tableResulrt
				.entrySet()) {
			//System.out.print("\tfamily->" + Bytes.toString(temp.getKey()));
			for (Entry<byte[], NavigableMap<Long, byte[]>> columnResult : familyResult
					.getValue().entrySet()) {
				///System.out.print("\tcol->" + Bytes.toString(value.getKey()));
				for (Entry<Long, byte[]> valueResult : columnResult.getValue().entrySet()) {
					//System.out.print("\tvesion->" + va.getKey());
					//System.out.print("\tvalue->"+ Bytes.toString(va.getValue()));
					//System.out.println();
					text.set(new String(valueResult.getValue()));
					context.write(text, ONE);
				}
			}
		}
	}

}

(2)reduce的实现

package com.datacenter.HbaseMapReduce.Summary;

import java.io.IOException;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class SummaryReducer extends
		TableReducer<Text, IntWritable, ImmutableBytesWritable> {

	public static final byte[] CF = "cf".getBytes();
	public static final byte[] COUNT = "count".getBytes();

	@SuppressWarnings("deprecation")
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values, Context context)
			throws IOException, InterruptedException {
		// TODO Auto-generated method stub
		int i = 0;
		for (IntWritable val : values) {
			i += val.get();
		}
		Put put = new Put(Bytes.toBytes(key.toString()));
		//Cell s=new
		put.add(CF, COUNT, 100,Bytes.toBytes(i));  //在对应的列族中增加一列count,记录其个数

		context.write(null, put);
	}

}

(3)主类加载信息的实现

package com.datacenter.HbaseMapReduce.Summary;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

//统计hbase表中,每行的值在整个表的个数

public class SummaryMain {
	static String rootdir = "hdfs://hadoop3:8020/hbase";
	static String zkServer = "hadoop3";
	static String port = "2181";

	private static Configuration conf;
	private static HConnection hConn = null;

	public static void HbaseUtil(String rootDir, String zkServer, String port) {

		conf = HBaseConfiguration.create();// 获取默认配置信息
		conf.set("hbase.rootdir", rootDir);
		conf.set("hbase.zookeeper.quorum", zkServer);
		conf.set("hbase.zookeeper.property.clientPort", port);

		try {
			hConn = HConnectionManager.createConnection(conf);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	public static void main(String[] args) throws Exception {
		// TODO Auto-generated method stub
		HbaseUtil(rootdir, zkServer, port);

		Job job = new Job(conf, "ExampleSummary");
		job.setJarByClass(SummaryMain.class); // class that contains mapper and
												// reducer

		Scan scan = new Scan();
		scan.setCaching(500); // 1 is the default in Scan, which will be bad for
								// MapReduce jobs
		scan.setCacheBlocks(false); // don‘t set to true for MR jobs
		// set other scan attrs

		TableMapReduceUtil.initTableMapperJob("score", // input table
				scan, // Scan instance to control CF and attribute selection
				SummaryMapper.class, // mapper class
				Text.class, // mapper output key
				IntWritable.class, // mapper output value
				job);
		TableMapReduceUtil.initTableReducerJob("test", // output table
				SummaryReducer.class, // reducer class
				job);
		job.setNumReduceTasks(1); // at least one, adjust as required

		boolean b = job.waitForCompletion(true);
		if (!b) {
			throw new IOException("error with job!");
		}

	}

}
时间: 2024-08-10 19:20:27

HBase with MapReduce (Summary)的相关文章

HBase with MapReduce (SummaryToFile)

上一篇文章是实现统计hbase单元值出现的个数,并将结果存放到hbase的表中,本文是将结果存放到hdfs上.其中的map实现与前文一直,连接:http://www.cnblogs.com/ljy2013/p/4820056.html,下面主要介绍一下reduce的实现: (1)reduce的实现 package com.datacenter.HbaseMapReduce.SummaryToFile; import java.io.IOException; import org.apache.h

深入HBase架构解析(二)【转】

转自:http://www.blogjava.net/DLevin/archive/2015/08/22/426950.html 前言 这是<深入HBase架构解析(一)>的续,不多废话,继续.... HBase读的实现 通过前文的描述,我们知道在HBase写时,相同Cell(RowKey/ColumnFamily/Column相同)并不保证在一起,甚至删除一个Cell也只是写入一个新的Cell,它含有Delete标记,而不一定将一个Cell真正删除了,因而这就引起了一个问题,如何实现读的问题

[转]毕设- 深入HBase架构解析(二)

深入HBase架构解析(二) 前言 这是<深入HBase架构解析(一)>的续,不多废话,继续.... HBase读的实现 通过前文的描述,我们知道在HBase写时,相同Cell(RowKey/ColumnFamily/Column相同)并不保证在一起,甚至删除一个Cell也只是写入一个新的Cell,它含有Delete标记,而不一定将一个Cell真正删除了,因而这就引起了一个问题,如何实现读的问题?要解决这个问题,我们先来分析一下相同的Cell可能存在的位置:首先对新写入的Cell,它会存在于M

Hbase 技术细节笔记(上)

欢迎大家前往腾讯云技术社区,获取更多腾讯海量技术实践干货哦~ 作者:张秀云 前言 最近在跟进Hbase的相关工作,由于之前对Hbase并不怎么了解,因此系统地学习了下Hbase,为了加深对Hbase的理解,对相关知识点做了笔记,并在组内进行了Hbase相关技术的分享,由于Hbase涵盖的内容比较多,因此计划分享2期,下面就是针对第一期Hbase技术分享整体而成,第一期的主要内容如下: 一.Hbase介绍二.Hbase的Region介绍三.Hbase的写逻辑介绍四.Hbase的故障恢复五.Hbas

HBase框架基础(三)

* HBase框架基础(三) 本节我们继续讨论HBase的一些开发常识,以及HBase与其他框架协调使用的方式.在开始之前,为了框架之间更好的适配,以及复习之前HBase的配置操作,请使用cdh版本的HBase开启动相关服务,记得,配置HMaster的HA. 为了方便,cdh版本hbase下载传送门: 链接:http://pan.baidu.com/s/1dFsyakT 密码:xji7,相关配置请参考HBase框架基础(一) * HBase的数据迁移 原因:我们需要问一个问题,何时,HBase的

HBase框架基础(二)

* HBase框架基础(二) 上一节我们了解了HBase的架构原理和模块组成,这一节我们先来聊一聊HBase的读写数据的过程. * HBase的读写流程及3个机制 HBase的读数据流程: 1.HRegionServer保存着meta表以及表数据,要访问表数据,首先Client先去访问zookeeper,从zookeeper里面获取meta表所在的位置信息,即找到这个meta表在哪个HRegionServer上保存着. 2.接着Client通过刚才获取到的HRegionServer的IP来访问M

HBase框架基础(四)

* HBase框架基础(四) 上一节我们介绍了如何使用HBase搞一些MapReduce小程序,其主要作用呢是可以做一些数据清洗和分析或者导入数据的工作,这一节我们来介绍如何使用HBase与其他框架进行搭配使用. * HBase与Hive 在开始HBase与Hive搭配使用前,我们复习一下这两个框架的特点: Hive: ** 数据仓库 ** 用于数据分析,数据清洗等等 ** 基于MapReduce ** 延迟高,离线使用 HBase: ** 面向列存储的非关系型数据库 ** 存储数据 ** 基于

HBase笔记整理(一)

[TOC] HBase笔记整理(一) 行列式数据库 行式数据库: 可以简单的理解为类似传统的rdbmspaint这些数据,存放的数据都是结构化的数据. 行式数据库,是有利于全表数据的扫描,不利于只查询个别字段 列式数据库: 对行式数据库的一个改进,将部分列(或者说有关联的一些列)存放到单独的文件中,其他列存在其它多个文件中, 我们在进行查询的时候,只需要读取出这些常用列即可完成工作,这样,减少了文件IO的读写,提高读写的效率( 不用再想行式数据库进行全表扫描,然后过滤相关字段) 在行式数据库里面

[转]毕设- 深入HBase架构解析(一)

深入HBase架构解析(一) 前记 公司内部使用的是MapR版本的Hadoop生态系统,因而从MapR的官网看到了这篇文文章:An In-Depth Look at the HBase Architecture,原本想翻译全文,然而如果翻译就需要各种咬文嚼字,太麻烦,因而本文大部分使用了自己的语言,并且加入了其他资源的参考理解以及本人自己读源码时对其的理解,属于半翻译.半原创吧. HBase架构组成 HBase采用Master/Slave架构搭建集群,它隶属于Hadoop生态系统,由一下类型节点