使用MapReduce将HDFS数据导入到HBase(二)

package com.bank.service;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * 使用MapReduce批量导入Hbase
 *     通过TableOutputFormat,该类内部传给指定的Put实例并调用table.put()方法。作业结束前会主动调用flushCommits()方法保存仍在写缓冲区的数据
 *
 * @author mengyao
 *
 */
public class CnyBatch extends Configured implements Tool {

static class CnyBatchMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            context.write(key, value);
        }
    }

static class CnyBatchReduce extends TableReducer<LongWritable, Text, NullWritable> {
        private final static String familyName = "info";
        private final static String[] qualifiers = {"gzh", "currency", "version", "valuta", "qfTime", "flag", "machineID"};
        @Override
        protected void reduce(LongWritable key,
                java.lang.Iterable<Text> value, Context context)
                throws java.io.IOException, InterruptedException {
            final String[] values = value.toString().split("\t");
            if (values.length == 7 && values.length == qualifiers.length) {
                 final String row = values[0]+"_"+values[1]+"_"+values[2]+"_"+values[3];
                 long timestamp = System.currentTimeMillis();
                 Put put = new Put(Bytes.toBytes(row));
                 for (int i = 0; i < values.length; i++) {
                     String qualifier = qualifiers[i];
                     String val = values[i];
                     put.add(Bytes.toBytes(familyName), Bytes.toBytes(qualifier), timestamp, Bytes.toBytes(val));
                 }
                 context.write(NullWritable.get(), put);
            } else {
                 System.err.println(" ERROR: value length must equale qualifier length ");
            }
        };
    }

@Override
    public int run(String[] arg0) throws Exception {
        Job job = Job.getInstance(getConf(), CnyBatch.class.getSimpleName());
        TableMapReduceUtil.addDependencyJars(job);
        job.setJarByClass(CnyBatch.class);
        
        FileInputFormat.setInputPaths(job, arg0[0]);
        job.setMapperClass(CnyBatchMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        
        job.setReducerClass(CnyBatchReduce.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        
        
        return job.waitForCompletion(true) ? 0 : 1;
    }

public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("hbase.zookeeper.quorum", "h5:2181,h6:2181,h7:2181");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("dfs.socket.timeout", "100000");
        String[] otherArgs = new GenericOptionsParser(args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println(" ERROR: <dataInputDir> <tableName>");
            System.exit(2);
        }
        conf.set(TableOutputFormat.OUTPUT_TABLE, args[1]);
        int status = ToolRunner.run(conf, new CnyBatch(), args);
        System.exit(status);
    }
}

时间: 2025-01-08 02:27:34

使用MapReduce将HDFS数据导入到HBase(二)的相关文章

使用MapReduce将HDFS数据导入到HBase(一)

package com.bank.service; import java.io.IOException; import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.hbase.HBaseConfiguration;import org.apache.hadoop.hba

Sqoop1.4.4将MySQL数据库表中数据导入到HBase表中

问题导读:         1.--hbase-table.--hbase-row-key.--column-family及--hbase-create-table参数的作用? 2.Sqoop将关系型数据库表中数据导入HBase中,默认Rowkey是什么? 3.如果关系型数据库表中存在多关键字,该怎么办? 一.简介及部分重要参数介绍 Sqoop除了能够将数据从关系型数据库导入到HDFS和Hive中,还能够导入到HBase表中. --hbase-table:通过指定--hbase-table参数值

mysql数据导入到hbase

思路:读取到一个数据库里所有的表名,然后通过sqoop循环导入到hbase 实现过程中发现 不会写shell是个硬伤 最后只能分两步进行操作 1.sel_tabs.sh /usr/bin/mysql -hIp地址 -u用户名 -p密码 -D数据库名<<EOF use select table_name from information_schema.tables where table_schema='数据库名' and table_type='base table'; EOF 运行 bash

HBase结合MapReduce批量导入(HDFS中的数据导入到HBase)

HBase结合MapReduce批量导入 1 package hbase; 2 3 import java.text.SimpleDateFormat; 4 import java.util.Date; 5 6 import org.apache.hadoop.conf.Configuration; 7 import org.apache.hadoop.hbase.client.Put; 8 import org.apache.hadoop.hbase.mapreduce.TableOutput

mapreduce将mysql数据导入到Hdfs问题

今天在执行一个sql语句的时候遇到了一个错误 java.sql.SQLException: Error writing file '/tmp/MYkn6JQ8' (Errcode: 28).搜了一下,这个问题应该是因为错误提示中提示的临时目录的空间不足导致的. 这个错误如果想看到它的现象,就必须在sql运行时来监视这个临时文件夹的大小,就可以看到mysql在/tmp下创建了一个临时文件,这个临时文件的大小取决于sql语句以及表的大小. 我表中现有大概29万行数据,大概有30列,使用insert

ImportTsv-HBase数据导入工具

一.概述 HBase官方提供了基于Mapreduce的批量数据导入工具:Bulk load和ImportTsv.关于Bulk load大家可以看下我另一篇博文. 通常HBase用户会使用HBase API导数,但是如果一次性导入大批量数据,可能占用大量Regionserver资源,影响存储在该Regionserver上其他表的查询,本文将会从源码上解析ImportTsv数据导入工具,探究如何高效导入数据到HBase. 二.ImportTsv介绍 ImportTsv是Hbase提供的一个命令行工具

数据导入HBase最常用的三种方式及实践分析

数据导入HBase最常用的三种方式及实践分析         摘要:要使用Hadoop,需要将现有的各种类型的数据库或数据文件中的数据导入HBase.一般而言,有三种常见方式:使用HBase的API中的Put方法,使用HBase 的bulk load工具和使用定制的MapReduce Job方式.本文均有详细描述. [编者按]要使用Hadoop,数据合并至关重要,HBase应用甚广.一般而言,需要 针对不同情景模式将现有的各种类型的数据库或数据文件中的数据转入至HBase 中.常见方式为:使用H

HBase 实战(1)--HBase的数据导入方式

前言: 作为Hadoop生态系统中重要的一员, HBase作为分布式列式存储, 在线实时处理的特性, 备受瞩目, 将来能在很多应用场景, 取代传统关系型数据库的江湖地位. 本篇博文重点讲解HBase的数据导入, 描述三种方式, Client API, Bulkload, 以及Hive Over HBase. *). Client API实现借助HBase的Client API来导入, 是最简易学的方式. Configuration config = HBaseConfiguration.crea

Bulk Load-HBase数据导入最佳实践

一.概述 HBase本身提供了非常多种数据导入的方式,通常有两种经常使用方式: 1.使用HBase提供的TableOutputFormat,原理是通过一个Mapreduce作业将数据导入HBase 2.还有一种方式就是使用HBase原生Client API 这两种方式因为须要频繁的与数据所存储的RegionServer通信.一次性入库大量数据时,特别占用资源,所以都不是最有效的.了解过HBase底层原理的应该都知道,HBase在HDFS中是以HFile文件结构存储的,一个比較高效便捷的方法就是使