Spark读取Hbase的数据

val conf = HBaseConfiguration.create()
    conf.addResource(new Path("/opt/cloudera/parcels/CDH-5.4.4-1.cdh5.4.4.p0.4/lib/hbase/conf/hbase-site.xml"))
    conf.addResource(new Path("/opt/cloudera/parcels/CDH-5.4.4-1.cdh5.4.4.p0.4/lib/hadoop/etc/hadoop/core-site.xml"))
    conf.set(TableInputFormat.INPUT_TABLE, "FLOW")

    //添加过滤条件,年龄大于 18 岁
    //val scan = new Scan()
    //conf.set(TableInputFormat.SCAN, convertScanToString(scan))
    /*
    scan.setFilter(new SingleColumnValueFilter("basic".getBytes, "age".getBytes,
      CompareOp.GREATER_OR_EQUAL, Bytes.toBytes(18)))
    */

    val usersRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])

    val data1 = usersRDD.count()

    val sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSSS")

    println("data length:" + data1)

    var map = HashMap[String, HashMap[String, collection.mutable.ArrayBuffer[Double]]]()

    usersRDD.collect().map {
      case (_, result) =>
        val key = Bytes.toInt(result.getRow)
        println("Key:" + key)
        val ip = Bytes.toString(result.getValue("F".getBytes, "SADDR".getBytes))
        val port = Bytes.toString(result.getValue("F".getBytes, "SPORT".getBytes))
        val startTimeLong = Bytes.toString(result.getValue("F".getBytes, "STIME".getBytes))
        val endTimeLong = Bytes.toString(result.getValue("F".getBytes, "LTIME".getBytes))
        val protocol = Bytes.toString(result.getValue("F".getBytes, "PROTO".getBytes))
        val sumTime = Bytes.toString(result.getValue("F".getBytes, "DUR".getBytes))
        val sum = Bytes.toString(result.getValue("F".getBytes, "DBYTES".getBytes)).toDouble

        println("ip:" + ip + ",port:" + port + ",startTime:" + startTimeLong + ",endTime:" + endTimeLong + ",protocol:" + protocol + ",sum:" + sum)

        //ip+port+udp,14:02 14:07 List
        //ip+port+tcp,15:02 15:07 List
        val startTimeDate = sf.parse(startTimeLong)
        val endTimeLongDate = sf.parse(endTimeLong)
        val startHours = startTimeDate.getHours
        val startMinutes = startTimeDate.getMinutes

        val endHours = endTimeLongDate.getHours
        val endMinutes = endTimeLongDate.getMinutes

        val key1 = ip + "_" + port + "_" + protocol
        println("key1:" + key1)

        val key2 = startHours + ":" + startMinutes + "_" + endHours + ":" + endMinutes

        println("key2:" + key2)

        val tmpMap = map.get(key1)

        if (!tmpMap.isEmpty) {
          println("--------------------map is not null:" + tmpMap.size + "--------------------")
          val sumArray = tmpMap.get.get(key2)
          if (!sumArray.isEmpty) {
            sumArray.get += sum
          }
        } else {
          println("--------------------map is null--------------------")
          //如果当前Key不存在的话,是一个全新的Ip
          val sumArray = collection.mutable.ArrayBuffer[Double]()
          sumArray += sum

          val secondMap = HashMap[String, collection.mutable.ArrayBuffer[Double]]()
          secondMap += (key2 -> sumArray)
          map += (key1 -> secondMap)
        }
        map
        println("map size-----------------:" + map.size)
    }

    println("map size:" + map.size)

    map.map(e => {
      println("--------------------Statistics start --------------------")
      val resultKey1 = e._1
      val resultVal1 = e._2
      println("resultKey1:" + resultKey1)
      resultVal1.foreach(f => {
        val resultKey2 = f._1
        val resultVal2 = f._2
        println("resultKey2:" + resultKey2)
        println("-----------------resultVal2:" + resultVal2.length)

        resultVal2.map(f=>{
            println("------------------------f:"+f)
        })

        val dataArray = resultVal2.map(f => Vectors.dense(f))

        val summary: MultivariateStatisticalSummary = Statistics.colStats(sc.parallelize(dataArray))

        //
        println("--------------------mean:" + summary.mean + " --------------------")
        println("--------------------variance:" + summary.variance + " --------------------")

        println("--------------------mean apply 0:" + summary.mean.toArray.apply(0) + " --------------------")
        println("--------------------variance apply 0:" + summary.variance.apply(0) + " --------------------")

        val upbase = summary.mean.toArray.apply(0) + 1.960 * Math.sqrt(summary.variance.apply(0))
        val downbase = summary.mean.toArray.apply(0) - 1.960 * Math.sqrt(summary.variance.apply(0))
        println("------------------- " + upbase + " ---------- " + downbase)
        val df = new DecimalFormat(".##")
        val upbaseString = df.format(upbase)
        val downbaseString = df.format(downbase)
        //resultMap.put(key, value)
        val result3 = HashMap[Double, Double]()
        //result3 +=(upbase -> downbase)
        println("ip port:" + resultKey1 + ",time:" + resultKey2 + ",upbase:" + upbase + ",downbase:" + downbase)
      })
    })

    println("--------------------baseLine end --------------------")
    sc.stop()
时间: 2024-10-10 20:39:23

Spark读取Hbase的数据的相关文章

Spark 读取Hbase表数据并实现类似groupByKey操作

一.概述 程序运行环境很重要,本次测试基于: hadoop-2.6.5 spark-1.6.2 hbase-1.2.4 zookeeper-3.4.6 jdk-1.8 废话不多说了,直接上需求 Andy column=baseINFO:age,  value=21 Andy column=baseINFO:gender,  value=0 Andy column=baseINFO:telphone_number, value=110110110 Tom  column=baseINFO:age,

Spark读取Hbase中的数据_云帆大数据分享

Spark读取Hbase中的数据 大家可能都知道很熟悉Spark的两种常见的数据读取方式(存放到RDD中):(1).调用parallelize函数直接从集合中获取数据,并存入RDD中:Java版本如下: 1 JavaRDD<Integer> myRDD = sc.parallelize(Arrays.asList(1,2,3)); Scala版本如下: 1 val myRDD= sc.parallelize(List(1,2,3)) 这种方式很简单,很容易就可以将一个集合中的数据变成RDD的初

Spark 读取 Hbase 优化 --手动划分 region 提高并行数

一. Hbase 的 region 我们先简单介绍下 Hbase 的 架构和 region : 从物理集群的角度看,Hbase 集群中,由一个 Hmaster 管理多个 HRegionServer,其中每个 HRegionServer 都对应一台物理机器,一台 HRegionServer 服务器上又可以有多个 Hregion(以下简称 region).要读取一个数据的时候,首先要先找到存放这个数据的 region.而 Spark 在读取 Hbase 的时候,读取的 Rdd 会根据 Hbase 的

spark读取hbase(NewHadoopAPI 例子)

package cn.piesat.controller import java.text.{DecimalFormat, SimpleDateFormat}import java.utilimport java.util.concurrent.{CountDownLatch, Executors, Future} import ba.common.log.enums.{LogLevel, LogType}import ba.common.log.utils.LogUtilimport cn.p

spark读取hbase数据,如果表存在则不做任何操作,如果表不存在则新建表。

import java.text.SimpleDateFormat import java.util.Date import org.apache.hadoop.hbase.{HColumnDescriptor, HTableDescriptor, HBaseConfiguration} import org.apache.hadoop.hbase.client.{HBaseAdmin, Put, Result, Scan} import org.apache.hadoop.hbase.io.I

Spark 读取 HBase 数据

1.pom.xml 版本号 <properties> <hbase.version>2.2.2</hbase.version> <hadoop.version>2.10.0</hadoop.version> <spark.version>2.4.2</spark.version> </properties> 依赖包 <dependencies> <dependency> <grou

Spark读取HBase

背景:公司有些业务需求是存储在HBase上的,总是有业务人员找我要各种数据,所以想直接用Spark( shell) 加载到RDD进行计算 摘要: 1.相关环境 2.代码例子 内容 1.相关环境 Spark 版本:2.0.0 Hadoop 版本:2.4.0 HBase 版本:0.98.6 注:使用CDH5搭建集群 编写提交脚本 export SPARK2_HOME=/var/lib/hadoop-hdfs/spark-2.0.0-bin-hadoop2.4 export HBASE_LIB_HOM

Spark操作Hbase

Spark 下操作 HBase(1.0.0 新 API) HBase经过七年发展,终于在今年2月底,发布了 1.0.0 版本.这个版本提供了一些让人激动的功能,并且,在不牺牲稳定性的前提下,引入了新的API.虽然 1.0.0 兼容旧版本的 API,不过还是应该尽早地来熟悉下新版API.并且了解下如何与当下正红的 Spark 结合,进行数据的写入与读取.鉴于国内外有关 HBase 1.0.0 新 API 的资料甚少,故作此文. 本文将分两部分介绍,第一部分讲解使用 HBase 新版 API 进行

spark 对hbase 操作

本文将分两部分介绍,第一部分讲解使用 HBase 新版 API 进行 CRUD 基本操作:第二部分讲解如何将 Spark 内的 RDDs 写入 HBase 的表中,反之,HBase 中的表又是如何以 RDDs 形式加载进 Spark 内的. 环境配置 为了避免版本不一致带来不必要的麻烦,API 和 HBase环境都是 1.0.0 版本.HBase 为单机模式,分布式模式的使用方法类似,只需要修改HBaseConfiguration的配置即可. 开发环境中使用 SBT 加载依赖项 name :=