hbase 数据库深入使用与相关数据的加载

  • 一:hbase 数据检索流程
  • 二:hbase 数据库java api 调用
  • 三:hbase 各个服务的作用
  • 四:hbase 与mapreduce集成
  • 五:hbase 使用BulkLoad 加载数据

一:hbase 数据检索流程

1.1 hbase 数据检索流程图:

1.2 hbase 读的流程:

读流程:
1、client请求zookeeper集群(root/meta)(meta)
        --有多少table,table有哪些region(startrow、stoprow)
2、client找到region对应的region server
3、region server响应客户端请求    

1.3. hhbase 写的流程

1、client请求zookeeper集群,该数据应该写入哪个region
2、向region所在的region server 发起写请求
3、数据先写进HLOG(WAL)
4、然后写入memstore(flush)
5、当memstore达到阀值,写入storefile(compact)
6、当storefile达到阀值,合并成新的storefile
7、当region达到阀值,当前region会划分为两个新的region(split)

1.4 hbase 读写流程存储核心的三个机制

1. flush机制:当memstore满了以后会flush陈一个storefile
2. compact机制:当storefile达到阀值,合并storefile,合并过程中cell版本合并和数据删除
3. split机制:当region不断增大,达到阀值,region会分成两个新的region

二:hbase 数据库java api 调用

2.1 eclipse 环境配置

 更改maven 的源:
 上传repository.tar.gz 
cd .m2
mv repository repository.bak2016612
rz repository.tar.gz
tar -zxvf repository.tar.gz 

cd /home/hadoop/yangyang/hbase
cp -p hbase-site.xml log4j.properties /home/hadoop/workspace/studyhbase/src/main/rescourse

更改eclipse 的pom.xml

增加:

<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>0.98.6-hadoop2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>0.98.6-hadoop2</version>
</dependency>

2.2 hbase java api 掉用:

package org.apache.hadoop.hbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class HbaseOperation {

    /**
     *
     * @param args
     * @throws IOException
     */

    public static HTable getTable(String tableName) throws IOException {
        // Get configuration
        Configuration conf = HBaseConfiguration.create();

        // Get Table
        HTable table = new HTable(conf, tableName);

        return table;
    }

    public static void getData() throws IOException {
        HTable table = HbaseOperation.getTable("user");
        // Get Data
        Get get = new Get(Bytes.toBytes("1001"));
        Result result = table.get(get);
        Cell[] cells = result.rawCells();

        for (Cell cell : cells) {
            System.out.print(Bytes.toString(CellUtil.cloneFamily(cell)) + ":");
            System.out.print(Bytes.toString(CellUtil.cloneQualifier(cell))
                    + "==>");
            System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
        }
        table.close();
    }

    /**
     *
     * @param args
     * @throws IOException
     */

    public static void putData() throws IOException {
        HTable table = HbaseOperation.getTable("user");

        Put put = new Put(Bytes.toBytes("1004"));

        put.add(Bytes.toBytes("info"), Bytes.toBytes("name"),
                Bytes.toBytes("zhaoliu"));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("age"),
                Bytes.toBytes("50"));
        put.add(Bytes.toBytes("info"), Bytes.toBytes("sex"),
                Bytes.toBytes("male"));

        table.put(put);
        table.close();
    }

    public static void main(String[] args) throws IOException {
        HTable table = HbaseOperation.getTable("user");

        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes("1001")) ;
        scan.setStopRow(Bytes.toBytes("1002")) ;
        ResultScanner resultScanner = table.getScanner(scan);

        for (Result res : resultScanner) {
            Cell[] ress = res.rawCells();
            for (Cell cell : ress) {
                System.out.print(Bytes.toString(CellUtil.cloneRow(cell))
                        + "\t");
                System.out.print(Bytes.toString(CellUtil.cloneFamily(cell))
                        + ":");
                System.out.print(Bytes.toString(CellUtil.cloneQualifier(cell))
                        + "==>");
                System.out.println(Bytes.toString(CellUtil.cloneValue(cell)));
            }
            table.close();

        }

    }
}

三:hbase 各个服务的作用

3.1 Hmaster 作用:

    1、为region server分配region
    2、负责region server的负责均衡
    3、发现失效的region server,需要重新分配其上的region
    4、监听zk,基于zookeeper感应region server的上下线
    5、监听zk,基于zookeeper来保证HA
    6、不参与客户端数据读写访问
    7、负载低(通常情况下可以把它和其他服务器(NN/SNN)整合在一起)
    8、无单点故障(SPOF)

3.2 Hregionserver 作用:

    1、维护master分配给它的region
    2、响应客户端的IO访问请求(读写)
    3、处理region的flush、compact、split
    4、维护region的cache

3.4 zookeeper 作用:

    1、保证集群里面只有一个master(HA)
    2、保存了root region的位置(meta),访问入口地址
    3、实时监控region server的状态,及时通知region server上下线消息给master
    4、存储了hbase的schema,包括哪些table,每个表有哪些列簇

四:hbase 与mapreduce集成

4.1 hbase 获取jar命令

bin/hbase mapredcp 

4.2 配置环境变量

vim .bash_profile
export HADOOP_HOME=/home/hadoop/yangyang/hadoop
export HBASE_HOME=/home/hadoop/yangyang/hbase
export HADOOP_CLASSPATH=`$HBASE_HOME/bin/hbase mapredcp`
PATH=$PATH:$HOME/bin:$JAVA_HOME/bin:${HADOOP_HOME}/bin:${MAVEN_HOME}/bin:${HBASE_HOME}:${HADOOP_CLASSPATH}
soure .bash_profile

4.3 统计一个hbase表:

cd /home/hadoop/yangyang/hadoop

bin/yarn jar /home/hadoop/yangyang/hbase/lib/hbase-server-0.98.6-cdh5.3.6.jar rowcounter user


4.4 导入一个生成的hbase 表的in.tsv

vim in.tsv
---
10010   zhangsan        30      shanghai
10011   lisi    31      beijin
10012   wangwu  32      shanghai
10013   zaoliu  30      beijin
hdfs dfs -put in.tsv /input

yarn jar /home/hadoop/yangyang/hbase/lib/hbase-server-0.98.6-cdh5.3.6.jar importtsv -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:age,info:address user /input/in.tsv



五: 使用BulkLoad加载数据

vim out.tsv
110     zhangsan        30      shanghai
111     lisi    31      beijin
112     wangwu  32      shanghai
113     zaoliu  30      beijin
hdfs dfs -put out.tsv /input
<!-- 将tsv 文件转换成hfile 文件(在hdfs 上面)-->
yarn jar /home/hadoop/yangyang/hbase/lib/hbase-server-0.98.6-cdh5.3.6.jar importtsv -Dimporttsv.bulk.output=/hfileoutput/ -Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:age,info:tel user /input/out.tsv

<!-- 将hfile 加载到hbase 的表中
yarn jar /home/hadoop/yangyang/hbase/lib/hbase-server-0.98.6-cdh5.3.6.jar completebulkload /hfileoutput user





5.1 hbase 表中提取相关字段,生成新的表

提取hbase 表中的user 表的name 与age 字段 生成新表student

package org.apache.hadoop.studyhbase;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class User2StudentMapReduce extends Configured implements Tool{

    // step 1: Mapper
    public static class ReadUserMapper //
            extends TableMapper<ImmutableBytesWritable, Put>{

        @Override
        protected void map(ImmutableBytesWritable key, Result value,
                Context context)
                        throws IOException, InterruptedException {
            // user: name & age  ->  student: name & age : put
            // create Put
            Put put = new Put(key.get()) ;
            // add column
            for(Cell cell: value.rawCells()){
                // add family: info
                if("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))){
                    // add column: name
                    if("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
                        put.add(cell) ;
                    //  CellUtil.cloneValue(cell)
                    //  put.add(family, qualifier, value) ;
                    }
                    // add column: age
                    else if("age".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))){
                        put.add(cell) ;
                    }
                }
            }
            // context output
            context.write(key, put);
        }

    }

    // step 2: Reducer
    public static class WriteStudentReducer //
            extends TableReducer<ImmutableBytesWritable, Put, NullWritable>{

        @Override
        protected void reduce(ImmutableBytesWritable key, Iterable<Put> values,
                Context context) throws IOException, InterruptedException {
            for(Put put : values){
                context.write(NullWritable.get(), put);
            }
        }
    }

    // step 3: Driver
    public int run(String[] args) throws Exception {
        // 1) Configuration
        Configuration conf = this.getConf();

        // 2) create job
        Job job = Job.getInstance(conf, this.getClass().getSimpleName()) ;
        job.setJarByClass(User2StudentMapReduce.class);

        // 3) set job
        // input -> mapper -> shuffle -> reducer -> output
        Scan scan = new Scan() ;
        scan.setCacheBlocks(false);
        scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs

        TableMapReduceUtil.initTableMapperJob(
                  "user",        // input table
                  scan,               // Scan instance to control CF and attribute selection
                  ReadUserMapper.class,     // mapper class
                  ImmutableBytesWritable.class,         // mapper output key
                  Put.class,  // mapper output value
                  job //
          );

        TableMapReduceUtil.initTableReducerJob(
                  "student",        // output table
                  WriteStudentReducer.class,    // reducer class
                  job //
            );

        job.setNumReduceTasks(1);   // at least one, adjust as required

        boolean isSuccess = job.waitForCompletion(true);
        if (!isSuccess) {
          throw new IOException("error with job!");
        }
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        int status = ToolRunner.run(//
                conf, //
                new User2StudentMapReduce(), //
                args //
            );
        System.exit(status);
    }

}
去hbase 上面新建空表student
create ‘student‘,‘info‘

导出新生成jar包User2Student.jar运行:

cd /home/hadoop/jars
yarn jar User2Student.jar


查询hbase进行验证:

原文地址:http://blog.51cto.com/flyfish225/2097444

时间: 2024-10-06 00:22:06

hbase 数据库深入使用与相关数据的加载的相关文章

蜡炬教育:如何处理机器学习中大型数据的加载问题?

原标题:蜡炬教育:如何处理机器学习中大型数据的加载问题? 蜡炬教育老师表示,在处理机器学习算法时,经常会因为数据库太大而导致无法放入内存中,而遇到这样几个问题:当运行数据集算法而导致崩溃时如何解决?当需要处理大容量数据文件时该如何加载?如何方便快捷的解决内存不足的问题? 针对以上问题,蜡炬教育老师给出7个建议: 1.分配更多内存某些ML工具或数据库默认内存配置不合理,可以看看是否可以手动分配. 2.使用较小样本确认是否需要处理所有数据?在对最终数据拟合前,使用随机抽取数据样本这个示例即可. 3.

b/s和C/S方法用C#递归方法把数据表加载到treeview控件中

先看一下数据库的结构: 表结构如下所示: Num                   Name                                 fatherNum       BZ 01                      总节点                                      0              ...... 0101                   第一个一级节点                          01      

GreenPlum 使用gpload通过gpfdist文件实现数据高速加载

1,gpload环境准备 环境准备请参考博主以前的文章gpfdist部署实战:http://blog.csdn.net/mchdba/article/details/72540806  ,安装好gpfdist后,gpload也自动有了,可以自动使用. 安装完后,可以启动gpfdist服务:nohup /data/greenplum/bin/gpfdist -d /data/greenplum/ -p 8090> /home/gpadmin/gpfdist.log  & 2,gpload简介

smartGWT DataSource数据动态加载

昨天和今天早上,用DataSource从数据库后台动态加载数据,我的业务是这样的: 我有两个SelectItem选择框,第一个选择框里面的数据是单位,第二个选择框中的数据是对应单位的人,因为人可能有重名,所以还需要加载对应的人的其他信息(如身份证等等),来个图: (选择好之后的样子) 第一个选择框中的单位变了,第二的选择框中只加载对应单位的人的数据,但是关键问题出来了,我加载的数据每一次都不会改变,不管第一个选择框的选择的是什么,第二个选择框只能加载一次数据,就是初始化那一次,我尝试了将对应的D

数据正在加载 关于 遮罩层的添加

在做项目过程中,发现 如果数据正在加载,点击页面进行别的操作 可能出错(例如 tab 页切换时,操作太快,数据填入了错的tab页 ,当然这种情况是指数据不随tab页的点击而加载) 解决:1.想到点击一个tab页禁止 其它tab的可点击性 如为button (表单类 disabled=true) 在登录页面中 常用此效果来处理 防止用户填完数据后 多次点击登录 进行数据库访问 (用户提交一次后 将登录按钮设为不可点 请求返回后在放开) 如为a 标签  需要三步  1. 在onclick事件 ret

JS实现-页面数据无限加载

在手机端浏览网页时,经常使用一个功能,当我们浏览京东或者淘宝时,页面滑动到底部,我们看到数据自动加载到列表.之前并不知道这些功能是怎么实现的,于是自己在PC浏览器上模拟实现这样的功能.先看看浏览效果: 当滚动条滚动到页面底部时,提示“正在加载…”. 当页面已经加载了所有数据后,滚动到页面底部会提示“数据已加载到底了”: 实现数据无限加载的过程大致如下: 1.滚动条滚动到页面底部. 2.触发ajax加载,把请求返回的数据加载到列表后面. 如何判断滚动条是否滚动到页面底部?我们可以设置一个规则:当滚

ArcGIS Engine中数据的加载 (转)

1.加载Shapefile数据 1 IWorkspaceFactory pWorkspaceFactory; 2 IFeatureWorkspace pFeatureWorkspace; 3 IFeatureLayer pFeatureLayer; 4 5 //获取当前路径和文件名 6 OpenFileDialog dlg = new OpenFileDialog(); 7 dlg.Filter = "Shape(*.shp)|*.shp|All Files(*.*)|*.*"; 8

WinForm ListView虚拟模式加载数据 提高加载速度

将VirtualMode 属性设置为 true 会将 ListView 置于虚拟模式.控件不再使用Collection.Add()这种方式来添加数据,取而代之的是使用RetrieveVirtualItem(Occurs when the ListView is in virtual mode and requires a ListViewItem.)和CacheVirtualItems两个事件,单独使用RetrieveVirtualItem也可以,CacheVirtualItems这个事件主要是

省市数据递归加载到TreeView

using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Windows.Forms; using System.Data.SqlClient; namespace 省市数据递归加载到TreeView { public