对数据库HBASE的操作有shell端和java API两种方式。
在此之前要先说一下HBASE的结构及其数据存储结构:
HBASE是基于HDFS的,是一种NoSQL的数据库。它的数据模型如下所示:
Row Key | Timestamp | Column Family | |
URI | Parser | ||
r1 | t3 | url=http://www.taobao.com | title=天天特价 |
t2 | host=taobao.com | ||
t1 | |||
r2 | t5 | url=http://www.alibaba.com | content=每天… |
t4 | host=alibaba.com |
RowKey:行键,也是表的主键。表中的记录是按照Row Key排序。
Timestamp:时间戳,每次数据操作对应的时间戳,可以看作数据的version number。
ColumnFamily:列簇,Table在水平方向有一个或者多个Column Family组成,一个Column Family中可以由任意多个Column组成,即Column Family支持动态扩展,无需预先定义Column的数量以及类型,所有Column均以二进制格式存储,用户需要自行进行类型转换。
- shell操作
shell中输入 hbase shell进入hbase操作,假设这样的person表:
rowid,username,userid,birth,phone,sex
创建该表需要使用:
create ‘person‘,‘username‘,‘userid‘,‘birth‘,‘phone‘,‘sex‘
第一个是表名,后面都是属性,rowkey列不需要自己创建和设定列名。
ps,hbase shell 中,都没有分号做结尾。而且表名列名基本都加引号,这和SQL不一样。
加入单个数据,我们使用put命令。注意put命令的操作,只能一行一列的单个值去添加,不能一次加一行:
put ‘person‘,‘1‘,‘username‘,‘Aran‘
表示person表中行号(row key)为1列名为username的值为Aran。
如果清空表,需要:
truncate ‘person‘
如果删除表,需要:
disable ‘person‘
drop ‘person‘
Hbase中的查询分为两种:以rowkey方式的行查询,和以值为基础遍历.
rowkey方式:
get ‘person‘,‘1‘
值查询:
scan ‘person‘,FILTER=>"ValueFilter(=,‘binary:1992-2-12‘)"
Hbase支持cvs格式批量导入:
类似这样格式的文件 1,User1,8237764069450,2001-8-19,682318616,1
可以在shell中(不是hbase shell中)导入:
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv-Dimporttsv.separator=","-Dimporttsv.columns=HBASE_ROW_KEY,username,userid,birth,phone,sex person hdfs://192.168.70.28:9000/dataImport/HbaseTable-1.cvs
分割符为,并且,导入不用写rowkey,默认第一个就是rowkey。
就介绍这么多。详细的请参考:
http://www.jb51.net/article/31172.htm
http://blog.csdn.net/pirateleo/article/details/7956965
http://www.cnblogs.com/linjiqin/archive/2013/03/08/2949339.html
- JAVA API
对于开发者而言,通过shell去操作数据还是很少见。我们可以使用JAVA API。
新建Java项目,添加的包有Hadoop下的Hadoop-core和Hbase/lib/下的所有jar。
只写了查询相关的Func,直接上代码:
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
publicclassHbaseAPI{
privatestaticConfiguration conf =null;
static{
conf =HBaseConfiguration.create();
conf.set("hbase.zookeeper.property.clientPort","2181");
conf.set("hbase.zookeeper.quorum","192.168.70.29");
conf.set("hbase.master","192.168.70.28:60000");
//timeout time
conf.setLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,1200000);
}
//query by rowkey
publicstaticvoidQueryByrowKey(String tableName,String rowKey){
try{
HTable table =newHTable(conf,tableName);
Get g =newGet(rowKey.getBytes());
long start =System.currentTimeMillis();
Result r = table.get(g);
/*System.out.println("column:rowkey"
+ "====value:" + new String(r.getRow()));
for (KeyValue keyValue : r.raw()) {
System.out.println("column:" + new String(keyValue.getFamily())
+ "====value:" + new String(keyValue.getValue()));
}*/
long end = System.currentTimeMillis();
System.out.println(end-start);
}catch(IOException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
//query by condition
publicstaticvoidQueryByCondition(String tableName,String columnName,String columnValue){
try{
HTable table =newHTable(conf,tableName);
Filter filter =newSingleColumnValueFilter(columnName.getBytes(),null,CompareOp.EQUAL,columnValue.getBytes());
Scan s =newScan();
s.setFilter(filter);
long start =System.currentTimeMillis();
ResultScanner rs = table.getScanner(s);
int hang =0;
for(Result r : rs){
/*System.out.println(new String(r.getRow()));
System.out.println("column:rowkey"
+ "====value:" + new String(r.getRow()));
for (KeyValue keyValue : r.raw()) {
System.out.println("column:" + new String(keyValue.getFamily())
+ "====value:" + new String(keyValue.getValue()));
}*/
hang++;
}
long end =System.currentTimeMillis();
System.out.println("rownum:"+hang);
System.out.println(end-start);
}catch(IOException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
publicstaticvoid main(String[] args){
//HbaseAPI.QueryByrowKey("person1000", "12307999");
//HbaseAPI.QueryByCondition("person1000", "username", "User2312397");
HbaseAPI.QueryByCondition("person1000","sex","0");
}
}
特别说下conf.setLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY, 1200000)表示的是每次访问最大时长,超过这个时长自动终止。
上文中的QueryByCondition并不准确,SingleColumnValueFilter的四个参数分别是列簇名、列名、过滤条件,过滤值。由于上文中的Hbase表每个列都是列簇,所以没有问题,现提供同时有列簇和列的查询:
publicstaticvoidQueryByCondition(String tableName,String familyName,String columnName,String columnValue){
try{
HTable table =newHTable(conf,tableName);
Filter filter =newSingleColumnValueFilter(familyName.getBytes(),columnName.getBytes(),CompareOp.EQUAL,columnValue.getBytes());
Scan s =newScan();
s.setCaching(1000);
s.setFilter(filter);
long start =System.currentTimeMillis();
ResultScanner rs = table.getScanner(s);
int hang =0;
for(Result r : rs){
System.out.println(newString(r.getRow()));
System.out.println("column:rowkey"
+"====value:"+newString(r.getRow()));
for(KeyValue keyValue : r.raw()){
System.out.println("column:"+newString(keyValue.getFamily())+":"+newString(keyValue.getQualifier())
+"====value:"+newString(keyValue.getValue()));
}
hang++;
}
long end =System.currentTimeMillis();
System.out.println("rownum:"+hang);
System.out.println(end-start);
}catch(IOException e){
// TODO Auto-generated catch block
e.printStackTrace();
}
}
代码中有一个s.setCaching(),这涉及到了Hbase 的Scanner Caching功能:
base.client.scanner.caching配置项可以设置HBase scanner一次从服务端抓取的数据条数,默认情况下一次一条。通过将其设置成一个合理的值,可以减少scan过程中next()的时间开销,代价是scanner需要通过客户端的内存来维持这些被cache的行记录。
有三个地方可以进行配置:1)在HBase的conf配置文件中进行配置;2)通过调用HTable.setScannerCaching(int scannerCaching)进行配置;3)通过调用Scan.setCaching(int caching)进行配置。三者的优先级越来越高。
参考文件:
http://javacrazyer.iteye.com/blog/1186881