/**
 * Wrapper holding the settings and the client connection used to talk to HBase.
 * Only the fields are shown here; the remaining members are elided by the "..." placeholder.
 */
public class HBaseConn {
// Value the constructor stores under the "hbase.rootdir" configuration key.
private String rootDir;
// Value the constructor stores under the "hbase.zookeeper.quorum" configuration key.
private String zkServer;
// Value the constructor stores under the "hbase.zookeeper.property.clientPort" configuration key.
private String port;
// Client configuration; created with HBaseConfiguration.create() in the constructor.
private Configuration conf;
// Connection created from conf by the constructor; null until then.
private HConnection hConn = null;
...
}
建立连接:
/**
 * Creates the HBase configuration from the given settings and opens a connection with it.
 *
 * @param rootDir  value for the "hbase.rootdir" configuration key
 * @param zkServer value for the "hbase.zookeeper.quorum" configuration key
 * @param port     value for the "hbase.zookeeper.property.clientPort" configuration key
 * @throws IOException if the connection cannot be created
 */
public HBaseConn(String rootDir, String zkServer, String port) throws IOException{
    // Assemble the client configuration first, then publish it to the fields.
    Configuration settings = HBaseConfiguration.create();
    settings.set("hbase.rootdir", rootDir);
    settings.set("hbase.zookeeper.quorum", zkServer);
    settings.set("hbase.zookeeper.property.clientPort", port);

    this.rootDir = rootDir;
    this.zkServer = zkServer;
    this.port = port;
    this.conf = settings;
    this.hConn = HConnectionManager.createConnection(settings);
}
// Example usage: build an HBaseConn from an HBase root directory URI,
// a ZooKeeper quorum host and a ZooKeeper client port.
String rootDir = "hdfs://itcast04:9000/hbase";
String zkServer = "itcast04";
String port = "2181";
HBaseConn conn = new HBaseConn(rootDir, zkServer, port);
建表:
/**
 * Creates a table with one column family per entry in {@code cols}.
 * Every family is configured with GZ compression and DIFF data-block encoding.
 *
 * @param tableName name of the table to create
 * @param cols      names of the column families to add
 * @throws IOException if the table already exists ("table exists") or the request fails
 * @throws MasterNotRunningException if the HBase master cannot be reached
 * @throws ZooKeeperConnectionException if ZooKeeper cannot be reached
 */
public void createTable(String tableName, List<String> cols) throws MasterNotRunningException, ZooKeeperConnectionException, IOException{
    HBaseAdmin admin = new HBaseAdmin(conf);
    try {
        if (admin.tableExists(tableName)) {
            throw new IOException("table exists");
        }
        HTableDescriptor tableDesc = new HTableDescriptor(tableName);
        for (String col : cols) {
            HColumnDescriptor colDesc = new HColumnDescriptor(col);
            // Enable compression and block encoding for the family.
            colDesc.setCompressionType(Algorithm.GZ);
            colDesc.setDataBlockEncoding(DataBlockEncoding.DIFF);
            tableDesc.addFamily(colDesc);
        }
        admin.createTable(tableDesc);
    } finally {
        // Always release the admin handle, including on the "table exists" path.
        admin.close();
    }
}
/*List<String> cols = new LinkedList<String>();
cols.add("basicInfo");
cols.add("moreInfo");
conn.createTable("student", cols);*/
增加记录(put)(RowKey+列族+列标识+Cell):
/**
 * Writes a batch of puts to the given table.
 *
 * @param tableName name of the target table
 * @param puts      the puts to write
 * @throws IOException if the table cannot be opened or the write fails
 */
public void saveData(String tableName, List<Put> puts) throws IOException{
    HTableInterface table = hConn.getTable(tableName);
    try {
        // Auto-flush must be switched off BEFORE the puts are issued, otherwise it has no
        // effect on them; buffering client-side improves IO throughput.
        table.setAutoFlush(false);
        table.put(puts);
        // Push the buffered puts to the servers.
        table.flushCommits();
    } finally {
        // Release the table handle obtained from the shared connection.
        table.close();
    }
}
/*List<Put> puts = new LinkedList<Put>();
//RowKey
Put put1 = new Put(Bytes.toBytes("Tom"));
put1.add(Bytes.toBytes("basicInfo"), Bytes.toBytes("age"), Bytes.toBytes("27"));
put1.add(Bytes.toBytes("moreInfo"), Bytes.toBytes("tel"), Bytes.toBytes("13846677467"));
puts.add(put1);
conn.saveData("student", puts);*/
查表(通过Get查询,返回Result):
/**
 * Fetches a single row by its row key.
 *
 * @param tableName name of the table to read from
 * @param rowKey    row key, converted to bytes with {@code Bytes.toBytes}
 * @return the {@code Result} returned by the table for that row
 * @throws IOException if the table cannot be opened or the read fails
 */
public Result getData(String tableName, String rowKey) throws IOException{
    HTableInterface table = hConn.getTable(tableName);
    try {
        Get get = new Get(Bytes.toBytes(rowKey));
        return table.get(get);
    } finally {
        // Release the table handle; the returned Result stays usable after close.
        table.close();
    }
}
将查询到的Result中的KV输出:
/**
 * Prints the column family, column qualifier and value of every KeyValue in a result
 * to standard output.
 *
 * @param result the result to print; a null or empty result prints nothing
 */
public void format(Result result){
    // Nothing to print for a missing row.
    if (result == null || result.isEmpty()) {
        return;
    }
    for (KeyValue kv : result.raw()) {
        System.out.println("Column Family: " + Bytes.toString(kv.getFamily()));
        System.out.println("Column Name: " + Bytes.toString(kv.getQualifier()));
        System.out.println("Cell Value: " + Bytes.toString(kv.getValue()));
    }
}
// conn.format(conn.getData("student", "Tom"));
Scan表,利用format输出:
/**
 * Scans the whole table and prints every row with {@code format}.
 *
 * @param tableName name of the table to scan
 * @throws Exception if the table cannot be opened or the scan fails
 */
public void hbaseScan(String tableName) throws Exception{
    Scan scan = new Scan();
    // Fetch rows from the server in batches of 1000 per round trip.
    scan.setCaching(1000);
    HTableInterface table = hConn.getTable(tableName);
    try {
        ResultScanner scanner = table.getScanner(scan);
        try {
            for (Result res : scanner) {
                format(res);
            }
        } finally {
            // An open scanner holds server-side resources until it is closed.
            scanner.close();
        }
    } finally {
        table.close();
    }
}
简单过滤器:
/**
 * Scans the table with a row filter that keeps rows whose key matches the regular
 * expression {@code T\w+}, and prints every returned row with {@code format}.
 *
 * @param tableName name of the table to scan
 * @throws Exception if the table cannot be opened or the scan fails
 */
public void filterTest(String tableName) throws Exception{
    Scan scan = new Scan();
    scan.setCaching(1000);
    // Alternative: exact row-key match instead of a regular expression.
    // RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("Tom")));
    RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator("T\\w+"));
    scan.setFilter(filter);
    HTableInterface table = hConn.getTable(tableName);
    try {
        ResultScanner scanner = table.getScanner(scan);
        try {
            for (Result res : scanner) {
                format(res);
            }
        } finally {
            // An open scanner holds server-side resources until it is closed.
            scanner.close();
        }
    } finally {
        table.close();
    }
}
分页过滤器:
/**
 * Prints the table page by page, three rows per page, using a {@code PageFilter}.
 *
 * Each scan asks for one row more than a page holds. The extra row is not printed;
 * its key becomes the start row of the next page, and its absence marks the last page.
 *
 * @param tableName name of the table to page through
 * @throws IOException if the table cannot be opened or a scan fails
 */
public void pageFilterTest(String tableName) throws IOException{
    final int pageSize = 3;
    // One extra row per scan, used only as the look-ahead for the next page.
    PageFilter filter = new PageFilter(pageSize + 1);
    byte[] lastRow = null;
    int pageCount = 0;
    HTableInterface table = hConn.getTable(tableName);
    try {
        while (true) {
            pageCount++;
            System.out.println("Page Count:" + pageCount);
            Scan scan = new Scan();
            scan.setFilter(filter);
            if (lastRow != null) {
                // The start row is inclusive: the previous look-ahead row opens this page.
                scan.setStartRow(lastRow);
            }
            int count = 0;
            ResultScanner scanner = table.getScanner(scan);
            try {
                for (Result res : scanner) {
                    lastRow = res.getRow();
                    if (++count > pageSize) {
                        // Look-ahead row reached: leave it for the next page.
                        break;
                    }
                    format(res);
                }
            } finally {
                // Close each page's scanner before opening the next one.
                scanner.close();
            }
            // No look-ahead row means this was the last page. Stopping only when
            // count < pageSize would re-print the final row when exactly pageSize rows remain.
            if (count <= pageSize) {
                break;
            }
        }
    } finally {
        table.close();
    }
}
附件列表
时间: 2024-12-20 15:42:29