基于HBase的冠字号查询系统2--实现部分

1. 软件版本和部署

maven:3.3.9,jdk:1.7 ,Struts2:2.3.24.1,hibernate:4.3.6,spring:4.2.5,MySQL:5.1.34,Junit:4,Myeclipse:2014;

Hadoop2.6.4,HBase1.1.2

源码下载:https://github.com/fansy1990/ssh_v3/releases

部署参考:http://blog.csdn.net/fansy1990/article/details/51356583

数据下载:http://download.csdn.net/detail/fansy1990/9540865 或  http://pan.baidu.com/s/1dEVeJz7

请先参考上篇Blog: 基于HBase的冠字号查询系统1--理论部分 ;

2. 系统功能&核心实现

2.1 系统首页

冠字号查询系统,主要包括两方面功能:

1. 把原始数据通过MR流程从HDFS导入到HBase,提供通用接口;

2. 提供冠字号查询功能、提供存款、取款功能;

2.2 数据加载功能

数据加载功能是一个通用的HBase表数据导入功能;

用户只需提供原始数据路径(HDFS)、HBase表名(该表需要先存在)、列描述(参考前篇博客此参数解释)、字段分隔符、时间格式即可;

2.2.1 后台实现

public void submitJob() {
		Map<String, Object> jsonMap = new HashMap<String, Object>();
		if (HadoopUtils.getMrLock().equals(MRLock.NOTLOCKED)) {// 没有锁,则可以提交代码
			// 先加锁
			HadoopUtils.setMrLock(MRLock.LOCKED);
			// 清空MR任务缓存
			HadoopUtils.initMRCache();

			// 提交任务
			new Thread(new Hdfs2HBaseRunnable(hdfsFile, tableName,
					colDescription, splitter, dateFormat)).start();

			jsonMap.put("flag", "true");
			jsonMap.put("jobId", HadoopUtils.getJobId());
		} else {
			jsonMap.put("flag", "false");
			jsonMap.put("msg", "已经在运行MR程序,请确认!");
		}
		Utils.write2PrintWriter(JSON.toJSONString(jsonMap));
		return;
	}

这里提供一个MRLock,加此锁是防止在提交任务后,任务正在运行,而有其他程序重复提交任务(监控会有问题);

同时,这里使用多线程,提交任务后,立即返回前台,前台接收返回的信息后,根据判断,是否弹窗监控任务进度:

ret = callByAJax("hadoop/hadoop_submitJob.action",
			{hdfsFile:hdfs,tableName:table,colDescription:colDescription,splitter:splitter,dateFormat:dateFormat})
	if(ret.flag=="false"){// 提交任务失败
		$.messager.alert(‘提示‘,ret.msg,‘warn‘);
		return ;
	}
	 $.messager.progress({
	     title:‘提示‘,
	     msg:‘导入数据中...‘,
	     interval:0    //disable auto update progress value
	 });
	// hadoop_submitJob.action 返回的ret中包含jobId , ret.jobId
	if(typeof(EventSource)!=="undefined")
	  {
		console.info("jobId:"+ret.jobId);
	  var source=new EventSource("hadoop/hadoop_getMRProgress.action"+"?jobId="+ ret.jobId );
	  source.onmessage=function(event)
	    {
		  console.info(event.data);

		  // TODO 判断event.data indexOf error ,解析:后面的值,显示,同时提示任务错误
		  if(event.data.indexOf( "error")> -1){
			  source.close();
			  $.messager.progress(‘close‘);
			  $.messager.alert(‘提示‘,"任务运行失败!",‘warn‘);
		  }
		  // TODO 判断 event.data 为success ,则提示任务成功, 其他清空则任务进度即可
		  if(event.data == "success"){
			  source.close();
			  $.messager.progress(‘close‘);
			  $.messager.alert(‘提示‘,"任务运行成功!",‘warn‘);
		  }
		  var bar = $.messager.progress(‘bar‘);
		  bar.progressbar(‘setValue‘,  event.data);

	    };

注意这里的JobId的获取:

1) 在提交任务的时候把job变量设置到外部静态类中;

2)在Thread线程提交任务后,去获取jobId

需要注意的是,jobId只有在任务运行中才会被初始化,所以在提交任务后(thread运行中才初始化jobid);

3)while循环获取jobid:

public static String getJobId() {
		long start = System.currentTimeMillis();
		while (noJobId()) {
			try {
				Thread.sleep(200);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			log.info(" Getting job id ...");
		}
		long end = System.currentTimeMillis();

		log.info("获取jobId,耗时:" + (end - start) * 1.0 / 1000 + "s");
		return currJob.getJobID().toString();
	}

private static boolean noJobId() {
		if (currJob == null || currJob.getJobID() == null)
			return true;

		return false;
	}

同时,这里使用了HTML5的服务器发送事件,关于此技术请参考:http://www.w3school.com.cn/html5/html_5_serversentevents.asp ;

2.2.2 实例

提交任务后,实时反映任务运行进度:

后台日志:

运行成功:

2.3. 查询冠字号

这里查询冠字号包含两个部分:

1)随机生成:是指随机生成冠字号(根据个数可以生成不同个数冠字号,逗号分隔);

2) 查询,根据冠字号进行查询,如果hbase表中该记录的exist字段为1则说明存在,否则说明查询改冠字号为取出状态,则对应为疑似伪钞冠字号;

3) 详细查询:查询冠字号的所有信息,可以查询多个版本信息:

2.3.1后台实现

/**
	 * 检查给定的冠字号是否存在疑似伪钞冠字号
	 *
	 * @param stumbers
	 * @return
	 * @throws IllegalArgumentException
	 * @throws IOException
	 */
	public Map<String, String> checkStumbersExist(String stumbers)
			throws IllegalArgumentException, IOException {
		String[] stumbersArr = StringUtils.split(stumbers, Utils.COMMA);
		Connection connection = HadoopUtils.getHBaseConnection();
		Table table = connection.getTable(TableName
				.valueOf(Utils.IDENTIFY_RMB_RECORDS));
		Map<String, String> map = new HashMap<>();
		Get get = null;
		try {
			List<Get> gets = new ArrayList<>();
			for (String stumber : stumbersArr) {
				get = new Get(stumber.trim().getBytes());
				gets.add(get);
			}
			Result[] results = table.get(gets);
			String exist;
			StringBuffer existStr = new StringBuffer();
			StringBuffer notExistStr = new StringBuffer();
			for (int i = 0; i < results.length; i++) {
				exist = new String(results[i].getValue(Utils.FAMILY,
						Utils.COL_EXIST));
				if ("1".equals(exist)) {
					existStr.append(stumbersArr[i]).append(Utils.COMMA);
				} else if ("0".equals(exist)) {
					notExistStr.append(stumbersArr[i]).append(Utils.COMMA);
				} else {
					log.info("冠字号:" + stumbersArr[i] + "值 exist字段值异常!");
				}
			}
			if (existStr.length() > 0) {
				map.put("exist", existStr.substring(0, existStr.length() - 1));
			} else {
				map.put("exist", "nodata");
			}
			if (notExistStr.length() > 0) {
				map.put("notExist",
						notExistStr.substring(0, notExistStr.length() - 1));
			} else {
				map.put("notExist", "nodata");
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
		return map;
	}

直接使用HBase的Table Java API实现即可;

获取给定rowkey以及版本数的记录,同样使用HBase 的Table Java API 即可实现

/**
 * 根据rowkey和版本个数查询数据
 * @param tableName
 * @param cfs
 * @param rowkeys
 * @param versions
 * @return
 * @throws IOException
 */
	public List<HBaseTableData> getTableCertainRowKeyData(String tableName,
			String cfs, String rowkeys, int versions) throws IOException {
		String[] stumbersArr = StringUtils.split(rowkeys, Utils.COMMA);
		Connection connection = HadoopUtils.getHBaseConnection();
		Table table = connection.getTable(TableName
				.valueOf(tableName));
		List<HBaseTableData> list = new ArrayList<>();
		Get get = null;
		try {
			List<Get> gets = new ArrayList<>();
			for (String stumber : stumbersArr) {
				get = new Get(stumber.trim().getBytes());
				get.setMaxVersions(versions);
				gets.add(get);
			}
			Result[] results = table.get(gets);
			Cell[] cells;
			for (int i = 0; i < results.length; i++) {
				cells = results[i].rawCells();

				list.addAll(getHBaseTableDataListFromCells(cells));

			}

			return list;

		} catch (Exception e) {
			e.printStackTrace();
		}

		return null;
	}

2.4 存款:

1)存款需要输入用户ID、银行、冠字号,当然也可以随机生成;

2)存取使用的是Table的checkAndPut 函数,关于此函数存储数据的一致性,参考:http://blog.csdn.net/fansy1990/article/details/51451583

由于AAAR5912的冠字号,其exist状态为1,说明HBase表中此冠字号为存储状态,不能再次存储,即发现了疑似伪钞的冠字号;

2.5 取款

1)取款同样有随机生成功能,类似上面:

当然,这里随机生成的只是用户和银行而已;

2) 取款:取款根据取款金额进行获取:

取款流程如下:

1)        根据给定的取款冠字号个数num,随机查找冠字号(rowkey)对应的op_www:exist字段值为1的num*3条记录;

2)        使用HBase.checkAndPut进行更新,把op_www:exist字段值更新为0,并返回更新后的rowkey,即冠字号;

3)        如果在num*3条记录更新后,被更新的冠字号不足num条,则再次随机查找冠字号对应的op_www:exist字段值为1的记录,并更新,返回更新后的冠字号,直到返回的冠字号个数为num;

2.6 验证每秒500+查询

使用单个线程进行查询:

package stumer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadTest {

//	private static String FAMILY ="info"; 

	public static void main(String[] args) throws IOException {
		long size =10000;
		get(Utils.getConn(),Utils.generateRowKey(size));
	}

	public static void get(Connection connection,List<byte[]> rowkeys) throws IOException {
		System.out.println(new Date()+":开始读取记录...");
		long start =System.currentTimeMillis();
		Table table = connection.getTable(TableName.valueOf(Utils.TABLE));
		Get get = null ;
		long count =0;
		try{
			for(byte[] rowkey :rowkeys){
				count ++;
	//			get = new Get(Bytes.toBytes(""));
				get = new Get(rowkey);
				table.get(get);
				if(count%1000==0){
					System.out.println("count:"+count);
				}
			}
		long end = System.currentTimeMillis();
		System.out.println(new Date()+":"+rowkeys.size()+"条记录,读取耗时:"+(end-start)*1.0/1000+"s");
		}catch(Exception e){

		}finally{
			table.close();
		}

	}

}

使用多线程查询:

package stumer;

import java.util.Date;
import java.util.List;

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Table;

public class ReadThread implements Runnable {

	private List<byte[]> rks;
	private Table table;
	public  ReadThread(Table table ,List<byte[]> rks) {
		this.table = table;
		this.rks = rks;
	}
	@Override
	public void run() {
		System.out.println(Thread.currentThread().getName()+" "+new Date()+":开始读取记录...");
		long start =System.currentTimeMillis();
		Get get = null ;
		long count =0;
		try{
			for(byte[] rowkey :rks){
				count ++;
	//			get = new Get(Bytes.toBytes(""));
				get = new Get(rowkey);
				table.get(get);
				if(count%1000==0){
					System.out.println(Thread.currentThread().getName()+" count:"+count);
				}
			}
		long end = System.currentTimeMillis();
		System.out.println(Thread.currentThread().getName()+" "+new Date()
				+":"+rks.size()+"条记录,读取耗时:"+(end-start)*1.0/1000+"s");
		}catch(Exception e){

		}

	}

}

多线程查询主程序:

package stumer;

import java.io.IOException;

public class ReadThreadTest {

	public static void main(String[] args) throws IOException {
		long dataSize =500;
		int threadSize = 20;
		for(int i=0;i<threadSize;i++){
			new Thread(new ReadThread(Utils.getTable(), Utils.generateRowKey(dataSize))).start();
		}
	}
}

工程类Utils程序

package stumer;

import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class Utils {
	public static String TABLE = "records";
	private static DecimalFormat df = new DecimalFormat( "0000" );
	public static String[]  crownSizePrefixes =null;
	static Random random = new Random();

	static {
		crownSizePrefixes = new String[26*2];
		for (int i = 0; i < crownSizePrefixes.length/2; i++) {
			crownSizePrefixes[i] = "AAA" + (char) (65 + i);
			crownSizePrefixes[i+26] = "AAB" + (char) (65 + i);
		}
	}
	/**
	 * 把0~9999 转为 0000~9999
	 * @param num
	 * @return
	 */
	public static String formatCrownSizeSuffix(int num){
		return df.format(num);
	}
	public static Table getTable() throws IOException{
		return getConn().getTable(TableName.valueOf(TABLE));
	}
	public static String getRandomCrownSize(){
		return crownSizePrefixes[random.nextInt(crownSizePrefixes.length)]
				+formatCrownSizeSuffix(random.nextInt(10000));
	}
	public static Connection getConn() throws IOException {
		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.master", "node2:16000");// 指定HMaster
		conf.set("hbase.rootdir", "hdfs://node1:8020/hbase");// 指定HBase在HDFS上存储路径
		conf.set("hbase.zookeeper.quorum", "node2,node3,node4");// 指定使用的Zookeeper集群
		conf.set("hbase.zookeeper.property.clientPort", "2181");// 指定使用Zookeeper集群的端口

		Connection connection = ConnectionFactory.createConnection(conf);// 获取连
		return connection;
	}

	public static List<byte[]> generateRowKey(long size){
		System.out.println(new Date()+"开始生成"+size +"条记录...");
		long start =System.currentTimeMillis();
		List<byte[]>  rowkeys = new ArrayList<>();

		for(int i=0;i<size;i++){
			rowkeys.add(Bytes.toBytes(Utils.getRandomCrownSize()));
		}
		long end =System.currentTimeMillis();
		System.out.println(new Date()+":"+rowkeys.size()+"条记录,生成耗时:"+(end-start)*1.0/1000+"s");
		return rowkeys;
	}
}

3. 总结

1) 基于冠字号查询系统基于已存在HBase的冠字号非伪钞,如果已存在冠字号包含伪钞,则存储和取钱功能都会有问题;

2) 原始数据(用户信息、冠字号交易信息),在一定程序上是有规律的,并且对于大数据来说,还是小数据,需要在较大数据集上测试;

3)用户账号相关信息(存储的钱总数等)并没有在该系统中体现,后续可以考虑;

4) 查询冠字号、存、取款功能在第一次点击的时候初始化时间较长,考虑弹窗显示;

5)查询冠字号、存、取款功能中的详细查询可以在定制点,比如可以只查询出某个列簇或列的数据即可;

分享,成长,快乐

脚踏实地,专注

转载请注明blog地址:http://blog.csdn.net/fansy1990

时间: 2024-10-11 00:32:21

基于HBase的冠字号查询系统2--实现部分的相关文章

基于HBase的冠字号查询系统1--理论部分

1. 软件版本和部署 maven:3.3.9,jdk:1.7 ,Struts2:2.3.24.1,hibernate:4.3.6,spring:4.2.5,MySQL:5.1.34,Junit:4,Myeclipse:2014: Hadoop2.6.4,HBase1.1.2 源码下载:https://github.com/fansy1990/ssh_v3/releases 部署参考:http://blog.csdn.net/fansy1990/article/details/51356583 数

基于Impala平台打造交互查询系统

本文来自网易云社区 原创: 蒋鸿翔 DataFunTalk 本文根据网易大数据蒋鸿翔老师DataFun Talk--"大数据从底层处理到数据驱动业务"中分享的<基于Impala平台打造交互查询系统>编辑整理而成,在未改变原意的基础上稍做整理. 以上是今天的内容大纲,第一个讲一下交互式查询的特点,在大数据平台有很多查询平台可以选择,第二个讲一下依据项目如何选择平台,选型因素是什么.第三个讲一下Impala基本介绍,以及在Impala上的改进.接下来是impala的应用场景,最

基于HBase的手机数据备份系统

洞穴逃生 描述: 精灵王子爱好冒险,在一次探险历程中,他进入了一个神秘的山洞.在洞穴深处,精灵王子不小心触动了洞穴内暗藏的机关,整个洞穴将很快塌陷,精灵王子必须尽快逃离洞穴.精灵王子的跑步速度为17m/s,以这样的速度可能是无法逃出洞穴的.庆幸的是精灵王子拥有闪烁法术,可在1s内移动60m,不过每次使用闪烁法术都会消耗魔法值10点.精灵王子的魔法值恢复的速度为4点/s,只有处在原地休息状态时才能恢复. 现在已知精灵王子的魔法初值M,他所在洞穴中的位置与洞穴出口之间的距离S,距离洞穴塌陷的时间T.

基于java最短路径算法公交查询系统的设计与实现

基于J2EE的公交查询系统的设计与实现 1引言 1.1 选题背景 20多年来,我国经济得到了持续.快速.稳定.健康地发展.经济的快速增长,带动了汽车工业的蓬勃发展,并使交通状况显著改善.据统计,中国公路通车总里程已达130余万公里,其中高速公路约1.5万公里.居民收入普遍提高,到2000年年底,人均GDP已超过800美元,沿海地区已达2000-3000美元.按国际发展惯例,当人均GDP超出1000美元,汽车消费市场就将进入快速增长期.我国城市人口约有2亿,略低于美国人口.东部沿海地区大部分居民已

基于二叉排序树的高校分数查询系统

前述:该学期最后的数据结构的课程设计选题,于是记录在自己博客中,作为自己技术成长的点滴吧. 题目:高校最低录取分数线的查询 编程实现一个开放式的高校本科招生最低分数线的查询系统,供师生及家长等查询,高校自愿放入该校的信息,可能随时有高校加入. 要求实现的查询功能有: 查询等于用户给定分数的高校 查询大于(或小于)用户给定分数的高校 查询最低录取分数线的用户给定的分数段中的高校   以上就是老师给定题目中要求实现的功能,为了是程序整体更加完整,自己对高校使用部分做了一些功能添加,例如学校信息的增删

HBase高性能复杂条件查询引擎

--索引的实质是另一种编排形式的数据冗余,高效的检索源自于面向查询特别设计的编排形式,如果再辅以分布式的计算框架,就可以支撑起高性能的大数据查询.本文原文出处: http://blog.csdn.net/bluishglc/article/details/31799255 严禁任何形式的转载,否则将委托CSDN官方维护权益! Apache HBase?是一个分布式.可伸缩的NoSQL数据库,它构建在Hadoop基础设施之上,依托于Hadoop的迅猛发展,HBase在大数据领域的应用越来越广泛,成

OpenTSDB介绍——基于Hbase的分布式的,可伸缩的时间序列数据库,而Hbase本质是列存储

原文链接:http://www.jianshu.com/p/0bafd0168647 OpenTSDB介绍 1.1.OpenTSDB是什么?主要用途是什么? 官方文档这样描述:OpenTSDB is a distributed, scalable Time Series Database (TSDB) written on top of HBase: 翻译过来就是,基于Hbase的分布式的,可伸缩的时间序列数据库. 主要用途,就是做监控系统:譬如收集大规模集群(包括网络设备.操作系统.应用程序)

一种基于HBase韵海量图片存储技术

针对海量图片存储,已有若干个基于Hadoop的方案被设计出来.这些方案在系统层小文件合并.全局名字空间以及通用性方面存在不足.本文基于HBase提出了一种海量图片存储技术,成功解决了上述问题.本文将介绍基于HBase海量图片存储技术方案,分析其原理及优势,该方案在城市交通监控中得到应用验证. 随着互联网.云计算及大数据等信息技术的发展,越来越多的应用依赖于对海量数据的存储和处理,如智能监控.电子商务.地理信息等,这些应用都需要对海量图片的存储和检索.由于图片大多是小文件(80%大小在数MB以内)

基于HBASE的并行计算架构之rowkey设计篇

1.大数据在HBASE存储.计算以及查询的应用场景 海量数据都是事务数据,事务数据都是在时间的基础上产生的.数据的业务时间可能会顺序产生,也可能不会顺序产生,比如某些事务发生在早上10点,但是在下午5点才结束闭并生成出来,这样的数据就会造成存储加载时的时间连续性.另外海量数据的挖掘后产生的是统计数据,统计数据也有时间属性,统计数据如果进行保存必须保证在统计计算之后数据尽量不再变化,如果统计发生后又有新的事务数据产生,那么将重新触发统计计算然后重新保存覆盖原有已经存储的数据.其它数据则主要是以配置