实战:在Java Web 项目中使用HBase

在此之前我们使用Mysql作为数据源,但发现这数据增长速度太快,并且由于种种原因,因此必须使用HBase,所以我们要把Mysql表里面的数据迁移到HBase中,在这里我就不讲解、不争论为什么要使用HBase,HBase是什么了,喜欢的就认真看下去,总有些地方是有用的

我们要做的3大步骤:

  1. 新建HBase表格。
  2. 把MYSQL数据迁移到HBase中。
  3. 在Java Web项目中读取HBase的数据。

先介绍一下必要的一些环境:

HBase的版本:0.98.8-hadoop2

所需的依赖包

commons-codec-1.7.jar
commons-collections-3.2.1.jar
commons-configuration-1.6.jar
commons-lang-2.6.jar
commons-logging-1.1.3.jar
guava-12.0.1.jar
hadoop-auth-2.5.0.jar
hadoop-common-2.5.0.jar
hbase-client-0.98.8-hadoop2.jar
hbase-common-0.98.8-hadoop2.jar
hbase-protocol-0.98.8-hadoop2.jar
htrace-core-2.04.jar
jackson-core-asl-1.9.13.jar
jackson-mapper-asl-1.9.13.jar
log4j-1.2.17.jar
mysql-connector-java-5.1.7-bin.jar
netty-3.6.6.Final.jar
protobuf-java-2.5.0.jar
slf4j-api-1.7.5.jar
slf4j-log4j12-1.7.5.jar
zookeeper-3.4.6.jar

如果在你的web项目中有些包已经存在,保留其中一个就好了,免得报奇怪的错误就麻烦了。

步骤1:建表

在此之前,我在Mysql中的业务数据表一共有6个,其结构重复性太高了,首先看看我在HBase里面的表结构:

表名 kpi
key fid+tid+date
簇(family) base gpower userate consum time
描述 基础信息 发电量相关指标 可利用率 自耗电量 累计运行小时数 检修小时数 利用小时数
列(qualifier) fid tid date power windspeed unpower theory coup time power num cpower gpower runtime checktime usetime
描述 风场ID 风机号 日期 发电量 风速 弃风电量 理论电量 耦合度 故障时间 故障损失电量 故障台次 当天自耗电量 当天发电量 当天并网秒数 当天检修秒数 当天利用秒数

这个表中我们有5个family,其中base Family是对应6个mysql表中的key列, gpower、userate、consum分别对应一个表,time对应3个表。

这个kpi表的rowkey设计是base中的3个qualifier,分别从3个维度查询数据,这样的设计已经可以满足我们的需求了。

具体在HBase中如何建表如何搭建环境自己参考我写的【手把手教你配置HBase完全分布式环境】这篇文章吧。

步骤2:把MySQL数据迁移到HBase


这时我用gpower对应的mysql表来做演示吧,其他表的道理都一样。(这里可能有人会说为什么不用第三方插件直接数据库对数据库迁移,这里我统一回答一下,我不会,我也不需要。)

okay,首先我们来看看代码先吧:

import java.io.File;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class GpowerTransfer{

	private static final String QUOREM = "192.168.103.50,192.168.103.51,192.168.103.52,192.168.103.53,192.168.103.54,192.168.103.55,192.168.103.56,192.168.103.57,192.168.103.58,192.168.103.59,192.168.103.60";//这里是你HBase的分布式集群结点,用逗号分开。
	private static final String CLIENT_PORT = "2181";//端口
	private static Logger log = Logger.getLogger(GpowerTransfer.class);

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		BasicConfigurator.configure();
		log.setLevel(Level.DEBUG);
		String tableName = "kpi";//HBase表名称
		Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", QUOREM);
        conf.set("hbase.zookeeper.property.clientPort", CLIENT_PORT);
        try { File workaround = new File(".");
            System.getProperties().put("hadoop.home.dir", workaround.getAbsolutePath());
            new File("./bin").mkdirs();
            new File("./bin/winutils.exe").createNewFile();//这几段奇怪的代码在windows跑的时候不加有时候分报错,在web项目中可以不要,但单独的java程序还是加上去吧,知道什么原因的小伙伴可以告诉我一下,不胜感激。
			HBaseAdmin admin = new HBaseAdmin(conf);
			if(admin.tableExists(tableName)){
				Class.forName("com.mysql.jdbc.Driver");//首先将mysql中的数据读取出来,然后再插入到HBase中
				String url = "jdbc:mysql://192.168.***.***:3306/midb?useUnicode=true&characterEncoding=utf-8";
				String username = "********";
				String password = "********";
				Connection con = DriverManager.getConnection(url, username, password);
				PreparedStatement pstmt = con.prepareStatement("select * from kpi_gpower");
				ResultSet rs = pstmt.executeQuery();
				HTable table = new HTable(conf, tableName);
				log.debug(tableName + ":start copying data to hbase...");
				List<Put> list = new ArrayList<Put>();
				SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
				String base = "base";//family名称
				String gpower = "gpower";//family名称
				String[] qbase = {"fid","tid","date"};//qualifier名称
				String[] qgpower = {"power","windspeed","unpower","theory","coup"};//qualifier名称
				while(rs.next()){
				    String rowKey = rs.getString("farmid") + ":" + (rs.getInt("turbineid")<10?("0"+rs.getInt("turbineid")):rs.getInt("turbineid")) + ":" + sdf.format(rs.getDate("vtime"));//拼接rowkey
				    Put put = new Put(Bytes.toBytes(rowKey));//新建一条记录,然后下面对相应的列进行赋值
				    put.add(base.getBytes(), qbase[0].getBytes(), Bytes.toBytes(rs.getString("farmid")));//base:fid
				    put.add(base.getBytes(), qbase[1].getBytes(), Bytes.toBytes(rs.getInt("turbineid")+""));//base:tid
				    put.add(base.getBytes(), qbase[2].getBytes(), Bytes.toBytes(rs.getDate("vtime")+""));//base:date
				    put.add(gpower.getBytes(), qgpower[0].getBytes(), Bytes.toBytes(rs.getFloat("value")+""));//gpower:power
				    put.add(gpower.getBytes(), qgpower[1].getBytes(), Bytes.toBytes(rs.getFloat("windspeed")+""));//gpower:windspeed
				    put.add(gpower.getBytes(), qgpower[2].getBytes(), Bytes.toBytes(rs.getFloat("unvalue")+""));//gpower:unvalue
				    put.add(gpower.getBytes(), qgpower[3].getBytes(), Bytes.toBytes(rs.getFloat("theory")+""));//gpower:theory
				    put.add(gpower.getBytes(), qgpower[4].getBytes(), Bytes.toBytes(rs.getFloat("coup")+""));//gpower:coup
				    list.add(put);
				}
				table.put(list);//这里真正对表进行插入操作
				log.debug(tableName + ":completed data copy!");
				table.close();//这里要非常注意一点,如果你频繁地对表进行打开跟关闭,性能将会直线下降,可能跟集群有关系。
			}else{
				admin.close();
				log.error("table ‘" + tableName + "‘ not exisit!");
				throw new IllegalArgumentException("table ‘" + tableName + "‘ not exisit!");
			}
			admin.close();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

在put语句进行add的时候要特别注意:对于int、float、Date等等非String类型的数据,要记得将其转换成String类型,这里我直接用+""解决了,否则在你读取数据的时候就会遇到麻烦了。

步骤3:Java Web项目读取HBase里面的数据

ok,我们成功地把数据迁移到HBase,我们剩下的任务就是在Web应用中读取数据了。

首先我们要确保Web项目中已经把必要的Jar包添加到ClassPath了,下面我对一些HBase的连接做了小封装:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;

/**
 * @author a01513
 *
 */
public class HBaseConnector {

	private static final String QUOREM = "192.168.103.50,192.168.103.51,192.168.103.52,192.168.103.53,192.168.103.54,192.168.103.55,192.168.103.56,192.168.103.57,192.168.103.58,192.168.103.59,192.168.103.60";
	private static final String CLIENT_PORT = "2181";
	private HBaseAdmin admin;
	private Configuration conf;

	public HBaseAdmin getHBaseAdmin(){
		getConfiguration();
        try {
			admin = new HBaseAdmin(conf);
		} catch (Exception e) {
			e.printStackTrace();
		}
		return admin;
	}

	public Configuration getConfiguration(){
		if(conf == null){
			conf = HBaseConfiguration.create();
	        conf.set("hbase.zookeeper.quorum", QUOREM);
	        conf.set("hbase.zookeeper.property.clientPort", CLIENT_PORT);
		}
		return conf;
	}

这里的代码基本上跟迁移的那部分代码一样,由于我在其他地方都要重用这些代码,就装在一个地方免得重复写了。

我在Service层做了一下测试,下面看看具体的读取过程:

private final String tableName = "kpi";
@Override
	public List<GenPowerEntity> getGenPower(String farmid,int ltb,int htb,String start,String end) {
		List<GenPowerEntity> list = new ArrayList<GenPowerEntity>();
		HBaseConnector hbaseConn = new HBaseConnector();
		HBaseAdmin admin = hbaseConn.getHBaseAdmin();
		try {
			if(admin.tableExists(tableName)){
				HTable table = new HTable(hbaseConn.getConfiguration(), tableName);
				Scan scan = new Scan();
				scan.addFamily(Bytes.toBytes("base"));
				scan.addFamily(Bytes.toBytes("gpower"));
				scan.addFamily(Bytes.toBytes("userate"));
				String startRowKey = new String();
				String stopRowKey = new String();
				if("".equals(start) && !"".equals(end)){
					stopRowKey = farmid + ":" + Tools.addZero(htb) + ":" + end;
					scan.setStopRow(Bytes.toBytes(stopRowKey));
				}else if(!"".equals(start) && "".equals(end)){
					startRowKey = farmid + ":" + Tools.addZero(ltb) + ":" + start;
					scan.setStartRow(Bytes.toBytes(startRowKey));
				}else if(!"".equals(start) && !"".equals(end)){
					startRowKey = farmid + ":" + Tools.addZero(ltb) + ":" + start;
					stopRowKey = farmid + ":" + Tools.addZero(htb) + ":" + end;
					scan.setStartRow(Bytes.toBytes(startRowKey));
					scan.setStopRow(Bytes.toBytes(stopRowKey));
				}else{
					table.close();
					admin.close();
					return null;
				}
				ResultScanner rsc = table.getScanner(scan);
				Iterator<Result> it = rsc.iterator();
				List<GenPowerEntity> slist = new ArrayList<GenPowerEntity>();
				List<UseRateEntity> ulist = new ArrayList<UseRateEntity>();
				String tempRowKey = "";//这个临时rowkey是用来判断一行数据是否已经读取完了的。
				GenPowerEntity gpower = new GenPowerEntity();
				UseRateEntity userate = new UseRateEntity();
				while(it.hasNext()){
					for(Cell cell: it.next().rawCells()){
						String rowKey = new String(cell.getRowArray(),cell.getRowOffset(),cell.getRowLength(),"UTF-8");
						String family = new String(cell.getFamilyArray(),cell.getFamilyOffset(),cell.getFamilyLength(),"UTF-8");
						String qualifier = new String(cell.getQualifierArray(),cell.getQualifierOffset(),cell.getQualifierLength(),"UTF-8");
						String value = new String(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength(),"UTF-8");//假如我们当时插入HBase的时候没有把int、float等类型的数据转换成String,这里就会乱码了,并且用Bytes.toInt()这个方法还原也没有用,哈哈
						System.out.println("RowKey=>"+rowKey+"->"+family+":"+qualifier+"="+value);
						if("".equals(tempRowKey))
							tempRowKey = rowKey;
						if(!rowKey.equals(tempRowKey)){
							slist.add(gpower);
							ulist.add(userate);
							gpower = null;
							userate = null;
							gpower = new GenPowerEntity();
							userate = new UseRateEntity();
							tempRowKey = rowKey;
						}
						switch(family){
						case "base":
							switch(qualifier){
							case "fid":
								gpower.setFarmid(value);
								userate.setFarmid(value);
								break;
							case "tid":
								gpower.setTurbineid(Integer.parseInt(value));
								userate.setTurbineid(Integer.parseInt(value));
								break;
							case "date":
								SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
								Date date = null;
								try {
									date = sdf.parse(value);
								} catch (ParseException e) {
									e.printStackTrace();
								}
								gpower.setVtime(date);
								userate.setVtime(date);
								break;
							}
							break;
						case "gpower":
							switch(qualifier){
							case "power":
								gpower.setValue(Float.parseFloat(value));
								break;
							case "windspeed":
								gpower.setWindspeed(Float.parseFloat(value));
								break;
							case "unpower":
								gpower.setUnvalue(Float.parseFloat(value));
								break;
							case "theory":
								gpower.setTvalue(Float.parseFloat(value));
								break;
							case "coup":
								gpower.setCoup(Float.parseFloat(value));
								break;
							}
							break;
						case "userate":
							switch(qualifier){
							case "num":
								userate.setFnum(Integer.parseInt(value));
								break;
							case "power":
								userate.setFpower(Float.parseFloat(value));
								break;
							case "time":
								userate.setFvalue(Float.parseFloat(value));
								break;
							}
							break;
						}

					}
				}
				rsc.close();
				table.close();
				admin.close();
				......
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
		return list;
	}

这是我在Service层中用作测试的一个方法,业务逻辑代码可以直接无视(已经用.....代替了,哈哈),至此我们的所有工作完成,对于更深入的应用,还要靠自己去认真挖掘学习了。

时间: 2024-11-07 21:15:31

实战:在Java Web 项目中使用HBase的相关文章

在java web项目中集成webservice

公司要求在项目中加入webservice服务,因为项目中使用了spring框架,所以在这里使用与spring兼容性较好的cxf来实现 cxf所需jar包 spring的jar包就不贴了 一:创建webservice服务器 1)创建一个服务接口 package com.service; import javax.jws.WebParam; import javax.jws.WebService; @WebService public interface IHelloWorld { public S

Java Web项目中缺少Java EE 6 Libraries怎么添加

具体步骤如下: 1.项目名称上点击鼠标右键,选择"Build Path-->Configure Build Path-->Java Build Path" 2.单击"Add Library..." 3.选择"User Library",单击"Next" 4.单击"User Libraries...",选择需要的Java EE 6 Libraries Java Web项目中缺少Java EE 6

JAVA WEB项目中各种路径的获取

JAVA WEB项目中各种路径的获取 标签: java webpath文件路径 2014-02-14 15:04 1746人阅读 评论(0) 收藏 举报  分类: JAVA开发(41)  1.可以在servlet的init方法里 String path = getServletContext().getRealPath("/"); 这将获取web项目的全路径 例如 :E:\eclipseM9\workspace\tree\ tree是我web项目的根目录 2.你也可以随时在任意的cla

在Java Web项目中引入Mondrian多维分析框架

一,Mondrian简介 Mondrian是一个开源项目,一个用Java写成的OLAP引擎.它用MDX语言实现查询,从关系数据库(RDBMS)中读取数据.然后经过Java API以多维的方式对结果进行展示. Mondrian的使用方式同JDBC驱动类似.可以非常方便的与现有的Web项目集成. Mondrian OLAP 系统由四个层组成,可分为从最终用户到数据中心, 顺序为: 1表现层(the presentation layer) 2维度层(the dimensional layer) 3集合

Java web 项目中使用ckeditor和ckfinder

重点汇总: 加入相关的jar包 在web.xml文件加入ckfinder 请求拦截器 配置ckfinder的配置文件:config.xml 杂谈 作为一个可视化的HTML 编辑器,最重要的一点是对于文件.图片和视频的管理,一个好的可视化的HTML 编辑器应该有很好的设计. 其他的话就不多说了,现在开始进入正题. ckeditor和ckfinder 两个到底是干什么的?为什么要做两个? ckeditor就是一个可视化的HTML编辑器,但是他的上传图片和视频却交给了另外的软件:ckfinder,为什

Java Web项目中连接Access数据库的配置方法

本文是对前几天的"JDBC连接Access数据库的几种方式"这篇的升级.因为在做一些小项目的时候遇到的问题,因此才决定写这篇博客的.昨天已经将博客公布了.可是后来经过一些验证有点问题,所以今天改了一下又一次的公布了 老师决定期末考试採用access数据库实现增删改查.我觉得如今的我已经没有问题了.可是曾经都是在JSP页面中连接access数据库,不管是下面的那种方式都进行了连接的练习,可是如今我想让我的项目中的訪问access数据库的java代码,封装到DAO中,在DAO中连接数据库,

java web项目中 读取properties 路径的问题

可以先获取项目的classPath String classPath = this.getClass().getResource("/").getPath();//获取classPath(部署到tomcat的路径上) 我的为/D:/apache-tomcat-6.0.29/webapps/demo/WEB-INF/classes/  在连接下面的路径即可 代码如下: package readproperties; import java.io.BufferedInputStream;i

理解java Web项目中的路径问题

本文以项目部署在tomcat服务器为例,其他相信也是一样的. 先说明请求页面的写法,在web中,页面路径主要写的有以下几种 1.请求重定向 2.浏览器的请求被服务器请求到新页面(我称为“转发”) 3.超链接 4.form表单提交的action 为了演示路径写法,首先先建一个项目(项目名WebPath),并建立一个servlet(PathServlet) 目录结构如下 以访问目录中index.jsp文件为例,jxf.path.PathServlet.jave中对以上四种路径的写法(红色部分) 1

Java web项目中java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

原来是tomcat找不到MYSQL JAR包的问题.后来又把mysql-connector-java-5.1.7-bin.jar导入到tomcat的lib目录下面就ok了,嘿…… 在java项目中,只需要引入mysql-connector-java-5.1.7-bin.jar就可以运行java项目. 在web项目中,当Class.forName("om.mysql.jdbc.Driver");时myeclipse是不会去查找字符串,不会去查找驱动的.所以只需要把mysql-connec