数据批量导入HBase

测试数据:

datas

1001    lilei   17  13800001111
1002    lily    16  13800001112
1003    lucy    16  13800001113
1004    meimei  16  13800001114

数据批量导入使用mr,先生成HFile文件然后在用completebulkload工具导入。

1、需要先在hbase 创建表名:

hbase> create ‘student‘, {NAME => ‘info‘}

maven pom.xml配置文件如下:

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>

<!-- hbase -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.0.0</version>
        </dependency>

编写MapReduce代码如下:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @author 作者 E-mail:
 * @version 创建时间:2016年3月2日 下午4:15:57
 * 类说明
 */
public class CreateHfileByMapReduce {

    public static class MyBulkMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>{
        @Override
        protected void setup( Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue>.Context context )
            throws IOException, InterruptedException {

            super.setup( context );
        }
        @Override
        protected void map( LongWritable key, Text value,
                            Context context )
            throws IOException, InterruptedException {
            String[] split = value.toString().split("\t"); // 根据实际情况修改
            if (split.length == 4){
                byte[] rowkey = split[0].getBytes();
                ImmutableBytesWritable imrowkey = new ImmutableBytesWritable( rowkey );
                context.write(imrowkey, new KeyValue(rowkey, Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(split[1])));
                context.write(imrowkey, new KeyValue(rowkey, Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(split[2])));
                context.write(imrowkey, new KeyValue(rowkey, Bytes.toBytes("info"), Bytes.toBytes("phone"), Bytes.toBytes(split[3])));
            }
        }
    }

    @SuppressWarnings( "deprecation" )
    public static void main( String[] args ) {
        if (args.length != 4){
            System.err.println("Usage: CreateHfileByMapReduce <table_name><data_input_path><hfile_output_path> ");
            System.exit(2);
        }

        String tableName = args[0];
        String inputPath  = args[1];
        String outputPath = args[2];

      /*  String tableName = "student";
        String inputPath  = "hdfs://node2:9000/datas";
        String outputPath = "hdfs://node2:9000/user/output";*/
        HTable hTable = null;
        Configuration conf = HBaseConfiguration.create();
        try {
           hTable  = new HTable(conf, tableName);
           Job job = Job.getInstance( conf, "CreateHfileByMapReduce");
           job.setJarByClass( CreateHfileByMapReduce.class );
           job.setMapperClass(MyBulkMapper.class);
           job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);
           //
           HFileOutputFormat.configureIncrementalLoad(job, hTable);
           FileInputFormat.addInputPath( job, new Path(inputPath) );
           FileOutputFormat.setOutputPath( job, new Path(outputPath) );
           System.exit( job.waitForCompletion(true)? 0: 1 );

        }
        catch ( Exception e ) {

            e.printStackTrace();
        }

    }
}

注: 借助maven的assembly插件, 生成胖jar包(就是把依赖的zookeeper和hbase jar包都打到该MapReduce包中), 否则的话, 就需要用户静态配置, 在Hadoop的class中添加zookeeper和hbase的配置文件和相关jar包.

最终的jar包为 bulk.jar, 主类名为cn.bd.batch.mr.CreateHfileByMapReduce, 生成HFile, 增量热载入hbase
sudo -u hdfs hadoop jar <xxoo>.jar <MainClass> <table_name> <data_input_path> <hfile_output_path>
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <hfile_output_path> <table_name>

hadoop jar bulk.jar cn.bd.batch.mr.CreateHfileByMapReduce student /datas /user/output

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles /user/output student

本文参考地址:http://www.cnblogs.com/mumuxinfei/p/3823367.html

时间: 2024-11-05 12:22:48

数据批量导入HBase的相关文章

将ACCESS数据批量导入SQL SERVER

代码: IF OBJECT_ID('Sp_InputAccesstoSQL') IS NOT NULL     DROP PROC Sp_InputAccesstoSQL GO CREATE PROC Sp_InputAccesstoSQL @dir NVARCHAR(100),--ACCESS文件存放路径:如D:\Files @tabname NVARCHAR(50) --定义导入到数据库中的表名,如果存在就不需要创建 AS SET NOCOUNT ON DECLARE @cmd NVARCH

c#如何将dataset中的数据批量导入oracle数据库

不要写insert语句,因为数据库字段太多了,有什么简单点的效率高的方法吗 public void MultiInsertData(DataSet ds) { string connt = "Oracle的连接字符串"; string sql = "select id,name,- from tablename";必须与ds中的一致 DataTable dt = ds.Defaults[0]; OracleConnection conn = new OracleCo

用java实现excel数据批量导入数据库

不管是做软件还是做网站,相信很多人在做的时候都要用到数据库,而数据库的数据从何而来呢,可以使手动添加的,但是大多数情况下我们使用的是已有的数据,我们想借助开发的工具管理目前已有的数据,如果是小量的数据,手动录入也无妨,但是对于大量数据呢,手动录入显然已经不显示,而且,更多的时候,我们呢想在软件上增加一个数据的批量导入,这样不管是谁在用这款软件,都可以方便的管理已有数据,那么我们最常用的数据管理器是什么呢,毫无疑问Excel,但是我们做软件是几乎没有人会使用Excel作为DB,所以我们遇到的问题就

java把excel数据批量导入到数据库

java把excel数据批量导入到数据库中,java导入excel数据代码如下 1.    public List<Choice> GetFromXls(String xlsname){ 2. 3.        List<Choice> choices = new ArrayList<Choice>(); 4.        Choice choice=null; 5.        try { 6.            java.io.File file=new

mysql中把一个表的数据批量导入另一个表中(不同情况)

mysql中把一个表的数据批量导入另一个表中 不管是在网站开发还是在应用程序开发中,我们经常会碰到需要将MySQL某个表的数据批量导入到另一个表的情况,甚至有时还需要指定导入字段. 本文就将以MySQL数据库为例,介绍如何通过SQL命令行将某个表的所有数据或指定字段的数据,导入到目标表 中. 类别一. 如果两张张表(导出表和目标表)的字段一致,并且希望插入全部数据,可以用这种方法:(此方法只适合导出两表在同一database) INSERT INTO 目标表 SELECT * FROM 来源表;

Excel数据批量导入到数据库

1.今天做批量导入网上找了个例子,改了改,运行起来了.用POI实现Excel的读取,需要jar包. 2.ReadExcel.java读取数据 /** * */ package com.b510.excel; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.List; import

Shp数据批量导入Postgresql工具的原理和设计

1.背景 在制作整体的开源工具箱产品中,数据入库是一个重要的环节.虽然PostGIS提供了数据入库界面化操作,但是毕竟此工具无法集成至我方工具箱中,并且该工具界面.操作都不是很理想,也无法定制化完成一些具体需求.所以,自制一个shp入库工具还是比较有必要的. 2.思路 2.1  shp导入思路 Shp导入至数据库中,无论是Postgresql.mysql.oracle等,均是先将Shp转换成符合对应数据库的sql,然后再导入进去对应数据库. 针对Postgresql,在PG的安装路径下有pgsq

将mysql表数据批量导入redis zset结构中

工作中有这样一个需求,要将用户的魅力值数据做排行,生成榜单展示前40名,每隔5分钟刷新一次榜单.这样的需求用redis的zset是很方便实现的.但是数据存在mysql的表中,有400多万条,怎么将其快速的放入redis中呢? 一般我们想到是通过程序把数据从mysql中查出来,然后存入redis,但是这样不仅耗时,而且不能保证写入redis的数据的准确性,这中间存在一个部署的时差.通过google老师一查原来redis提供了批量导入数据的功能,原帖地址: http://baijian.github

Django model中数据批量导入bulk_create()

在Django中需要向数据库中插入多条数据(list).使用如下方法,每次save()的时候都会访问一次数据库.导致性能问题: for i in resultlist: p = Account(name=i) p.save() 在django1.4以后加入了新的特性.使用django.db.models.query.QuerySet.bulk_create()批量创建对象,减少SQL查询次数.改进如下: querysetlist=[] for i in resultlist: querysetl