hbase1

http://blog.csdn.net/anhuidelinger/article/details/16989771

import java.io.IOException;
import java.util.HashMap;  

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.util.Base64;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.GenericOptionsParser;  

public class IndexBuilder {
	// 索引的列族为info ,列为name
	public static final byte[] column = Bytes.toBytes("info");
	public static final byte[] qualifier = Bytes.toBytes("name");

	public static class Map extends Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Writable> {
		private byte[] family;
		// indexes 存储了列与表对对应关系,其中byte[]用于获取列的值作为索引表的键值,ImmutableBytesWritable作为表的名称
		private HashMap<byte[], ImmutableBytesWritable> indexes;
		// 在Map中,对每一行的数据提取出需要建立索引的列的值,加入到索引表中输出。
		protected void map(ImmutableBytesWritable rowKey, Result result,Context context) throws IOException, InterruptedException {
			// original: row 123 attribute:phone 555-1212
	        // index: row 555-1212 INDEX:ROW 123
			for (java.util.Map.Entry<byte[], ImmutableBytesWritable> index : indexes.entrySet()) {
				// 获得列名 qualifier
				byte[] qualifier = index.getKey();
				// 索引表名 table
				ImmutableBytesWritable table = index.getValue();
				// 插入的列值为 列名加上行名
				byte[] value = result.getValue(family, qualifier);
				// 以列值作为行键,在列“info:row”中插入行键
				Put put = new Put(value);
				put.add(column, qualifier, rowKey.get());
				// 在table表上执行put操作
				context.write(table, (Writable) put);
			}
		}

		protected void setup(Context context) throws IOException,InterruptedException {
			Configuration configuration = context.getConfiguration();
			String table = configuration.get("index.tablename");
			String[] fields = configuration.getStrings("index.fields");
			String familyName = configuration.get("index.familyname");
			family = Bytes.toBytes(familyName);

			// 初始化indexes
			// if the table is "people" and the field to index is "email", then the
	        // index table will be called "people-email"
			indexes = new HashMap<byte[], ImmutableBytesWritable>();
			for (String field : fields) {
				indexes.put(Bytes.toBytes(field), new ImmutableBytesWritable(
						Bytes.toBytes(table + "-" + field)));
			}
		}
	}
	// Job configuration
	public static Job configureJob(Configuration conf, String[] args)throws IOException {
		String table = args[0];
		String columnFamily = args[1];
		System.out.println(table);
		// 通过Configuration.set()方法传递参数
		conf.set(TableInputFormat.SCAN, ScanToString(new Scan()));
		conf.set(TableInputFormat.INPUT_TABLE, table);
		conf.set("index.tablename", table);
		conf.set("index.familyname", columnFamily);
		String[] fields = new String[args.length - 2];
		for (int i = 0; i < fields.length; i++) {
			fields[i] = args[i + 2];
		}
		conf.setStrings("index.fields", fields);
		conf.set("index.familyname", "attributes");
		// 运行参数配置
		Job job = new Job(conf, table);
		job.setJarByClass(IndexBuilder.class);
		job.setMapperClass(Map.class);
		job.setNumReduceTasks(0);
		job.setInputFormatClass(TableInputFormat.class);
		job.setOutputFormatClass(MultiTableOutputFormat.class);
		return job;
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		if (otherArgs.length < 3)System.exit(-1);
		Job job = configureJob(conf, otherArgs);
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
	static String ScanToString(Scan scan) throws IOException {
	    ClientProtos.Scan proto = ProtobufUtil.toScan(scan);
	    return Base64.encodeBytes(proto.toByteArray());
	}
}

  

时间: 2024-11-09 10:34:28

hbase1的相关文章

HBase-1.0.1学习笔记汇总

欢迎访问:鲁春利的工作笔记,学习是一种信仰,让时间考验坚持的力量. HBase-1.0.1学习笔记(一)集群搭建 http://luchunli.blog.51cto.com/2368057/1682049HBase-1.0.1学习笔记(二)客户端访问 http://luchunli.blog.51cto.com/2368057/1687458 HBase-1.0.1学习笔记(三)启动脚本解析 http://luchunli.blog.51cto.com/2368057/1690619 HBas

Hbase-1.2.4 javaAPI实现简单的类CRUD操作

一. 概述 关于Hbase的工作原理网上已经有很多详细介绍,就不在这里赘述了,我们直接研究代码 作为Hbase最新stable版,Hbase内部引入的部分hadoop2.5系列的jar包,这里且暂不去理会它,但是Java环境必须要8以上,我的正确运行环境为: Centos-6.5 hadoop-2.6.5 Hbase-1.2.4 jdk1.8 二.代码设计 package com.unisk.bigdata.hbase;import java.io.IOException;import org.

hadoop: hbase1.0.1.1 伪分布安装

环境:hadoop 2.6.0 + hbase 1.0.1.1 + mac OS X yosemite 10.10.3 安装步骤: 一.下载解压 到官网 http://hbase.apache.org 下载合适的版本(hbase与hadoop的版本要匹配,否则可能运行不起来),解压到某个目录即可,本文中的解压目录为 /Users/jimmy/app/hbase-1.0.1.1 二.设置环境变量 ... export HBASE_HOME=/Users/jimmy/app/hbase-1.0.1.

HBase1.0.0源码分析之请求处理流程分析以Put操作为例(二)

HBase1.0.0源码分析之请求处理流程分析以Put操作为例(二) 1.通过mutate(put)操作,将单个put操作添加到缓冲操作中,这些缓冲操作其实就是Put的父类的一个List的集合.如下: private List<Row> writeAsyncBuffer = new LinkedList<>(); writeAsyncBuffer.add(m); 当writeAsyncBuffer满了之后或者是人为的调用backgroundFlushCommits操作促使缓冲池中的

HBase1.0以上版本号的API改变

HBase1.0以上版本号已经废弃了 HTableInterface,HTable,HBaseAdmin等API的使用.新增了一些API来实现之前的功能: Connectioninterface: Connection connection = ConnectionFactory.createConnection(config); // ... connection.close(); TableName class: String tableName = "Table"; TableN

最新Hadoop-2.7.2+hbase-1.2.0+zookeeper-3.4.8 HA高可用集群配置安装

Ip 主机名 程序 进程 192.168.128.11 h1 Jdk Hadoop hbase Namenode DFSZKFailoverController Hamster 192.168.128.12 h2 Jdk Hadoop hbase Namenode DFSZKFailoverController Hamster 192.168.128.13 h3 Jdk Hadoop resourceManager 192.168.128.14 h4 Jdk Hadoop resourceMan

hadoop2.6.0 + hbase-1.0.0 伪分布配置

1 基本配置 主机名: 192.168.145.154 hadoop2 ======= 2 etc/hadoop下文件配置 1)core-site.xml <configuration> <property> <name>fs.defaultFS</name> <value>hdfs://hadoop2:8020</value> </property> <property> <name>io.fil

VMware12使用三台虚拟机Ubuntu16.04系统搭建hadoop-2.7.1+hbase-1.2.4(完全分布式)

初衷 首先说明一下既然网上有那么多教程为什么要还要写这样一个安装教程呢?网上教程虽然多,但是有些教程比较老,许多教程忽略许多安装过程中的细节,比如添加用户的权限,文件权限,小编在安装过程遇到许多这样的问题所以想写一篇完整的教程,希望对初学hadoop的人有一个直观的了解,我们接触真集群的机会比较少,虚拟机是个不错的选择,可以基本完全模拟真实的情况,前提是你的电脑要配置相对较好不然跑起来都想死,废话不多说. 环境说明 本文使用VMware? Workstation 12 Pro虚拟机创建并安装三台

CentOS7搭建hadoop2.6.4+HBase1.1.6

环境: CentOS7 hadoop2.6.4两个节点:master.slave1 HBase1.1.6 过程: hadoop安装目录:/usr/hadoop-2.6.4 master节点,hadoop用户登录.在hadoop目录下新建thirdparty目录: $ mkdir thirdparty 目录结构: 把hbase1.1.6解压到thirdparty目录下: $ cp ~/hbase-1.1.6-bin.tar.gz thirdparty $ cd thirdparty $ tar z

hadoop2.6完全分布式安装HBase1.1

本文出自:http://wuyudong.com/archives/119 对于全分布式的HBase安装,需要通过hbase-site.xml文档来配置本机的HBase特性,由于各个HBase之间通过zookeeper来进行通信,因此需要维护一组zookeeper系统,关于zookeeper的安装使用,参考<hadoop2.6完全分布式安装zookeeper3.4.6> 关于HBase的介绍,可以看这里<HBase简介> 1.安装Hbase(1)下载hbase版本 下载地址:htt