MapReduce on HBase使用与集成

为什么需要MapReduce on HBase?

hbase本身并没有提供很好地二级索引方式。如果直接使用hbase提供的scan直接扫描方式,在数据量很大的情况下就会非常慢。

可以使用Mapreduce的方法操作hbase数据库。Hadoop MapReduce提供相关API,可以与hbase数据库无缝连接。

API链接: http://hbase.apache.org/devapidocs/index.html

HBase与Hadoop的API对比

相关类

TableMapper

package org.apache.hadoop.hbase.mapreduce;

/**
 * Extends the base <code>Mapper</code> class to add the required input key
 * and value classes.
 *
 * @param <KEYOUT>  The type of the key.
 * @param <VALUEOUT>  The type of the value.
 * @see org.apache.hadoop.mapreduce.Mapper
 */
public abstract class TableMapper<KEYOUT, VALUEOUT>
extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
}

TableReducer

package org.apache.hadoop.hbase.mapreduce;
/**
 * Extends the basic <code>Reducer</code> class to add the required key and
 * value input/output classes. While the input key and value as well as the
 * output key can be anything handed in from the previous map phase the output
 * value <u>must</u> be either a {@link org.apache.hadoop.hbase.client.Put Put}
 * or a {@link org.apache.hadoop.hbase.client.Delete Delete} instance when
 * using the {@link TableOutputFormat} class.
 * <p>
 * This class is extended by {@link IdentityTableReducer} but can also be
 * subclassed to implement similar features or any custom code needed. It has
 * the advantage to enforce the output value to a specific basic type.
 * @param <KEYIN>  The type of the input key.
 * @param <VALUEIN>  The type of the input value.
 * @param <KEYOUT>  The type of the output key.
 * @see org.apache.hadoop.mapreduce.Reducer
 */
public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation> {
}

TableMapReduceUtil

public class TableMapReduceUtil {
  /**
   * Use this before submitting a TableMap job. It will appropriately set up
   * the job.
   *
   * @param table  The table name to read from.
   * @param scan  The scan instance with the columns, time range etc.
   * @param mapper  The mapper class to use.
   * @param outputKeyClass  The class of the output key.
   * @param outputValueClass  The class of the output value.
   * @param job  The current job to adjust.  Make sure the passed job is
   * carrying all necessary HBase configuration.
   * @throws IOException When setting up the details fails.
   */
  public static void initTableMapperJob(String table, Scan scan,
      Class<? extends TableMapper> mapper,
      Class<?> outputKeyClass,
      Class<?> outputValueClass, Job job)
  throws IOException {
    initTableMapperJob(table, scan, mapper, outputKeyClass, outputValueClass,
        job, true);
  }

  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
   *
   * @param table  The output table.
   * @param reducer  The reducer class to use.
   * @param job  The current job to adjust.
   * @throws IOException When determining the region count fails.
   */
  public static void initTableReducerJob(String table,
    Class<? extends TableReducer> reducer, Job job)
  throws IOException {
    initTableReducerJob(table, reducer, job, null);
  }

}

Demo

package MapReduceHbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class HbaseMR {

    private String rootDir;
    private String zkServer;
    private String port;
    private Configuration conf;
    private HConnection hConn = null;

    private HbaseMR(String rootDir,String zkServer,String port) throws IOException{
        this.rootDir = rootDir;
        this.zkServer = zkServer;
        this.port = port;

        conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", rootDir);
        conf.set("hbase.zookeeper.quorum", zkServer);
        conf.set("hbase.zookeeper.property.clientPort", port);
        hConn = HConnectionManager.createConnection(conf);
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        String rootDir = "hdfs://hadoop1:8020/hbase";
        String zkServer = "hadoop1";
        String port = "2181";

        HbaseMR conn = new HbaseMR(rootDir,zkServer,port);

        //Configuration conf = HBaseConfiguration.create();

        //Configuration conf = HBaseConfiguration.create();
        Job job = new Job(conn.conf,"MapReduce on HBase");
        job.setJarByClass(HbaseMR.class);

        Scan scan = new Scan();
        scan.setCaching(1000);//事先读取多少条记录

        TableMapReduceUtil.initTableMapperJob("students",scan,
                MyMapper.class,Text.class,Text.class,job);

        TableMapReduceUtil.initTableReducerJob("students_age", MyReducer.class, job);

        job.waitForCompletion(true);
    }
}

class MyMapper extends TableMapper<Text, Text>{

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        Text k = new Text(Bytes.toString(key.get()));
        Text v = new Text(value.getValue(Bytes.toBytes("basicInfo"),
                Bytes.toBytes("age")));
        //年龄  人名
        context.write(v, k);
    }
}

class MyReducer extends TableReducer<Text,Text,ImmutableBytesWritable>{

    @Override
    protected void reduce(Text k2, Iterable<Text> v2s,
            Context context)
            throws IOException, InterruptedException {
        Put put = new Put(Bytes.toBytes(k2.toString()));
        for (Text v2 : v2s) {//遍历获得所有的人名
            //列族 列  值
            put.add(Bytes.toBytes("f1"), Bytes.toBytes(v2.toString()),
                    Bytes.toBytes(v2.toString()));
        }
        context.write(null, put);
    }
}

运行之前先创建一个students_age表,列族为f1。

运行结果:

版权声明:本文为博主原创文章,未经博主允许不得转载。

时间: 2024-12-16 04:46:35

MapReduce on HBase使用与集成的相关文章

深入浅出Hadoop实战开发(HDFS实战图片、MapReduce、HBase实战微博、Hive应用)

Hadoop是什么,为什么要学习Hadoop?     Hadoop是一个分布式系统基础架构,由Apache基金会开发.用户可以在不了解分布式底层细节的情况下,开发分布式程序.充分利用集群的威力高速运算和存储.Hadoop实现了一个分布式文件系统(Hadoop Distributed File System),简称HDFS.HDFS有着高容错性的特点,并且设计用来部署在低廉的(low-cost)硬件上.而且它提供高传输率(high throughput)来访问应用程序的数据,适合那些有着超大数据

Hbase框架原理及相关的知识点理解、Hbase访问MapReduce、Hbase访问Java API、Hbase shell及Hbase性能优化总结

转自:http://blog.csdn.net/zhongwen7710/article/details/39577431 本blog的内容包含: 第一部分:Hbase框架原理理解 第二部分:Hbase调用MapReduce函数使用理解 第三部分:Hbase调用Java API使用理解 第四部分:Hbase Shell操作 第五部分:Hbase建表.读写操作方式性能优化总结 第一部分:Hbase框架原理理解 概述 HBase是一个构建在HDFS上的分布式列存储系统:HBase是基于Google

HBase - MapReduce - 使用 MapReduce 批量操作 HBase 介绍 | 那伊抹微笑

博文作者:那伊抹微笑 csdn 博客地址:http://blog.csdn.net/u012185296 itdog8 地址链接 : http://www.itdog8.com/thread-202-1-1.html 博文标题:HBase - MapReduce - 使用 MapReduce 批量操作 HBase 介绍 | 那伊抹微笑 个性签名:世界上最遥远的距离不是天涯,也不是海角,而是我站在妳的面前,妳却感觉不到我的存在 技术方向:Flume+Kafka+Storm+Redis/Hbase+

HDFS,MapReduce,Hive,Hbase 等之间的关系

HDFS: HDFS是GFS的一种实现,他的完整名字是分布式文件系统,类似于FAT32,NTFS,是一种文件格式,是底层的. Hive与Hbase的数据一般都存储在HDFS上.Hadoop HDFS为他们提供了高可靠性的底层存储支持. Hive: Hive不支持更改数据的操作,Hive基于数据仓库,提供静态数据的动态查询.其使用类SQL语言,底层经过编译转为MapReduce程序,在Hadoop上运行,数据存储在HDFS上. Hbase: Hbase是Hadoop database,即Hadoo

升级版:深入浅出Hadoop实战开发(云存储、MapReduce、HBase实战微博、Hive应用、Storm应用)

      Hadoop是一个分布式系统基础架构,由Apache基金会开发.用户可以在不了解分布式底层细节的情况下,开发分布式程序.充分利用集群的威力高速运算和存储.Hadoop实现了一个分布式文件系统(Hadoop Distributed File System),简称HDFS.HDFS有着高容错性的特点,并且设计用来部署在低廉的(low-cost)硬件上.而且它提供高传输率(high throughput)来访问应用程序的数据,适合那些有着超大数据集(large data set)的应用程序

使用MapReduce查询Hbase表指定列簇的全部数据输入到HDFS(一)

package com.bank.service; import java.io.IOException; import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hba

MapReduce操作Hbase --table2file

官方手册:http://hbase.apache.org/book.html#mapreduce.example 简单的操作,将hbase表中的数据写入到文件中. RunJob 源码: 1 import org.apache.hadoop.conf.Configuration; 2 import org.apache.hadoop.fs.FileSystem; 3 import org.apache.hadoop.fs.Path; 4 import org.apache.hadoop.hbase

MapReduce批量操作HBase

欢迎访问:鲁春利的工作笔记,学习是一种信仰,让时间考验坚持的力量. 环境: hadoop-2.6.0 hbase-1.0.1 zookeeper-3.4.6 1.Hadoop集群配置过程略: 2.Zookeeper集群配置过程略: 3.HBase集群配置过程略: 4.HBase作为输入源示例 查看当前hbase表m_domain中的数据 [[email protected] conf]$ hbase shell HBase Shell; enter 'help<RETURN>' for lis

在cdh5.1.3中在mapreduce使用hbase

环境:centos6.5 .cdh5.1.3 一.hadoop命令找不到hbase相关类 (一)观察hadoop classpath的输出: 1,classpath包含了/etc/hadoop/conf,这是hadoop当前使用的配置文件的目录. 2,classpath以*结尾, (二),找到hbase相关jar包位置 (三)修改hadoop-env.sh 打开/etc/hadoop/conf/hadoop-env.sh,为HADOOP_CLASSPATH添加上一步找到的jar文件路径. 1,指