HBase MapReduce

1. HBase to HBase

The Mapper extends TableMapper; its input key is the row key (an ImmutableBytesWritable) and its input value is the row's Result.

public abstract class TableMapper<KEYOUT, VALUEOUT>
        extends Mapper<ImmutableBytesWritable, Result, KEYOUT, VALUEOUT> {
    public TableMapper() {
    }
}

package com.scb.jason.mapper;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

/**
 * Created by Administrator on 2017/8/16.
 */
public class User2BasicMapper extends TableMapper<ImmutableBytesWritable, Put> {

    private ImmutableBytesWritable mapOutputkey = new ImmutableBytesWritable();

    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        //Get rowKey
        mapOutputkey.set(key.get());
        Put put = new Put(key.get());
        for (Cell cell : value.rawCells()) {
            if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                // keep only the info:name and info:age cells
                if ("name".equals(qualifier) || "age".equals(qualifier)) {
                    put.add(cell);
                }
            }
        }
        context.write(mapOutputkey,put);
    }

}

The Reducer extends TableReducer. Its third type parameter is the output key type, but TableOutputFormat ignores the key and takes the target row from the Mutation itself, which is why the reducer below can safely write a null key.

public abstract class TableReducer<KEYIN, VALUEIN, KEYOUT>
        extends Reducer<KEYIN, VALUEIN, KEYOUT, Mutation> {
    public TableReducer() {
    }
}

package com.scb.jason.reducer;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;

import java.io.IOException;

/**
 * Created by Administrator on 2017/8/16.
 */
public class User2BasicReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable>{

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        for(Put put:values){
            context.write(null,put);
        }
    }
}

Driver

package com.scb.jason.driver;

import com.scb.jason.mapper.User2BasicMapper;
import com.scb.jason.reducer.User2BasicReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


/**
 * Created by Administrator on 2017/8/16.
 */
public class User2BasicDriver extends Configured implements Tool{

    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(this.getConf(),this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        Scan scan = new Scan();
        scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false);  // don't set to true for MR jobs
        // set other scan attrs

        TableMapReduceUtil.initTableMapperJob(
                "user",      // input table
                scan,             // Scan instance to control CF and attribute selection
                User2BasicMapper.class,   // mapper class
                ImmutableBytesWritable.class,             // mapper output key
                Put.class,             // mapper output value
                job);
        TableMapReduceUtil.initTableReducerJob(
                "basic",      // output table
                User2BasicReducer.class,             // reducer class
                job);
        job.setNumReduceTasks(1);
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;   // 0 on success, per the usual exit-code convention
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        int status = ToolRunner.run(configuration,new User2BasicDriver(),args);
        System.exit(status);
    }

}
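
One optional refinement, not in the original: since the mapper keeps only info:name and info:age, the Scan in the driver can be narrowed so only those two columns ever leave the region servers (this assumes org.apache.hadoop.hbase.util.Bytes is added to the driver's imports):

        // Limit the scan to the two columns the job actually uses; the
        // mapper's family/qualifier checks then become just a safety net.
        scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"));
        scan.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"));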


2. HBase to File

Mapper

package com.scb.jason.mapper;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

import java.io.IOException;

/**
 * Created by Administrator on 2017/8/16.
 */
public class User2FileMapper extends TableMapper<Text, Text> {

    private Text rowKeyText = new Text();
    private Text valueText = new Text();

    @Override
    public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        //Get rowKey
        rowKeyText.set(key.get());
        byte[] inforName = null;
        byte[] inforAge = null;
        for (Cell cell : value.rawCells()) {
            if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
                if ("name".equals(qualifier)) {
                    inforName = CellUtil.cloneValue(cell);
                } else if ("age".equals(qualifier)) {
                    inforAge = CellUtil.cloneValue(cell);
                }
            }
        }
        // Bytes.toString returns null for a missing column instead of throwing,
        // so rows lacking info:name or info:age still produce an output line
        valueText.set(Bytes.toString(inforName) + "\t" + Bytes.toString(inforAge));
        context.write(rowKeyText,valueText);
    }

}

No custom Reducer is needed here: the driver keeps the default identity reducer, which simply passes the mapper's Text pairs through to the output file.

Driver

package com.scb.jason.driver;

import com.scb.jason.mapper.User2FileMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Created by Administrator on 2017/8/16.
 */
public class User2FileDriver extends Configured implements Tool{

    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(this.getConf(),this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        Scan scan = new Scan();
        scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan.setCacheBlocks(false);  // don't set to true for MR jobs
        // set other scan attrs

        TableMapReduceUtil.initTableMapperJob(
                "user",      // input table
                scan,             // Scan instance to control CF and attribute selection
                User2FileMapper.class,   // mapper class
                Text.class,             // mapper output key
                Text.class,             // mapper output value
                job);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job,new Path(args[0]));
        job.setNumReduceTasks(1);
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        int status = ToolRunner.run(configuration,new User2FileDriver(),args);
        System.exit(status);
    }

}
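
Since no custom reducer is involved, the single identity-reduce pass above only re-sorts the mapper output. A map-only variant (my addition, not in the original) skips the shuffle entirely:

        // Zero reduce tasks: mapper output goes straight to the output files,
        // one part file per map task, but rows are no longer sorted by key.
        job.setNumReduceTasks(0);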


3. File to HBase

Driver

package com.scb.jason.driver;

import com.scb.jason.mapper.File2HbaseMapper;
import com.scb.jason.reducer.File2HBaseReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Created by Administrator on 2017/8/16.
 */
public class File2BasicDriver extends Configured implements Tool{

    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(this.getConf(),this.getClass().getSimpleName());
        job.setJarByClass(this.getClass());
        job.setMapperClass(File2HbaseMapper.class);
        FileInputFormat.addInputPath(job,new Path("F:\\Workspace\\File"));
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        TableMapReduceUtil.initTableReducerJob(
                "basic",      // output table
                File2HBaseReducer.class,             // reducer class
                job);
        job.setNumReduceTasks(1);
        boolean isSuccess = job.waitForCompletion(true);
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = HBaseConfiguration.create();
        int status = ToolRunner.run(configuration,new File2BasicDriver(),args);
        System.exit(status);
    }

}

Mapper

package com.scb.jason.mapper;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * Created by Administrator on 2017/8/17.
 */
public class File2HbaseMapper extends Mapper<LongWritable,Text,ImmutableBytesWritable,Put> {

    private ImmutableBytesWritable mapOutputkey = new ImmutableBytesWritable();

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Each input line is expected to be whitespace-separated: rowkey name age
        String lineValue = value.toString();
        StringTokenizer stringTokenizer = new StringTokenizer(lineValue);
        String rowkey = stringTokenizer.nextToken();
        String name = stringTokenizer.nextToken();
        String age = stringTokenizer.nextToken();
        Put put = new Put(Bytes.toBytes(rowkey));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age));
        // Key the map output by the parsed rowkey (not the file offset in the
        // LongWritable key) so it matches the row targeted by the Put
        mapOutputkey.set(Bytes.toBytes(rowkey));
        context.write(mapOutputkey, put);
    }

}

Reducer

package com.scb.jason.reducer;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;

import java.io.IOException;

/**
 * Created by Administrator on 2017/8/25.
 */
public class File2HBaseReducer extends TableReducer<ImmutableBytesWritable, Put, ImmutableBytesWritable> {

    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context) throws IOException, InterruptedException {
        for(Put put:values){
            context.write(null,put);
        }
    }

}


4. HBase to RDBMS

public static class MyRdbmsReducer extends Reducer<Text, IntWritable, Text, IntWritable>  {

  private Connection c = null;

  public void setup(Context context) {
    // create DB connection...
  }

  public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    // do summarization
    // in this example the keys are Text, but this is just an example
  }

  public void cleanup(Context context) {
    // close db connection
  }

}
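
A minimal sketch of the skeleton above filled in with plain JDBC, kept as a static inner class like the original; the connection URL, credentials, and summary table are hypothetical placeholders, and the word-count-style summation is only an example.

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public static class MyRdbmsReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private Connection c = null;
    private PreparedStatement stmt = null;

    @Override
    public void setup(Context context) throws IOException {
        try {
            // one connection per reduce task, reused across all keys
            c = DriverManager.getConnection("jdbc:mysql://dbhost:3306/stats", "user", "pass");
            stmt = c.prepareStatement("INSERT INTO summary (word, total) VALUES (?, ?)");
        } catch (SQLException e) {
            throw new IOException(e);
        }
    }

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        try {
            // results go to the database; nothing is written to the job output
            stmt.setString(1, key.toString());
            stmt.setInt(2, sum);
            stmt.executeUpdate();
        } catch (SQLException e) {
            throw new IOException(e);
        }
    }

    @Override
    public void cleanup(Context context) throws IOException {
        try {
            if (stmt != null) stmt.close();
            if (c != null) c.close();
        } catch (SQLException e) {
            throw new IOException(e);
        }
    }

}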

5. File -> HFile -> HBase Bulk Load

http://www.cnblogs.com/shitouer/archive/2013/02/20/hbase-hfile-bulk-load.html
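
The linked post covers the details; the overall flow is a MapReduce job that writes HFiles via HFileOutputFormat2, followed by LoadIncrementalHFiles handing those files to the region servers, which is far faster than Put-by-Put writes for large loads. Below is a driver sketch of my own against the HBase 1.x client API (exact signatures vary across versions); the BulkLoadDriver class name, table name "basic", and paths are placeholders, and it reuses File2HbaseMapper from section 3.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.scb.jason.mapper.File2HbaseMapper;

public class BulkLoadDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "bulkload");
        job.setJarByClass(BulkLoadDriver.class);
        job.setMapperClass(File2HbaseMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        FileInputFormat.addInputPath(job, new Path("/input/users"));
        Path hfileDir = new Path("/tmp/hfiles");
        FileOutputFormat.setOutputPath(job, hfileDir);

        try (Connection conn = ConnectionFactory.createConnection(conf)) {
            TableName tableName = TableName.valueOf("basic");
            Table table = conn.getTable(tableName);
            RegionLocator locator = conn.getRegionLocator(tableName);
            // Step 1: wires in HFileOutputFormat2 plus a TotalOrderPartitioner
            // matched to the table's region boundaries; because the map output
            // value is Put, it also sets PutSortReducer to sort cells per row.
            HFileOutputFormat2.configureIncrementalLoad(job, table, locator);

            if (job.waitForCompletion(true)) {
                // Step 2: hand the finished HFiles to the region servers.
                new LoadIncrementalHFiles(conf).doBulkLoad(hfileDir, conn.getAdmin(), table, locator);
            }
        }
    }

}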
