使用BulkLoad从HDFS批量导入数据到HBase

在向Hbase中写入数据时,常见的写入方法有使用HBase API,Mapreduce批量导入数据,使用这些方式带入数据时,一条数据写入到HBase数据库中的大致流程如图。

数据发出后首先写入到雨鞋日志WAl中,写入到预写日志中之后,随后写入到内存MemStore中,最后在Flush到Hfile中。这样写数据的方式不会导致数据的丢失,并且道正数据的有序性,但是当遇到大量的数据写入时,写入的速度就难以保证。所以,介绍一种性能更高的写入方式BulkLoad。

使用BulkLoad批量写入数据主要分为两部分:
一、使用HFileOutputFormat2通过自己编写的MapReduce作业将HFile写入到HDFS目录,由于写入到HBase中的数据是按照顺序排序的,HFileOutputFormat2中的configureIncrementalLoad()可以完成所需的配置。
二、将Hfile从HDFS移动到HBase表中,大致过程如图

实例代码pom依赖:

<dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>0.99.2</version>
        </dependency>
package com.yangshou;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class BulkLoadMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //读取文件中的每一条数据,以序号作为行键
        String line = value.toString();
        //将数据进行切分
        //切分后数组中的元素分别为:序号,用户id,商品id,用户行为,商品分类,时间,地址
        String[] str = line.split(" ");
        String id = str[0];
        String user_id = str[1];
        String item_id = str[2];
        String behavior = str[3];
        String item_type = str[4];
        String time = str[5];
        String address = "156";
        //拼接rowkey和put
        ImmutableBytesWritable rowkry = new ImmutableBytesWritable(id.getBytes());
        Put put = new Put(id.getBytes());
        put.add("info".getBytes(),"user_id".getBytes(),user_id.getBytes());
        put.add("info".getBytes(),"item_id".getBytes(),item_id.getBytes());
        put.add("info".getBytes(),"behavior".getBytes(),behavior.getBytes());
        put.add("info".getBytes(),"item_type".getBytes(),item_type.getBytes());
        put.add("info".getBytes(),"time".getBytes(),time.getBytes());
        put.add("info".getBytes(),"address".getBytes(),address.getBytes());
        //将数据写出
        context.write(rowkry,put);
    }
}
package com.yangshou;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BulkLoadDriver  {
    public static void main(String[] args) throws Exception {
        //获取Hbase配置
        Configuration conf = HBaseConfiguration.create();
        Connection conn = ConnectionFactory.createConnection(conf);
        Table table = conn.getTable(TableName.valueOf("BulkLoadDemo"));
        Admin admin = conn.getAdmin();

        //设置job
        Job job = Job.getInstance(conf,"BulkLoad");
        job.setJarByClass(BulkLoadDriver.class);
        job.setMapperClass(BulkLoadMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);

        //设置文件的输入输出路径
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(HFileOutputFormat2.class);
        FileInputFormat.setInputPaths(job,new Path("hdfs://hadoopalone:9000/tmp/000000_0"));
        FileOutputFormat.setOutputPath(job,new Path("hdfs://hadoopalone:9000/demo1"));

        //将数据加载到Hbase表中
        HFileOutputFormat2.configureIncrementalLoad(job,table,conn.getRegionLocator(TableName.valueOf("BulkLoadDemo")));
        if(job.waitForCompletion(true)){
            LoadIncrementalHFiles load = new LoadIncrementalHFiles(conf);
            load.doBulkLoad(new Path("hdfs://hadoopalone:9000/demo1"),admin,table,conn.getRegionLocator(TableName.valueOf("BulkLoadDemo")));

        }

    }
}

实例数据

44979   100640791   134060896   1   5271    2014-12-09  天津市
44980   100640791   96243605    1   13729   2014-12-02  新疆

在Hbase shell 中创建表

create ‘BulkLoadDemo‘,‘info‘

打包后执行
```hadoop jar BulkLoadDemo-1.0-SNAPSHOT.jar com.yangshou.BulkLoadDriver

注意:在执行hadoop jar之前应该先将Hbase中的相关包加载过来

export HADOOP_CLASSPATH=$HBASE_HOME/lib/*

原文地址:https://blog.51cto.com/14549997/2460819

时间: 2024-12-30 02:10:46

使用BulkLoad从HDFS批量导入数据到HBase的相关文章

Dynamics 2015 数据管理 之 如何批量导入数据到 正式区(一)

单一个项目的数据导入可以在具体功能 中导入,大体位置如下. 如果项目新上线的话,要批量导入数据的话,就要到如下的功能中实现了. 进入方式: 在 设置 ---- 数据管理 下载 下载后,用EXCEL打开, 给业务人员批量输入数据. 编辑好以后,上传吧,点 导入. 下一步即可以,导入以后,可以返回看看数据导入的情况,格式有没有错误等.

django 批量导入数据

一.需求 我在数据库中建了一张表,用来保存ucloud云上的project id 和project name models.py代码如下 #coding:utf-8 from django.db import models class Project(models.Model):     name = models.CharField(u'项目名称',max_length=32,blank=True)     id = models.CharField(u'项目ID',max_length=32

利用OLEDB+SqlClient实现EXCEL批量导入数据

以下是几个自己写的类 /// <summary> /// 取得Excel对象 /// </summary> /// <param name="strConn">OLEDB连接字符串</param> /// <param name="sql">SQL语句</param> /// <returns></returns> public static DataTable GetE

asp.net线程批量导入数据时通过ajax获取执行状态

最近因为工作中遇到一个需求,需要做了一个批量导入功能,但长时间运行没个反馈状态,很容易让人看了心急,产生各种臆想!为了解决心里障碍,写了这么个功能. 通过线程执行导入,并把正在执行的状态存入session,既共享执行状态,通过ajax调用session里的执行状态,从而实现反馈导入状态的功能! 上代码: 前端页面 <!DOCTYPE html> <html lang="en"> <head>  <meta charset="UTF-8

redis pipe 批量导入数据

redis pipe 批量导入数据 速度非常快, 文本需要支持redis的协议, 使用Python生成文件 代码如下 delimiter = "\r\n" data = "*3" + delimiter + "$3" + delimiter + "set" + delimiter + "$" + str(len(row[0])) + delimiter + row[0] + delimiter + &quo

使用python向Redis批量导入数据

1.使用pipeline进行批量导入数据.包含先使用rpush插入数据,然后使用expire改动过期时间 class Redis_Handler(Handler): def connect(self): #print self.host,self.port,self.table self.conn = Connection(self.host,self.port,self.table) def execute(self, action_name): filename = "/tmp/temp.t

Excel批量导入数据之数据校验

最近,工作上接到Excel批量导入数据的需求.在这里,小编想分享的是数据校验那些事,至于如何读取Excel数据,大家可以百度下. 一般而言,Excel批量导入数据,我们都会给客户提供一个固定的模板以输入我们期望的数据.然而,客户的操作,我们是无法预料的,他们有可能增加一列或者删除一列,有可能去掉Excel某个字段的输入限制等等. Excel模板,有可能只有一个,有可能若干个.各个模板的数据列,或交叉相同,或存在个性化列. 同一个模板,也许因某列数据的不同,而需要填充不同数据列. ...... 面

数据库中批量导入数据,有两列的值需要从其他表中查出来,我现在没有思路,求解惑

我现在批量往数据库里导正式数据(sql insert),但是数据中有三列分别是岗位,办事处,大区,给的数据中只给了岗位的值,办事处的值可以通过岗位值在岗位表查到,大区的值可以通过办事处的值在办事处表里查到.现在我已经把其他数据都导进去了,只剩办事处和大区没有值,我该如何批量更新这两列的值啊 导入的数据的表: 岗位表: 办事处表: 本人sql不是很好,希望sql大神能给出来解惑一下,拜谢~ 数据库中批量导入数据,有两列的值需要从其他表中查出来,我现在没有思路,求解惑 >> mysql 这个答案描

Redis批量导入数据的方法

有时候,我们需要给redis库中插入大量的数据,如做性能测试前的准备数据.遇到这种情况时,偶尔可能也会懵逼一下,这里就给大家介绍一个批量导入数据的方法. 先准备一个redis protocol的文件(redis protocol可以参考这里:https://redis.io/topics/protocol),这里是用java程序来输出的,java代码如下: <<RedisBatchTest>> public class RedisBatchTest { public static