Reading HBase from Spark (NewHadoopAPI example)

package cn.piesat.controller

import java.text.{DecimalFormat, SimpleDateFormat}
import java.util
import java.util.concurrent.{CountDownLatch, Executors, Future}

import ba.common.log.enums.{LogLevel, LogType}
import ba.common.log.utils.LogUtil
import cn.piesat.constants.{HbaseZookeeperConstant, RowkeyConstant}
import cn.piesat.domain._
import cn.piesat.service.impl.{MsgServiceImpl, SparkTaskServiceImpl}
import cn.piesat.thread.HbaseQueryThread
import com.google.gson.Gson
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.filter.{Filter, FilterList}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import pie.storage.db.domain._
import pie.storage.db.enums.{CompareOp, DataBaseType}

/**
  * @author liujie
  *         Entry object for querying HBase from Spark
  */
object HbaseReader {
  val sparkTaskService = new SparkTaskServiceImpl
  val msgService = new MsgServiceImpl
  val sparkAppName = "sparkApp"
  val sparkMaster = "local[6]"
  var taskId = 8
  val serviceNum = 76
  val systemId = 12011
  val systemName = "8888"
  val cf = "cf1"
  val cell = "content"
  val zookeeperHost = "bigdata03,bigdata04,bigdata05"
  val zookeeperPort = "2181"
  val excutor = Executors.newCachedThreadPool()

  def main(args: Array[String]): Unit = {
    try {
      if (args.length > 0) {
        taskId = args(0).toInt
      }
      /**
        * Step 1: get the SparkContext
        */
      val sc = getSparkContext
      /**
        * Step 2: build the list of query parameters
        */
      val taskParamList = getTaskParam(taskId, sc)
      /**
        * Step 3: query the HBase data
        */
      val rowkeyRDD = queryHbaseData(taskParamList, sc)

      rowkeyRDD.saveAsTextFile("file://")
      println("rowkeyRDD count: " + rowkeyRDD.count())
      val rowkey = rowkeyRDD.first()
      println("first value: " + util.Arrays.toString(rowkey._2.getValue(cf.getBytes(), cell.getBytes())))

      /**
        * Step 4: parse the data
        */

      /**
        * Step 5: write the results to a text file; the output path comes from taskParamList built in step 2
        */

    } catch {
      case e: Exception =>
        e.printStackTrace()
    } finally {
      excutor.shutdown()
    }
  }

  /**
    * Get the task id from the program arguments
    *
    * @param args
    * @return
    */
  private def getTaskId(args: Array[String]): Int = {
    if (args == null || args.length <= 0) {
      -1
    } else {
      try {
        args.apply(0).toInt
      } catch {
        case e: Exception =>
          -1
      }
    }
  }

  /**
    * Build the SparkContext
    *
    * @return
    */
  private def getSparkContext(): SparkContext = {
    val sparkConf = new SparkConf().setAppName(sparkAppName).setMaster(sparkMaster)
    sparkConf.set("spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory")
    sparkConf.set("spark.network.timeout", "300")
    sparkConf.set("spark.streaming.unpersist", "true")
    sparkConf.set("spark.scheduler.listenerbus.eventqueue.size", "100000")
    sparkConf.set("spark.storage.memoryFraction", "0.5")
    sparkConf.set("spark.shuffle.consolidateFiles", "true")
    sparkConf.set("spark.shuffle.file.buffer", "64")
    sparkConf.set("spark.shuffle.memoryFraction", "0.3")
    sparkConf.set("spark.reducer.maxSizeInFlight", "24")
    sparkConf.set("spark.shuffle.io.maxRetries", "60")
    sparkConf.set("spark.shuffle.io.retryWait", "60")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    new SparkContext(sparkConf)
  }

  /**
    * Build the list of query parameters for the spark task
    *
    * @param taskId
    * @return
    */
  private def getTaskParam(taskId: Int, sc: SparkContext): List[Tuple4[String, String, String, util.List[Filter]]] = {
    var list: List[Tuple4[String, String, String, util.List[Filter]]] = List()
    val sparkTask = sparkTaskService.getSparkTaskByTaskId(taskId)
    val params = sparkTask.getQueryParam
    val gson = new Gson
    val sparkQueryParams = gson.fromJson(params, classOf[SparkQueryParams])
    try {
      // 1. **
      val systemId = sparkQueryParams.getSystemId
      // 2. start time
      val startTime = sparkQueryParams.getStartTime
      // 3. end time
      val endTime = sparkQueryParams.getEndTime
      // 4. **
      val stationId = sparkQueryParams.getStationId
      val paramList = sparkQueryParams.getParams
      for (i <- 0 until paramList.size()) {
        val param = paramList.get(i)
        // 5. **
        val msgId = param.getMsgId
        // 6. **
        val sinkId = param.getSinkId
        // 7. **
        val sourceId = param.getSourceId
        // 8. table name
        val tableName = msgService.getTieYuanMsgTableNameById(msgId)
        for (num <- 0 until serviceNum) {
          val rowkeyAndFilters = getRowkeyAndFilters(num, systemId, startTime, endTime, stationId, msgId, sinkId, sourceId, tableName)
          list = rowkeyAndFilters :: list
        }
      }
      list
    } catch {
      case e: Exception =>
        LogUtil.writeLog(systemId, LogLevel.ERROR, LogType.NORMAL_LOG, systemName + " Error Info: invalid task parameters. " + e)
        null
    }
  }

  /**
    * Query HBase data: submit one query per task parameter to the thread pool and union the resulting RDDs
    */
  private def queryHbaseData(taskParamList: List[(String, String, String, util.List[Filter])], sc: SparkContext): RDD[(ImmutableBytesWritable, Result)] = {
    var rdd: RDD[(ImmutableBytesWritable, Result)] = null
    val latch: CountDownLatch = new CountDownLatch(taskParamList.length)
    val list: util.List[Future[RDD[Tuple2[ImmutableBytesWritable, Result]]]] = new util.ArrayList[Future[RDD[Tuple2[ImmutableBytesWritable, Result]]]]()
    for (taskParam <- taskParamList) {
      list.add(excutor.submit(new HbaseQueryThread(taskParam, sc, latch)))
    }
    import scala.collection.JavaConversions._
    for (li <- list) {
      if (rdd == null) {
        rdd = li.get()
      } else {
        rdd = rdd.++(li.get())
      }
    }
    latch.await()
    rdd
  }
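  // Note: HbaseQueryThread (cn.piesat.thread) is not shown in this post. Based only on how it is
  // used above -- a Callable submitted to the executor whose Future yields an
  // RDD[(ImmutableBytesWritable, Result)] and which counts down the latch -- it presumably looks
  // roughly like this hypothetical sketch (the real implementation may differ):
  //
  //   class HbaseQueryThread(taskParam: (String, String, String, util.List[Filter]),
  //                          sc: SparkContext,
  //                          latch: CountDownLatch)
  //     extends java.util.concurrent.Callable[RDD[(ImmutableBytesWritable, Result)]] {
  //     override def call(): RDD[(ImmutableBytesWritable, Result)] = {
  //       try {
  //         HbaseReader.getHbaseQueryRDD(taskParam, sc)
  //       } finally {
  //         latch.countDown()
  //       }
  //     }
  //   }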

  /**
    * Build the rowkey range and filter list for one scan partition
    *
    * @param num
    * @param systemId
    * @param startTime
    * @param endTime
    * @param stationId
    * @param msgId
    * @param sinkId
    * @param sourceId
    * @return
    */
  private def getRowkeyAndFilters(num: Int, systemId: Int, startTime: String,
                                  endTime: String, stationId: Int, msgId: Int,
                                  sinkId: Int, sourceId: Int,
                                  tableName: String): Tuple4[String, String, String, util.List[Filter]] = {
    // SimpleDateFormat is not thread safe, so new instances are created on every call
    val simpleDateFormat1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS")
    val simpleDateFormat2 = new SimpleDateFormat("yyyyMMddHHmmssSSS")
    val decimalFormat = new DecimalFormat("00")
    val queryDef = new QueryDef
    // 1. set the database type
    queryDef.setDataBaseType(DataBaseType.HBASE)
    // 2. set the table name
    queryDef.setTableName(tableName)
    // 3. set the query parameters
    // 3.1 ** id parameter
    val systemIdParam = new QueryParam
    systemIdParam.setField(new Field(new FieldInfo(RowkeyConstant.SYSTEM_ID), new FieldValue(systemId)))
    systemIdParam.setCompareOp(CompareOp.EQUAL)
    // 3.2 **
    val msgIdParam = new QueryParam
    msgIdParam.setField(new Field(new FieldInfo(RowkeyConstant.MSG_ID), new FieldValue(msgId)))
    msgIdParam.setCompareOp(CompareOp.EQUAL)
    // 3.3 start time parameter
    val startTimeParam = new QueryParam
    val startTimeFormat = simpleDateFormat2.format(simpleDateFormat1.parse(startTime))
    startTimeParam.setField(new Field(new FieldInfo(RowkeyConstant.TIME), new FieldValue(startTimeFormat)))
    startTimeParam.setCompareOp(CompareOp.GREATER)
    // 3.4 end time parameter
    val endTimeParam = new QueryParam
    val endTimeFormat = simpleDateFormat2.format(simpleDateFormat1.parse(endTime))
    endTimeParam.setField(new Field(new FieldInfo(RowkeyConstant.TIME), new FieldValue(endTimeFormat)))
    endTimeParam.setCompareOp(CompareOp.LESS)
    // 3.5 **
    val sourceParam = new QueryParam
    sourceParam.setField(new Field(new FieldInfo(RowkeyConstant.SINK_ID), new FieldValue(sinkId)))
    sourceParam.setCompareOp(CompareOp.EQUAL)
    // 3.6 **
    val sinkParam = new QueryParam
    sinkParam.setField(new Field(new FieldInfo(RowkeyConstant.SOURCE_ID), new FieldValue(sourceId)))
    sinkParam.setCompareOp(CompareOp.EQUAL)
    val queryParamList = util.Arrays.asList(systemIdParam, msgIdParam, startTimeParam, endTimeParam, sourceParam, sinkParam)
    queryDef.setListQueryParam(queryParamList)
    val startRowkey = decimalFormat.format(num) + queryDef.getStartRowKey(classOf[String])
    val endRowkey = decimalFormat.format(num) + queryDef.getStopRowKey(classOf[String])
    val filters = queryDef.getFilters(2, num, classOf[String])
    new Tuple4(tableName, startRowkey, endRowkey, filters)
  }

  /**
    * Run one HBase query via TableInputFormat and return the scan result as an RDD
    *
    * @param taskParam
    * @param sc
    */
  def getHbaseQueryRDD(taskParam: (String, String, String, util.List[Filter]), sc: SparkContext): RDD[(ImmutableBytesWritable, Result)] = {
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(HbaseZookeeperConstant.HBASE_ZOOKEEPER_QUORUM, zookeeperHost)
    hbaseConf.set(HbaseZookeeperConstant.HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT, zookeeperPort)
    hbaseConf.set(TableInputFormat.INPUT_TABLE, taskParam._1)
    val scan = new Scan()
    scan.setStartRow(Bytes.toBytes(taskParam._2))
    scan.setStopRow(Bytes.toBytes(taskParam._3))
    val filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, taskParam._4)
    scan.setFilter(filterList)
    hbaseConf.set(TableInputFormat.SCAN, convertScanToString(scan))
    val rs = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    // TODO: parse the results, e.g.
    // rs.map(tuple2 => {
    //   val result = tuple2._2
    //   result.
    // })
    rs
  }

  private def convertScanToString(scan: Scan) = {
    val proto = ProtobufUtil.toScan(scan)
    Base64.encodeBytes(proto.toByteArray)
  }
}
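The listing above depends on several project-specific classes (QueryDef, SparkQueryParams, HbaseQueryThread, the pie.storage and ba.common packages) that are not shown. For reference, here is a minimal, self-contained sketch of the same TableInputFormat / newAPIHadoopRDD read pattern, including the Result parsing that the step-4 TODO leaves open. It uses the same pre-2.x HBase helpers (ProtobufUtil, Base64) as the post; the table name test_table, the rowkey range and the cf1/content column are placeholders, not values from the original project.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.{SparkConf, SparkContext}

object SimpleHbaseRead {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("simpleHbaseRead").setMaster("local[2]"))

    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "bigdata03,bigdata04,bigdata05")   // placeholder hosts
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set(TableInputFormat.INPUT_TABLE, "test_table")                  // placeholder table

    // Restrict the scan to a rowkey range and serialize it, as convertScanToString does above
    val scan = new Scan()
    scan.setStartRow(Bytes.toBytes("00"))
    scan.setStopRow(Bytes.toBytes("99"))
    conf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))

    // Each record produced by TableInputFormat is a (rowkey, Result) pair
    val hbaseRDD = sc.newAPIHadoopRDD(
      conf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])

    // Parse each Result into plain strings -- roughly what step 4 of main() would do
    val parsed = hbaseRDD.map { case (key, result) =>
      val rowkey = Bytes.toString(key.copyBytes())
      val value = Bytes.toString(result.getValue(Bytes.toBytes("cf1"), Bytes.toBytes("content")))
      (rowkey, value)
    }
    parsed.take(10).foreach(println)

    sc.stop()
  }
}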

Original post: https://www.cnblogs.com/runnerjack/p/9976112.html

