TableMapReduceUtil使用

  今天在从文件中读取数据在写入到hbase的时候,使用到了TableMapReduceUtil工具类,使用过程只需要简单的设置之后工具类会帮我们生成写入到HBase的任务,工作类封装了许多MapReduce写入到HBase的操作,无需我们自己再去设置,下面大致看看内部的实现机制,对TableMapReduceUtil有个比较深入的了解

  使用过程:在map端生成了<ImmutableBytesWritable , put>的输出类型,key和value分别为key和put对象,然后使用如下设置

1 TableMapReduceUtil.initTableReducerJob("Test_MR_HBase", // output table
2                               null,       // reducer class
3                               job);       // TableMapReduceUtil是HBase提供的工具类,会自动设置mapreuce提交到hbase任务的各种配置,封装了操作
4 job.setNumReduceTasks(0);// 设置reduce过程,这里由map端的数据直接提交,不要使用reduce类,因而设置成null,并设置reduce的个数为0
5 FileInputFormat.addInputPath(job, new Path(args[0]));// 设置输入文件路径

  追踪方法的具体细节,查看具体的实现过程,发现最核心的任务设置的方法

 1 public static void initTableReducerJob(String table,
 2     Class<? extends TableReducer> reducer, Job job,
 3     Class partitioner, String quorumAddress, String serverClass,
 4     String serverImpl, boolean addDependencyJars) throws IOException {
 5
 6     Configuration conf = job.getConfiguration();
 7     HBaseConfiguration.merge(conf, HBaseConfiguration.create(conf));
 8     job.setOutputFormatClass(TableOutputFormat.class);//可见输出使用的TableOutputFormat类,结果会通过网络请求到HBase集群
 9     if (reducer != null) job.setReducerClass(reducer);//设置reudce
10     conf.set(TableOutputFormat.OUTPUT_TABLE, table);//设置输出hbase的表
11     conf.setStrings("io.serializations", conf.get("io.serializations"),
12         MutationSerialization.class.getName(), ResultSerialization.class.getName());
13     // If passed a quorum/ensemble address, pass it on to TableOutputFormat.
14     if (quorumAddress != null) {
15       // Calling this will validate the format
16       ZKUtil.transformClusterKey(quorumAddress);
17       conf.set(TableOutputFormat.QUORUM_ADDRESS,quorumAddress);
18     }
19     if (serverClass != null && serverImpl != null) {
20       conf.set(TableOutputFormat.REGION_SERVER_CLASS, serverClass);
21       conf.set(TableOutputFormat.REGION_SERVER_IMPL, serverImpl);
22     }
23     job.setOutputKeyClass(ImmutableBytesWritable.class);//设置输出key的类型
24     job.setOutputValueClass(Writable.class);//输出的value类型
25     if (partitioner == HRegionPartitioner.class) {
26       job.setPartitionerClass(HRegionPartitioner.class);//设置partitioner算法,partition算法会决定map输出的结果输出到哪一个reduce上
27       int regions = MetaReader.getRegionCount(conf, table);//获得region的数量,并根据region的数量设置reduce的个数
28       if (job.getNumReduceTasks() > regions) {//将reudce的数目设置成region的个数
29         job.setNumReduceTasks(regions);
30       }
31     } else if (partitioner != null) {
32       job.setPartitionerClass(partitioner);
33     }
34
35     if (addDependencyJars) {
36       addDependencyJars(job);
37     }
38
39     initCredentials(job);
40   }

  值得注意的一点是工具类最终的实现是通过HBase的put方法通过网络请求提交数据,在大批量写入时需要考虑对hbase带来的负载

  driver程序如下:

 1 private int excuteHFile(String[] args) throws Exception {
 2         Configuration conf = HBaseConfiguration.create();// 任务的配置设置,configuration是一个任务的配置对象,封装了任务的配置信息
 3
 4         Job job = Job.getInstance(conf, "HFile bulk load test");// 生成一个新的任务对象并设置dirver类
 5
 6         job.setJarByClass(HfileToHBaseDriver.class);
 7         job.setMapperClass(HfileToHBaseMapper.class); // 设置任务的map类和 ,map类输出结果是ImmutableBytesWritable和put类型
 8
 9         TableMapReduceUtil.initTableReducerJob("Test_MR_HBase", // output table
10
11                 null, // reducer class
12
13                 job);// TableMapReduceUtil是HBase提供的工具类,会自动设置mapreuce提交到hbase任务的各种配置,封装了操作,只需要简单的设置即可
14
15         job.setNumReduceTasks(0);// 设置reduce过程,这里由map端的数据直接提交,不要使用reduce类,因而设置成null,并设置reduce的个数为0
16
17         FileInputFormat.addInputPath(job, new Path(args[0]));// 设置输入文件路径
18
19         return (job.waitForCompletion(true) ? 0 : -1);
20     }

时间: 2024-10-22 00:39:12

TableMapReduceUtil使用的相关文章

HBase操作注意事项

1.HBase如果加了列限定,如果该列不存在时返回的结果为empty. 看下面的代码:   Get get = new Get(Bytes.toBytes("100"));     get.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name")); 这里加入了列限定,也就是只返回列族info下面的name字段.但是如果name字段根本不存在,返回的Result在调用 result.isEmpt

基于Solr的HBase多条件查询测试

转自:http://www.cnblogs.com/chenz/articles/3229997.html 背景: 某电信项目中采用HBase来存储用户终端明细数据,供前台页面即时查询.HBase无可置疑拥有其优势,但其本身只对rowkey支持毫秒级的快速检索,对于多字段的组合查询却无能为力.针对HBase的多条件查询也有多种方案,但是这些方案要么太复杂,要么效率太低,本文只对基于Solr的HBase多条件查询方案进行测试和验证. 原理: 基于Solr的HBase多条件查询原理很简单,将HBas

ImportTsv-HBase数据导入工具

一.概述 HBase官方提供了基于Mapreduce的批量数据导入工具:Bulk load和ImportTsv.关于Bulk load大家可以看下我另一篇博文. 通常HBase用户会使用HBase API导数,但是如果一次性导入大批量数据,可能占用大量Regionserver资源,影响存储在该Regionserver上其他表的查询,本文将会从源码上解析ImportTsv数据导入工具,探究如何高效导入数据到HBase. 二.ImportTsv介绍 ImportTsv是Hbase提供的一个命令行工具

hbase 批量插入api

1.数据格式a.txt: 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200 1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200 1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.10

HBase建立二级索引的一些解决方案

HBase的一级索引就是rowkey,我们只能通过rowkey进行检索.如果我们相对hbase里面列族的列列进行一些组合查询,就需要采用HBase的二级索引方案来进行多条件的查询. 常见的二级索引方案有以下几种: 1.MapReduce方案 2.ITHBASE方案 3.IHBASE方案 4.Coprocessor方案 5.Solr+hbase方案 MapReduce方案 IndexBuilder:利用MR的方式构建Index 优点:并发批量构建Index 缺点:不能实时构建Index 举例: 原

MapReduce on HBase使用与集成

为什么需要MapReduce on HBase? hbase本身并没有提供很好地二级索引方式.如果直接使用hbase提供的scan直接扫描方式,在数据量很大的情况下就会非常慢. 可以使用Mapreduce的方法操作hbase数据库.Hadoop MapReduce提供相关API,可以与hbase数据库无缝连接. API链接: http://hbase.apache.org/devapidocs/index.html HBase与Hadoop的API对比 相关类 TableMapper packa

使用MapReduce查询Hbase表指定列簇的全部数据输入到HDFS(一)

package com.bank.service; import java.io.IOException; import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hba

使用MapReduce将HDFS数据导入到HBase(二)

package com.bank.service; import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.hbase.client.Put;import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;import org.apache.hadoop.hbase.mapred

HBase概念学习(七)HBase与Mapreduce集成

这篇文章是看了HBase权威指南之后,根据上面的讲解搬下来的例子,但是稍微有些不一样. HBase与mapreduce的集成无非就是mapreduce作业以HBase表作为输入,或者作为输出,也或者作为mapreduce作业之间共享数据的介质. 这篇文章将讲解两个例子: 1.读取存储在hdfs上的txt文本数据,简单地以json字符串的形式存储到HBase表中. 2.将第一步存储的HBase表中的json字符串读取出来,解析存储到新的HBase表中,可以进行查询. 本文详细给出了源码以及如何运行