I. Overview of the Methods
This test compares four ways of writing data from Spark to HBase: 1. calling the HBase Java API directly inside the RDD; 2. the saveAsNewAPIHadoopDataset() interface; 3. the saveAsHadoopDataset() interface; 4. the BulkLoad method.
The big-data component versions used for testing (all standalone, single-node) are: Hadoop 2.7.4, HBase 1.0.2, Spark 2.1.0.
II. Tests (BulkLoad not yet tested)
Each test writes rows with a single column family and a single column holding a fixed value, at two scales: 100,000 (10W) rows and 1,000,000 (100W) rows.
The results are as follows:
1. Java API
100,000 rows: 1000 ms, 944 ms
1,000,000 rows: 6308 ms, 6725 ms
2. saveAsNewAPIHadoopDataset()
100,000 rows: 2585 ms, 3125 ms
1,000,000 rows: 13833 ms, 14880 ms
3. saveAsHadoopDataset()
100,000 rows: 2623 ms, 2596 ms
1,000,000 rows: 14929 ms, 13753 ms
4. BulkLoad (not yet tested)
In these runs, batching Puts directly through the Java API was roughly twice as fast as either of the two Hadoop output-format interfaces, which performed almost identically to each other.
III. Code
pom dependencies
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase</artifactId>
    <version>1.2.6</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-server</artifactId>
    <version>1.0.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-common</artifactId>
    <version>1.0.2</version>
</dependency>

1) Java API code
-------------------------------------
package cn.piesat.app

import java.text.DecimalFormat
import java.util.{ArrayList, List, Random}

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client._

object SparkJavaApi {
  val ZOOKEEPER_ADDRESS = "hadoop01"
  val ZOOKEEPER_PORT = "2181"
  val df2: DecimalFormat = new DecimalFormat("00")

  def main(args: Array[String]) = {
    val tableName: String = "test01"
    val conn = getConn
    val admin = conn.getAdmin
    val putList = getPutList()
    if (!admin.tableExists(TableName.valueOf(tableName))) {
      createTable(admin, tableName, Array("cf"))
    }
    val start: Long = System.currentTimeMillis
    insertBatchData(conn, tableName, admin, putList)
    val end: Long = System.currentTimeMillis
    System.out.println("Elapsed: " + (end - start))
    admin.close()
    conn.close()
  }

  def getConn(): Connection = {
    val conf = HBaseConfiguration.create
    conf.set("hbase.zookeeper.quorum", ZOOKEEPER_ADDRESS)
    conf.set("hbase.zookeeper.property.clientPort", ZOOKEEPER_PORT)
    ConnectionFactory.createConnection(conf)
  }

  // Write the whole list of Puts in one batched call.
  def insertBatchData(conn: Connection, tableName: String, admin: Admin, puts: List[Put]) = try {
    val tableNameObj = TableName.valueOf(tableName)
    if (admin.tableExists(tableNameObj)) {
      val table = conn.getTable(tableNameObj)
      table.put(puts)
      table.close()
    }
  } catch {
    case e: Exception => e.printStackTrace()
  }

  def createTable(admin: Admin, tableName: String, colFamilies: Array[String]) = try {
    val tableNameObj = TableName.valueOf(tableName)
    if (!admin.tableExists(tableNameObj)) {
      val desc = new HTableDescriptor(tableNameObj)
      for (colFamily <- colFamilies) {
        desc.addFamily(new HColumnDescriptor(colFamily))
      }
      admin.createTable(desc)
    }
  } catch {
    case e: Exception => e.printStackTrace()
  }

  // Build 100,000 Puts with a single column family/column and a fixed value.
  def getPutList(): List[Put] = {
    val random: Random = new Random
    val putList = new ArrayList[Put]()
    for (i <- 0 until 100000) {
      // Row key: two random digits as a prefix plus the loop index
      val rowkey: String = df2.format(random.nextInt(99)) + i
      val put: Put = new Put(rowkey.getBytes)
      put.addColumn("cf".getBytes, "field".getBytes, "a".getBytes)
      putList.add(put)
    }
    putList
  }
}
-------------------------------------
2) saveAsNewAPIHadoopDataset()
-------------------------------------
package cn.piesat.app

import java.text.DecimalFormat

import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase._
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer

// 10W rows: 2585 ms
// 100W rows: 13833 ms, 14880 ms
object SparkToHbaseNewAPI {
  val tableName = "test01"
  val cf = "cf"
  val num = 1000000
  val df2 = new DecimalFormat("00000000")

  def main(args: Array[String]) = {
    val sc = getSparkSession().sparkContext
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "hadoop01:2181")
    val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
    val admin = hbaseConn.getAdmin
    val jobConf = new JobConf(hbaseConf, this.getClass)
    // Set the output table name
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

    // Create the table if it does not exist
    if (!admin.tableExists(TableName.valueOf(tableName))) {
      val desc = new HTableDescriptor(TableName.valueOf(tableName))
      val hcd = new HColumnDescriptor(cf)
      desc.addFamily(hcd)
      admin.createTable(desc)
    }

    val job = Job.getInstance(jobConf)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val list = ListBuffer[Put]()
    println("Preparing data...")
    for (i <- 0 until num) {
      val put = new Put(df2.format(i).getBytes())
      put.addColumn(cf.getBytes(), "field".getBytes(), "abc".getBytes())
      list.append(put)
    }
    println("Data ready!")

    val data = sc.makeRDD(list.toList).map(x => {
      (new ImmutableBytesWritable, x)
    })

    val start = System.currentTimeMillis()
    data.saveAsNewAPIHadoopDataset(job.getConfiguration)
    val end = System.currentTimeMillis()
    println("Write time: " + (end - start))

    admin.close()
    hbaseConn.close()
    sc.stop()
  }

  def getSparkSession(): SparkSession = {
    SparkSession.builder().
      appName("SparkToHbase").
      master("local[4]").
      config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
      getOrCreate()
  }
}
-------------------------------------
3) saveAsHadoopDataset()
-------------------------------------
package cn.piesat.app

import java.text.DecimalFormat

import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer

// 10W rows: 2623 ms, 2596 ms
// 100W rows: 14929 ms, 13753 ms
object SparkToHbaseOldAPI {
  val tableName = "test01"
  val cf = "cf"
  val df2 = new DecimalFormat("00000000")
  val num = 1000000

  def main(args: Array[String]): Unit = {
    val sc = getSparkSession().sparkContext
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "hadoop01:2181")
    val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
    val admin = hbaseConn.getAdmin
    val jobConf = new JobConf(hbaseConf, this.getClass)
    // Set the output table name and the old-API output format
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    jobConf.setOutputFormat(classOf[TableOutputFormat])

    // Create the table if it does not exist
    if (!admin.tableExists(TableName.valueOf(tableName))) {
      val desc = new HTableDescriptor(TableName.valueOf(tableName))
      val hcd = new HColumnDescriptor(cf)
      desc.addFamily(hcd)
      admin.createTable(desc)
    }

    val list = ListBuffer[Put]()
    println("Preparing data...")
    for (i <- 0 until num) {
      val put = new Put(df2.format(i).getBytes())
      put.addColumn(cf.getBytes(), "field".getBytes(), "abc".getBytes())
      list.append(put)
    }
    println("Data ready!")

    val data = sc.makeRDD(list.toList).map(x => {
      (new ImmutableBytesWritable, x)
    })

    val start = System.currentTimeMillis()
    data.saveAsHadoopDataset(jobConf)
    val end = System.currentTimeMillis()
    println("Write time: " + (end - start))

    admin.close()
    hbaseConn.close()
    sc.stop()
  }

  def getSparkSession(): SparkSession = {
    SparkSession.builder().
      appName("SparkToHbase").
      master("local[4]").
      config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
      getOrCreate()
  }
}
-------------------------------------
4) BulkLoad (not yet tested)
-------------------------------------
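BulkLoad was not measured here and no code was included for it, so the following is only a minimal sketch of how it could be wired up against HBase 1.x: HFileOutputFormat2 writes HFiles from the RDD, then LoadIncrementalHFiles hands them to the region servers. The class name SparkToHbaseBulkLoad and the staging path hdfs://hadoop01:9000/tmp/hbase_bulkload are made-up placeholders, the exact configureIncrementalLoad/doBulkLoad overloads differ between HBase 1.x minor versions, and this simple version does not repartition data by region, so it only suits a small or freshly created single-region table. Because BulkLoad writes store files directly and bypasses the memstore and WAL, it is usually the fastest option for large one-off loads, which is why it would be worth testing.

package cn.piesat.app

import java.text.DecimalFormat

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SparkSession

object SparkToHbaseBulkLoad {
  val tableName = "test01"
  val cf = "cf"
  val num = 1000000
  val df2 = new DecimalFormat("00000000")
  // Hypothetical HDFS staging directory for the generated HFiles; it must not already exist.
  val hfilePath = "hdfs://hadoop01:9000/tmp/hbase_bulkload"

  def main(args: Array[String]): Unit = {
    val sc = getSparkSession().sparkContext
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "hadoop01:2181")
    val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
    val admin = hbaseConn.getAdmin

    // Create the table if it does not exist (same as in the other examples)
    if (!admin.tableExists(TableName.valueOf(tableName))) {
      val desc = new HTableDescriptor(TableName.valueOf(tableName))
      desc.addFamily(new HColumnDescriptor(cf))
      admin.createTable(desc)
    }

    val table = hbaseConn.getTable(TableName.valueOf(tableName))
    val regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName))

    // configureIncrementalLoad copies the table's compression/bloom/block-size settings into the job
    val job = Job.getInstance(hbaseConf)
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setMapOutputValueClass(classOf[KeyValue])
    HFileOutputFormat2.configureIncrementalLoad(job, table, regionLocator)

    // HFileOutputFormat2 requires rows in sorted order; the zero-padded keys sort lexicographically
    val data = sc.parallelize(0 until num)
      .map(i => df2.format(i))
      .sortBy(identity)
      .map { rowKey =>
        val kv = new KeyValue(rowKey.getBytes, cf.getBytes, "field".getBytes, "abc".getBytes)
        (new ImmutableBytesWritable(rowKey.getBytes), kv)
      }

    val start = System.currentTimeMillis()
    // Step 1: write HFiles to the staging directory
    data.saveAsNewAPIHadoopFile(hfilePath, classOf[ImmutableBytesWritable], classOf[KeyValue],
      classOf[HFileOutputFormat2], job.getConfiguration)
    // Step 2: move the HFiles into the regions (older 1.x releases may only offer doBulkLoad(Path, HTable))
    val loader = new LoadIncrementalHFiles(hbaseConf)
    loader.doBulkLoad(new Path(hfilePath), admin, table, regionLocator)
    val end = System.currentTimeMillis()
    println("Write time: " + (end - start))

    table.close()
    admin.close()
    hbaseConn.close()
    sc.stop()
  }

  def getSparkSession(): SparkSession = {
    SparkSession.builder().
      appName("SparkToHbase").
      master("local[4]").
      config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
      getOrCreate()
  }
}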
Original article: https://www.cnblogs.com/runnerjack/p/10480468.html