使用MapReduce查询Hbase表指定列簇的全部数据输入到HDFS(一)

package com.bank.service;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * 查询hbase表指定列簇的全部数据输出到HDFS上
 * @author mengyao
 *
 */
public class ReadHbase extends Configured implements Tool {

private static String tableName;
    private static String outputDir;

static class ReadHbaseMapper extends TableMapper<Text, Text> {
        private static Text k = new Text();
        private static Text v = new Text();
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            StringBuffer sb = new StringBuffer("");
            for(java.util.Map.Entry<byte[], byte[]> val : value.getFamilyMap(Bytes.toBytes("info")).entrySet()){
                String str = new String(val.getValue());
                if (str != null) {
                    sb.append(new String(val.getKey()));
                    sb.append(":");
                    sb.append(str);
                    sb.append(",");
                }
            }
            String line = sb.toString();
            k.set(key.get());
            v.set(new String(line.substring(0,line.length()-1)));
            context.write(k, v);
        }
    }

static class ReadHbaseReduce extends Reducer<Text, Text, Text, Text> {
        private Text result = new Text();
        @Override
        protected void reduce(Text key, Iterable<Text> value, Context context) throws IOException, InterruptedException {
            for (Text val : value) {
                result.set(val);
                context.write(key, result);
            }
        }
    }
    
    @Override
    public int run(String[] arg0) throws Exception {
        tableName = arg0[0];
        outputDir = arg0[1];
        Job job = Job.getInstance(getConf(), ReadHbase.class.getSimpleName());
        job.setJarByClass(ReadHbase.class);
        job.setReducerClass(ReadHbaseReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileOutputFormat.setOutputPath(job, new Path(outputDir));
        TableMapReduceUtil.initTableMapperJob(tableName, new Scan(), ReadHbaseMapper.class, Text.class, Text.class, job);
        TableMapReduceUtil.addDependencyJars(job);
        return job.waitForCompletion(true) ? 0 : 1;
    }
    
    public static void main(String[] args) throws Exception {
        String[] otherArgs = new GenericOptionsParser(args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println(" Usage:" + ReadHbase.class.getSimpleName() + " <tableName> <outputDir> ");
            System.exit(2);
        }
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "h5:2181,h6:2181,h7:2181");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("dfs.socket.timeout", "3600000");
        int status = ToolRunner.run(conf, new ReadHbase(), otherArgs);
        System.exit(status);
    }

}

时间: 2024-12-23 07:43:36

使用MapReduce查询Hbase表指定列簇的全部数据输入到HDFS(一)的相关文章

并发拉取HBase大量指定列数据时卡住的问题排查

最近遇到一例,并发拉取HBase大量指定列数据时,导致应用不响应的情形.记录一下. 背景 退款导出中,为了获取商品规格编码,需要从HBase表 T 里拉取对应的数据. T 对商品数据的存储采用了 表名:字段名:id 的列存储方式.由于这个表很大,且为详情公用,因此不方便使用 scan 的方式,担心带来集群的不稳定,进而影响详情和导出的整体稳定性. 要用 multiGet 的方式来获取多个订单的这个列的数据. 就必须动态生成相应的列,然后在 HBase 获取数据的时候指定列集合. 现有记录集合 L

SQL查询某表是否存在及返回新增数据的ID

下面简单介绍了SQL查询某表是否存在以及返回新增数据的ID值. 1.查询表是否存在: 表名:"t_Demo", type = 'u'  查看是不是用户表 select * from sysobjects where id = object_id('t_Demo') and type = 'u' select * from sys.tables where name='t_Demo' and type = 'u' 2.查询字段是否存在: 表名:"t_Demo", 字段

Mapreduce读取Hbase表,写数据到多个Hbase表中

Job端的变化: 通过设置conf,配置输出表,在reduce中获取输出表名字 Configuration conf = job.getConfiguration(); //输出表1 conf.set("usertag_output", "usertag"); //输出表2 conf.set("prodtag_output", "prodtag"); job.setReducerClass(LabelReducer.class

hbase 表的设计与其它大数据框架的集成

一:hbase 表的设计管理 二:hbase hive 集成 三:sqoop 与hbase 的集成 四:hbase 与hue 集成 五:hbase 表的修复 一:hbase 表的设计管理 1.1 hbase 的shell 命令 1.1.1 创建一个命名空间 在新版本的hbase 中 表是存储在命名空间当中,默认的命名空间是default 创建一个命名空间: create_namespace 'ns2' 查看有多少个命名空间: list_namespace 在命名空间中建立表: create 'n

Mapreduce读取Hbase表,写数据到一个Hbase表中

public class LabelJob { public static void main(String[] args) throws Exception { Job job = Job.getInstance(new Configuration()); job.setJarByClass(LabelJob.class); job.setJobName("Hbase.LabelJob"); Configuration conf = job.getConfiguration(); c

SQL Server 查询数据库表的列数

1 select count(*) from sysobjects a 2 join syscolumns b 3 on a.id=b.id 4 where a.name='表名' 5 go

SQL SERVER 查询一个表有多少列

select count(1) from syscolumns where id = object_id('tbname') 或者 select * from syscolumns where id = object_id('tbname') 或 SELECT MAX(colid) FROM syscolumns WHERE id=OBJECT_ID('table')

SQLserver 查询一个表有多少列

① select count(1) from syscolumns where id=object_id('tablename') ② select count(*) from syscolumns where id=object_id('tablename') ③ select max(colid) from syscolumns where id=object_id('tablename')

BulkLoad加载本地文件到HBase表

BulkLoad加载文件到HBase表 1.功能 将本地数据导入到HBase中 2.原理 BulkLoad会将tsv/csv格式的文件编程hfile文件,然后再进行数据的导入,这样可以避免大量数据导入时造成的集群写入压力过大. 1.tsv格式的文件:字段之间以制表符\t分割 2.csv格式的文件:字段之间以逗号,分割 3.作用 减小HBase集群插入数据的压力 提高了Job运行的速度,降低了Job执行时间 4.案例 Step1.配置临时环境变量 $ export HBASE_HOME=/opt/