Spark入Hbase的四种方式效率对比

一、方式介绍

本次测试一种采用了四种方式进行了对比,分别是:1.在RDD内部调用java API。2、调用saveAsNewAPIHadoopDataset()接口。3、saveAsHadoopDataset()。4、BulkLoad方法。

测试使用的大数据版本如下(均为单机版):Hadoop2.7.4、Hbase1.0.2、Spark2.1.0

二、测试(BulkLoad暂未测试)

本次测试采用10W条单一列簇单一字段固定值进行测试。

以下是测试结果:

1.JAVA API

  10W条数据:1000ms、944ms

  100w条数据:6308ms、6725ms

2.saveAsNewAPIHadoopDataset()接口

  10W条数据:2585ms、3125ms

  100w条数据:13833ms、14880ms

3.saveAsHadoopDataset()接口

10W条数据:2623ms、2596ms

  100w条数据:14929ms、13753ms

4.BulkLoad方法(暂未测试)

三、代码

pom引用

<dependency>    <groupId>org.apache.hbase</groupId>    <artifactId>hbase</artifactId>    <version>1.2.6</version></dependency><dependency>    <groupId>org.apache.hbase</groupId>    <artifactId>hbase-client</artifactId>    <version>1.0.2</version></dependency><dependency>    <groupId>org.apache.hbase</groupId>    <artifactId>hbase-server</artifactId>    <version>1.0.2</version></dependency><dependency>    <groupId>org.apache.hbase</groupId>    <artifactId>hbase-common</artifactId>    <version>1.0.2</version></dependency>

1)javaAPI代码-------------------------------------
package cn.piesat.app

import java.text.DecimalFormatimport java.util.{ArrayList, List, Random}

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}import org.apache.hadoop.hbase.client._

object SparkJavaApi {  val ZOOKEEPER_ADDRESS = "hadoop01"  val ZOOKEEPER_PORT = "2181"  val df2: DecimalFormat = new DecimalFormat("00")

  def main(args: Array[String]) = {    val tableName: String = "test01"    val conn = getConn    val admin = conn.getAdmin    val putList = getPutList()    if (!admin.tableExists(TableName.valueOf(tableName))) {      createTable(admin, tableName, Array("cf"))    }    val start: Long = System.currentTimeMillis    insertBatchData(conn,tableName,admin,putList)    val end: Long = System.currentTimeMillis    System.out.println("用时:" + (end - start))  }

  def getConn(): Connection = {    val conf = HBaseConfiguration.create    conf.set("hbase.zookeeper.quorum", ZOOKEEPER_ADDRESS)    conf.set("hbase.zookeeper.property.clientPort", ZOOKEEPER_PORT)    ConnectionFactory.createConnection(conf)  }

  def insertBatchData(conn: Connection, tableName: String, admin: Admin, puts:List[Put]) = try {    val tableNameObj = TableName.valueOf(tableName)    if (admin.tableExists(tableNameObj)) {      val table = conn.getTable(tableNameObj)      table.put(puts)      table.close()      admin.close()    }  } catch {    case e: Exception =>      e.printStackTrace()  }

  def createTable(admin: Admin, tableName: String, colFamiles: Array[String]) = try {    val tableNameObj = TableName.valueOf(tableName)    if (!admin.tableExists(TableName.valueOf(tableName))) {      val desc = new HTableDescriptor(tableNameObj)      for (colFamily <- colFamiles) {        desc.addFamily(new HColumnDescriptor(colFamily))      }      admin.createTable(desc)      admin.close()    }  } catch {    case e: Exception =>      e.printStackTrace()  }

  def getPutList(): List[Put] = {    val random: Random = new Random    val putlist = new ArrayList[Put]();    for (i <- 0 until 100000) {      val rowkey: String = df2.format(random.nextInt(99)) + i      val put: Put = new Put(rowkey.getBytes)      put.add("cf".getBytes, "field".getBytes, "a".getBytes)      putlist.add(put)    }    putlist  }

}
-------------------------------------

2)saveAsNewAPIHadoopDataset()接口-------------------------------------
package cn.piesat.app

import java.text.DecimalFormat

import org.apache.hadoop.hbase.client._import org.apache.hadoop.hbase.io.ImmutableBytesWritableimport org.apache.hadoop.hbase.mapreduce.TableOutputFormatimport org.apache.hadoop.hbase._import org.apache.hadoop.mapred.JobConfimport org.apache.hadoop.mapreduce.Jobimport org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBuffer

//10W用了2585ms//100W用了13833ms、14880msobject SparkToHbaseNewAPI {  val tableName = "test01"  val cf = "cf"  val num=1000000  val df2 = new DecimalFormat("00000000")  def main(args: Array[String]) = {    val sc = getSparkSession().sparkContext    val hbaseConf = HBaseConfiguration.create()    hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "hadoop01:2181")    val hbaseConn = ConnectionFactory.createConnection(hbaseConf)    val admin = hbaseConn.getAdmin    val jobConf = new JobConf(hbaseConf, this.getClass)    // 设置表名    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)

    // 如果表不存在则创建表    if (!admin.tableExists(TableName.valueOf(tableName))) {      val desc = new HTableDescriptor(TableName.valueOf(tableName))      val hcd = new HColumnDescriptor(cf)      desc.addFamily(hcd)      admin.createTable(desc)    }

    val job = Job.getInstance(jobConf)    job.setOutputKeyClass(classOf[ImmutableBytesWritable])    job.setOutputValueClass(classOf[Put])    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])    var list = ListBuffer[Put]()    println("数据准备中。。。。")    for (i <- 0 to num) {      val put = new Put(df2.format(i).getBytes())      put.addColumn(cf.getBytes(), "field".getBytes(), "abc".getBytes())      list.append(put)    }    println("数据准备完成!")    val data = sc.makeRDD(list.toList).map(x => {      (new ImmutableBytesWritable, x)    })    val start = System.currentTimeMillis()

    data.saveAsNewAPIHadoopDataset(job.getConfiguration)    val end = System.currentTimeMillis()    println("入库用时:" + (end - start))    sc.stop()

  }

  def getSparkSession(): SparkSession = {    SparkSession.builder().      appName("SparkToHbase").      master("local[4]").      config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").      getOrCreate()  }}
-------------------------------------
3)saveAsHadoopDataset()接口
-------------------------------------
package cn.piesat.appimport java.text.DecimalFormat

import org.apache.hadoop.hbase._import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}import org.apache.hadoop.hbase.io.ImmutableBytesWritableimport org.apache.hadoop.hbase.mapred.TableOutputFormatimport org.apache.hadoop.mapred.JobConfimport org.apache.spark.sql.SparkSession

import scala.collection.mutable.ListBufferobject SparkToHbaseOldAPI {  val tableName="test01"  val cf="cf"  val df2 = new DecimalFormat("00000000")  val num=1000000  //10W用时2623ms、2596ms  //100W用时14929ms、13753ms  def main(args: Array[String]): Unit = {    val sc = getSparkSession().sparkContext    val hbaseConf = HBaseConfiguration.create()    hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "hadoop01:2181")    val hbaseConn = ConnectionFactory.createConnection(hbaseConf)    val admin = hbaseConn.getAdmin    val jobConf = new JobConf(hbaseConf, this.getClass)    // 设置表名    jobConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)    jobConf.setOutputFormat(classOf[TableOutputFormat])

    // 如果表不存在则创建表    if (!admin.tableExists(TableName.valueOf(tableName))) {      val desc = new HTableDescriptor(TableName.valueOf(tableName))      val hcd = new HColumnDescriptor(cf)      desc.addFamily(hcd)      admin.createTable(desc)    }

    var list = ListBuffer[Put]()    println("数据准备中。。。。")    for (i <- 0 to num) {      val put = new Put(df2.format(i).getBytes())      put.addColumn(cf.getBytes(), "field".getBytes(), "abc".getBytes())      list.append(put)    }    println("数据准备完成!")    val data = sc.makeRDD(list.toList).map(x => {      (new ImmutableBytesWritable, x)    })    val start=System.currentTimeMillis()    data.saveAsHadoopDataset(jobConf)    val end=System.currentTimeMillis()    println("入库用时:"+(end-start))    sc.stop()  }

  def getSparkSession(): SparkSession = {    SparkSession.builder().      appName("SparkToHbase").      master("local[4]").      config("spark.serializer", "org.apache.spark.serializer.KryoSerializer").      getOrCreate()  }}
-------------------------------------4)BulkLoad方法(暂未测试)------------------------------------

------------------------------------

原文地址:https://www.cnblogs.com/runnerjack/p/10480468.html

时间: 2024-10-01 11:40:22

Spark入Hbase的四种方式效率对比的相关文章

Android中多线程的使用四种方式最全总结

当我们启动一个App的时候,Android系统会启动一个Linux Process,该Process包含一个Thread,称为UI Thread或Main Thread.通常一个应用的所有组件都运行在这一个Process中,当然,你可以通过修改四大组件在Manifest.xml中的代码块(<activity><service><provider><receiver>)中的android:process属性指定其运行在不同的process中.当一个组件在启动的

jQuery绑定事件的四种方式:bind、live、delegate、on

1.jQuery操作DOM元素的绑定事件的四种方式 jQuery中提供了四种事件监听方式,分别是bind.live.delegate.on,对应的解除监听的函数分别是unbind.die.undelegate.off. 2.必备的基础知识: DOM树 示例,这是在browser环境下的一棵模拟DOM树: 我们的页面可以理解为一棵DOM树,当我们在叶子结点上做什么事情的时候(如click一个a元素),如果我们没有人为的设置stopPropagation(Moder Browser), cancel

jQuery绑定事件的四种方式

jQuery绑定事件的四种方式 jQuery提供了多种绑定事件的方式,每种方式各有其特点,明白了它们之间的异同点,有助于我们在写代码的时候进行正确的选择,从而写出优雅而容易维护的代码.下面我们来看下jQuery中绑定事件的方式都有哪些. jQuery中提供了四种事件监听方式,分别是bind.live.delegate.on,对应的解除监听的函数分别是unbind.die.undelegate.off.在开始看他们之前 一:bind(type,[data],function(eventObject

【Java EE 学习第80天】【调用WebService服务的四种方式】

不考虑第三方框架,如果只使用JDK提供的API,那么可以使用三种方式调用WebService服务:另外还可以使用Ajax调用WebService服务. 预备工作:开启WebService服务,使用jdk命令wsimport生成调用源代码 package com.kdyzm.ws; import javax.jws.WebService; import javax.xml.ws.Endpoint; @WebService public class MyWsServer { public Strin

VirtualBox虚拟机网络设置(四种方式)(转)

VirtualBox虚拟机网络设置(四种方式) 来自:  2010-11-10 23:30:11 VirtualBox的提供了四种网络接入模式,它们分别是: 1.NAT 网络地址转换模式(NAT,Network Address Translation) 2.Bridged Adapter 桥接模式 3.Internal 内部网络模式 4.Host-only Adapter 主机模式 第一种 NAT模式 解释: NAT模式是最简单的实现虚拟机上网的方式,你可以这样理解:Vhost访问网络的所有数

map遍历的四种方式

原文 http://blog.csdn.net/dayanxuqun/article/details/26348277 以下是map遍历的四种方式: [java] view plaincopyprint? // 一.推荐只用value的时候用,都懂的... // Map.values()遍历所有的value,不遍历key for (String v : map.values()) { System.out.println("value= " + v); } [java] view pl

ios页面间传递参数四种方式

ios页面间传递参数四种方式 1.使用SharedApplication,定义一个变量来传递. 2.使用文件,或者NSUserdefault来传递 3.通过一个单例的class来传递 4.通过Delegate来传递. IOS开发使用委托delegate在不同窗口之间传递数据是本文要介绍的内容,主要是来讲解如何使用委托delegate在不同窗口之间传递数据,具体内容来看详细内容.在IOS开发里两个UIView窗口之间传递参数方法有很多,比如 前面3种方法,暂且不说,这次主要学习如何使用通过Dele

C#_批量插入数据到Sqlserver中的四种方式

先创建一个用来测试的数据库和表,为了让插入数据更快,表中主键采用的是GUID,表中没有创建任何索引.GUID必然是比自增长要快的,因为你生成一个GUID算法所花的时间肯定比你从数据表中重新查询上一条记录的ID的值然后再进行加1运算要少.而如果存在索引的情况下,每次插入记录都会进行索引重建,这是非常耗性能的.如果表中无可避免的存在索引,我们可以通过先删除索引,然后批量插入,最后再重建索引的方式来提高效率. create database CarSYS;    go    use CarSYS;  

C# 字符串拼接性能探索 c#中+、string.Concat、string.Format、StringBuilder.Append四种方式进行字符串拼接时的性能

本文通过ANTS Memory Profiler工具探索c#中+.string.Concat.string.Format.StringBuilder.Append四种方式进行字符串拼接时的性能. 本文涉及程序为.NET Core 2.0控制台应用程序. 一.常量字符串拼接 private static void TestPerformance(Action action, int times) { Stopwatch sw = new Stopwatch(); sw.Start(); for(i