实现原理:
1、读取hbase每页数据时多取一条数据。如:分页是10条一页,第一次查询hbase时,取10+1条数据,然后把第一条和最后一条数据的rowkey保存在redis中。redis中的key为用户的token+URL,value为rowkey列表,即 redis.set(token+url, List<String>)。
2、前台点击下一页时,查询当前页(currentPage)在redis的list中是否存在list.get(currentPage)的rowkey。如果存在,则以之前保存的rowkey为startRowKey,取10+1条,并把最后一条的rowkey保存在redis中。不存在则查询出错,提示重新查询,理论上不会出现,除非redis挂了。
3、如果查询的数据的startRowKey和stopRowKey在token中都能找到。则只需要查询这个范围数据即可。
4、什么时候清除这些redis数据呢?第一、设置redis有效期。第二,这条很重要,是保证前几条数据准确性的前提:在用户点击非上一页/下一页按钮的操作时,都清除redis中当前用户的分页数据。
5、既然有分页,那当然有数据量统计count,这里我使用hbase的协处理器coprocessor。当然每次查询count也费时间,所以第一次查询时把count保存在用户的redis中。redis的清除还是第(4)步的过程。
6、能这么做还有一个很重要的前提:前端界面的分页只提供了两个按钮:上一页和下一页。这是保证这个方案可行性的基础。
下面上代码:
controller中方法的代码,这里的基本框架使用的是renren快速开发套件
@RequestMapping("/list") public R list(@RequestParam Map<String, Object> params,HttpServletRequest httpRequest){ long time = System.currentTimeMillis(); HBasePage page= null; try{ String token = httpRequest.getHeader("token"); //如需要额外的过滤器,请自己定义并使用.buildFilter(filter)方法添加到query中 HbaseQuery query = new HbaseQuery(params) .buildScanCount().buildPageRowKey(token) .finish(); page = service.query(query).buildRedisRowKey(token); }catch (Exception e){ e.printStackTrace(); } long time2 = System.currentTimeMillis(); System.out.println("time2-time==list="+(time2-time)); return R.ok().put("page",page); }
处理查询参数的类HBaseQuery,因为业务原因,所有的hbase查询有两个必须的条件:开始日期和结束日期,所以我在HBaseQuery中把这两个参数直接封装了。
public class HbaseQuery { private static final long serialVersionUID = 1L; //当前页码 private int page; //每页条数 private long limit; private Map<String, Object> params; private List<String> pageStartRowKeys; private Date startDate; private Date endDate; private Scan countScan; private Scan dataScan= new Scan(); private int cache = 10; private DateFormat sf =new SimpleDateFormat("yyyy-MM-dd"); private FilterList fl = new FilterList(FilterList.Operator.MUST_PASS_ALL); private FilterList countfl = new FilterList(FilterList.Operator.MUST_PASS_ALL); public HbaseQuery(Map<String, Object> params)throws Exception{ this.params = params; String temp2 = (String)params.get("startDate"); startDate = sf.parse(temp2); String temp = (String)params.get("endDate"); endDate = sf.parse(temp); endDate = DateUtils.addDays(endDate,1); this.page = Integer.parseInt(params.get("page").toString()); this.limit = Integer.parseInt(params.get("limit").toString()); cache = limit>5000?5000:((int)limit+1);//加1,因为每次都会取limit+1条数据 params.remove("startDate"); params.remove("endDate"); params.remove("page"); params.remove("limit"); } public HbaseQuery buildScanCount()throws Exception{ countScan= new Scan(); countScan.setMaxVersions(); countScan.setCaching(5); Long startLong = Long.MAX_VALUE-startDate.getTime(); countScan.setStopRow((startLong+"-").getBytes()); Long endLong = Long.MAX_VALUE-(endDate.getTime()-1); countScan.setStartRow((endLong+"-").getBytes()); return this; } public HbaseQuery buildPageRowKey(String token)throws Exception{ dataScan.setMaxVersions(); Long startLong = Long.MAX_VALUE-startDate.getTime(); dataScan.setStopRow((startLong+"-").getBytes()); Long endLong = Long.MAX_VALUE-(endDate.getTime()-1); dataScan.setStartRow((endLong+"-").getBytes()); RedisUtils redisUtils = (RedisUtils)SpringContextUtils.getBean("redisUtils"); List<String> pageStartRowKeys = redisUtils.get(token,List.class); //点击上一页或下一页 if(params.get("pageicon")!=null&&!((String)params.get("pageicon")).equals("")){ 
//且redis中的startRowKeys不为空 String pageicon = (String)params.get("pageicon"); if(pageStartRowKeys!=null){ String startRowKey = pageStartRowKeys.get(this.page-1); if(pageicon.equals("next")&&pageStartRowKeys.size()==this.page){ dataScan.setStartRow(startRowKey.getBytes()); Filter pageFilter=new PageFilter(cache); fl.addFilter(pageFilter); }else if((pageicon.equals("next")&&pageStartRowKeys.size()>this.page) ||pageicon.equals("prev")){ String stopRowKey = pageStartRowKeys.get(this.page); dataScan.setStartRow(startRowKey.getBytes()); dataScan.setStopRow(stopRowKey.getBytes()); Filter pageFilter=new PageFilter(this.getLimit()); fl.addFilter(pageFilter); } }else{ throw new Exception("点的是分页,但是redis中没有数据,这程序肯定有问题"); } }else{//点击的非分页按钮,则删除redis中分页信息 redisUtils.delete(token); Filter pageFilter=new PageFilter(this.getLimit()+1); fl.addFilter(pageFilter); } dataScan.setCaching(cache); return this; } public HbaseQuery buildDataFilter(Filter filter){ fl.addFilter(filter); return this; } public HbaseQuery buildCountFilter(Filter filter){ fl.addFilter(filter); return this; } public HbaseQuery finish(){ countScan.setFilter(countfl); dataScan.setFilter(fl); return this; } }
查询hbase的方法。注意:在使用hbase的协处理器前,请先确保表开通了此功能。
hbase表开通协处理功能方法(shell命令):
(1)disable指定表。hbase> disable 'mytable'
(2)添加aggregation。hbase> alter 'mytable', METHOD => 'table_att', 'coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||'
(3)重新enable指定表。hbase> enable 'mytable'
public HBasePage query(HbaseQuery query) { long time = System.currentTimeMillis(); Map<String,String> communtiyKeysMap = new HashMap<>(); HBasePage page = new HBasePage(query.getLimit(),query.getPage()); final String tableName = this.getTableName(); if(query.getCountScan()!=null){ AggregationClient ac = new AggregationClient(hbaseTemplate.getConfiguration()); try{ long count = ac.rowCount(TableName.valueOf(tableName),new LongColumnInterpreter(),query.getCountScan()); page.setTotalCount(count); }catch (Throwable e){ e.printStackTrace(); } } long time2 = System.currentTimeMillis(); List rows = hbaseTemplate.find(tableName, query.getDataScan(), new RowMapper<Object>() { @Override public Object mapRow(Result result, int i) throws Exception { Class clazz = ReflectMap.get(tableName);//这里做了表名和实体Bean的映射。 if(i==0){ communtiyKeysMap.put("curPageStart", new String(result.getRow())); } if(i==query.getLimit()){ communtiyKeysMap.put("nextPageStart", new String(result.getRow())); } HBaseResultBuilder hrb = new HBaseResultBuilder<Object>("sf", result, clazz); return hrb.buildAll().fetch(); } }); // if(rows.size()>0&&page.getPageSize()<rows.size()){ rows.remove(rows.size()-1); } page.setList(rows); page.setNextPageRow(communtiyKeysMap.get("nextPageStart")); page.setCurPageRow(communtiyKeysMap.get("curPageStart")); long time3 = System.currentTimeMillis(); System.out.println("time2-time==getCount="+(time2-time)); System.out.println("time3-time2==getData="+(time3-time2)); return page; }
分页类HBasePage的代码:
public class HBasePage implements Serializable { private static final long serialVersionUID = 1L; //总记录数 protected long totalCount; //每页记录数 protected long pageSize; //总页数 protected int totalPage; //当前页数 protected int currPage; //列表数据 protected List<Object> list; private String nextPageRow;//下一页的ROWKEY private String curPageRow;//当前页的开始ROWKEY /** * 分页 * @param pageSize 每页记录数 * @param currPage 当前页数 */ public HBasePage(long pageSize, int currPage) { this.list = list; this.totalCount = totalCount; this.pageSize = pageSize; this.currPage = currPage; } public void setTotalCount(long totalCount) { this.totalCount = totalCount; this.totalPage = (int)Math.ceil((double)totalCount/pageSize); } public HBasePage buildRedisRowKey(String token){ RedisUtils redisUtils = (RedisUtils)SpringContextUtils.getBean("redisUtils"); List<String> pageStartRowKeys = redisUtils.get(token,List.class); List<String> pageRowKeys = redisUtils.get(token,List.class); if(this.getList().size()>0){ if(pageRowKeys==null||pageRowKeys.size()<=0){ pageRowKeys = new ArrayList<>(); pageRowKeys.add(this.getCurPageRow().substring(0,this.getCurPageRow().indexOf("-")+1)); pageRowKeys.add(this.getNextPageRow().substring(0,this.getNextPageRow().indexOf("-")+1)); redisUtils.set(token,pageRowKeys); }else{ if(pageRowKeys.size()>this.getCurrPage()){ //doNothing }else if(pageRowKeys.size()==this.getCurrPage()){ pageRowKeys.add(this.getNextPageRow().substring(0,this.getNextPageRow().indexOf("-")+1)); redisUtils.set(token,pageRowKeys); } } } return this; } }
注意:
1、我的rowKey设置规则是 (Long_Max-new Date().getTime()+"-"+id),所以在看startRowKey和stopRowKey时特别注意。
如有什么更好的办法或代码缺陷,欢迎留言探讨。