Spark 实时计算整合案例

1.概述

  最近有同学问道,除了使用 Storm 充当实时计算的模型外,还有木有其他的方式来实现实时计算的业务。了解到,在使用 Storm 时,需要编写基于编程语言的代码。比如,要实现一个流水指标的统计,需要去编写相应的业务代码,能不能有一种简便的方式来实现这一需求。在解答了该同学的疑惑后,整理了该实现方案的一个案例,供后面的同学学习参考。

2.内容

  实现该方案,整体的流程是不变的,我这里只是替换了其计算模型,将 Storm 替换为 Spark,原先的数据收集,存储依然可以保留。

2.1 Spark Overview

  Spark 出来也是很久了,说起它,应该并不会陌生。它是一个开源的类似于 Hadoop MapReduce 的通用并行计算模型,它拥有 Hadoop MapReduce 所具有的有点,但与其不同的是,MapReduce 的 JOB 中间输出结果可以保存在内存中,不再需要回写磁盘,因而,Spark 能更好的适用于需要迭代的业务场景。

2.2 Flow

  上面只是对 Spark 进行了一个简要的概述,让大家知道其作用,由于本篇博客的主要内容并不是讲述 Spark 的工作原理和计算方法,多的内容,这里笔者酒不再赘述,若是大家想详细了解 Spark 的相关内容,可参考官方文档。[参考地址

  接下来,笔者为大家呈现本案例的一个实现流程图,如下图所示:

  通过上图,我们可以看出,首先是采集上报的日志数据,将其存放于消息中间件,这里消息中间件采用的是 Kafka,然后在使用计算模型按照业务指标实现相应的计算内容,最后是将计算后的结果进行持久化,DB 的选择可以多样化,这里笔者就直接使用了 Redis 来作为演示的存储介质,大家所示在使用中,可以替换该存储介质,比如将结果存放到 HDFS,HBase Cluster,或是 MySQL 等都行。这里,我们使用 Spark SQL 来替换掉 Storm 的业务实现编写。

3.实现

  在介绍完上面的内容后,我们接下来就去实现该内容,首先我们要生产数据源,实际的场景下,会有上报好的日志数据,这里,我们就直接写一个模拟数据类,实现代码如下所示:

object KafkaEventProducer {
  private val uid = Array("123dfe", "234weq","213ssf")

  private val random = new Random()

  private var pointer = -1

  def getUserID(): String = {
    pointer = pointer + 1
    if (pointer >= users.length) {
      pointer = 0
      uid(pointer)
    } else {
      uid(pointer)
    }
  }

  def plat(): String = {
    random.nextInt(10) + "10"
  }

  def ip(): String = {
    random.nextInt(10) + ".12.1.211"
  }

  def country(): String = {
    "中国" + random.nextInt(10)
  }

  def city(): String = {
    "深圳" + random.nextInt(10)
  }

  def location(): JSONArray = {
    JSON.parseArray("[" + random.nextInt(10) + "," + random.nextInt(10) + "]")
  }

  def main(args: Array[String]): Unit = {
    val topic = "test_data3"
    val brokers = "dn1:9092,dn2:9092,dn3:9092"
    val props = new Properties()
    props.put("metadata.broker.list", brokers)
    props.put("serializer.class", "kafka.serializer.StringEncoder")

    val kafkaConfig = new ProducerConfig(props)
    val producer = new Producer[String, String](kafkaConfig)

    while (true) {
      val event = new JSONObject()

      event
        .put("_plat", "1001")
        .put("_uid", "10001")
        .put("_tm", (System.currentTimeMillis / 1000).toString())
        .put("ip", ip)
        .put("country", country)
        .put("city", city)
        .put("location", JSON.parseArray("[0,1]"))
      println("Message sent: " + event)
      producer.send(new KeyedMessage[String, String](topic, event.toString))

      event
        .put("_plat", "1001")
        .put("_uid", "10001")
        .put("_tm", (System.currentTimeMillis / 1000).toString())
        .put("ip", ip)
        .put("country", country)
        .put("city", city)
        .put("location", JSON.parseArray("[0,1]"))
      println("Message sent: " + event)
      producer.send(new KeyedMessage[String, String](topic, event.toString))

      event
        .put("_plat", "1001")
        .put("_uid", "10002")
        .put("_tm", (System.currentTimeMillis / 1000).toString())
        .put("ip", ip)
        .put("country", country)
        .put("city", city)
        .put("location", JSON.parseArray("[0,1]"))
      println("Message sent: " + event)
      producer.send(new KeyedMessage[String, String](topic, event.toString))

      event
        .put("_plat", "1002")
        .put("_uid", "10001")
        .put("_tm", (System.currentTimeMillis / 1000).toString())
        .put("ip", ip)
        .put("country", country)
        .put("city", city)
        .put("location", JSON.parseArray("[0,1]"))
      println("Message sent: " + event)
      producer.send(new KeyedMessage[String, String](topic, event.toString))
      Thread.sleep(30000)
    }
  }
}

   上面代码,通过 Thread.sleep() 来控制数据生产的速度。接下来,我们来看看如何实现每个用户在各个区域所分布的情况,它是按照坐标分组,平台和用户ID过滤进行累加次数,逻辑用 SQL 实现较为简单,关键是在实现过程中需要注意的一些问题,比如对象的序列化问题。这里,细节的问题,我们先不讨论,先看下实现的代码,如下所示:

object UserClickCountAnalytics {

  def main(args: Array[String]): Unit = {
    val sdf = new SimpleDateFormat("yyyyMMdd")
    var masterUrl = "local[2]"
    if (args.length > 0) {
      masterUrl = args(0)
    }

    // Create a StreamingContext with the given master URL
    val conf = new SparkConf().setMaster(masterUrl).setAppName("IPLoginCountStat")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Kafka configurations
    val topics = Set("test_data3")
    val brokers = "dn1:9092,dn2:9092,dn3:9092"
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers, "serializer.class" -> "kafka.serializer.StringEncoder")

    val ipLoginHashKey = "mf::ip::login::" + sdf.format(new Date())

    // Create a direct stream
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    val events = kafkaStream.flatMap(line => {
      val data = JSONObject.fromObject(line._2)
      Some(data)
    })

    def func(iter: Iterator[(String, String)]): Unit = {
      while (iter.hasNext) {
        val item = iter.next()
        println(item._1 + "," + item._2)
      }
    }

    events.foreachRDD { rdd =>
      // Get the singleton instance of SQLContext
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._
      // Convert RDD[String] to DataFrame
      val wordsDataFrame = rdd.map(f => Record(f.getString("_plat"), f.getString("_uid"), f.getString("_tm"), f.getString("country"), f.getString("location"))).toDF()

      // Register as table
      wordsDataFrame.registerTempTable("events")
      // Do word count on table using SQL and print it
      val wordCountsDataFrame = sqlContext.sql("select location,count(distinct plat,uid) as value from events where from_unixtime(tm,‘yyyyMMdd‘) = ‘" + sdf.format(new Date()) + "‘ group by location")
      var results = wordCountsDataFrame.collect().iterator

      /**
       * Internal Redis client for managing Redis connection {@link Jedis} based on {@link RedisPool}
       */
      object InternalRedisClient extends Serializable {

        @transient private var pool: JedisPool = null

        def makePool(redisHost: String, redisPort: Int, redisTimeout: Int,
          maxTotal: Int, maxIdle: Int, minIdle: Int): Unit = {
          makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle, true, false, 10000)
        }

        def makePool(redisHost: String, redisPort: Int, redisTimeout: Int,
          maxTotal: Int, maxIdle: Int, minIdle: Int, testOnBorrow: Boolean,
          testOnReturn: Boolean, maxWaitMillis: Long): Unit = {
          if (pool == null) {
            val poolConfig = new GenericObjectPoolConfig()
            poolConfig.setMaxTotal(maxTotal)
            poolConfig.setMaxIdle(maxIdle)
            poolConfig.setMinIdle(minIdle)
            poolConfig.setTestOnBorrow(testOnBorrow)
            poolConfig.setTestOnReturn(testOnReturn)
            poolConfig.setMaxWaitMillis(maxWaitMillis)
            pool = new JedisPool(poolConfig, redisHost, redisPort, redisTimeout)

            val hook = new Thread {
              override def run = pool.destroy()
            }
            sys.addShutdownHook(hook.run)
          }
        }

        def getPool: JedisPool = {
          assert(pool != null)
          pool
        }
      }

      // Redis configurations
      val maxTotal = 10
      val maxIdle = 10
      val minIdle = 1
      val redisHost = "dn1"
      val redisPort = 6379
      val redisTimeout = 30000
      InternalRedisClient.makePool(redisHost, redisPort, redisTimeout, maxTotal, maxIdle, minIdle)
      val jedis = InternalRedisClient.getPool.getResource
      while (results.hasNext) {
        var item = results.next()
        var key = item.getString(0)
        var value = item.getLong(1)
        jedis.hincrBy(ipLoginHashKey, key, value)
      }
    }

    ssc.start()
    ssc.awaitTermination()

  }
}

/** Case class for converting RDD to DataFrame */
case class Record(plat: String, uid: String, tm: String, country: String, location: String)

/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {

  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}

  我们在开发环境进行测试的时候,使用 local[k] 部署模式,在本地启动 K 个 Worker 线程来进行计算,而这 K 个 Worker 在同一个 JVM 中,上面的示例,默认使用 local[k] 模式。这里我们需要普及一下 Spark 的架构,架构图来自 Spark 的官网,[链接地址]

  这里,不管是在 local[k] 模式,Standalone 模式,还是 Mesos 或是 YARN 模式,整个 Spark Cluster 的结构都可以用改图来阐述,只是各个组件的运行环境略有不同,从而导致他们可能运行在分布式环境,本地环境,亦或是一个 JVM 实利当中。例如,在 local[k] 模式,上图表示在同一节点上的单个进程上的多个组件,而对于 YARN 模式,驱动程序是在 YARN Cluster 之外的节点上提交 Spark 应用,其他组件都是运行在 YARN Cluster 管理的节点上的。

  而对于 Spark Cluster 部署应用后,在进行相关计算的时候会将 RDD 数据集上的函数发送到集群中的 Worker 上的 Executor,然而,这些函数做操作的对象必须是可序列化的。上述代码利用 Scala 的语言特性,解决了这一问题。

4.结果预览

  在完成上述代码后,我们执行代码,看看预览结果如下,执行结果,如下所示:

4.1 启动生产线程

4.2 Redis 结果预览

5.总结

  整体的实现内容不算太复杂,统计的业务指标,这里我们使用 SQL 来完成这部分工作,对比 Storm 来说,我们专注 SQL 的编写就好,难度不算太大。可操作性较为友好。

6.结束语

  这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

时间: 2024-08-02 08:22:59

Spark 实时计算整合案例的相关文章

【Streaming】30分钟概览Spark Streaming 实时计算

本文主要介绍四个问题: 什么是Spark Streaming实时计算? Spark实时计算原理流程是什么? Spark 2.X下一代实时计算框架Structured Streaming Spark Streaming相对其他实时计算框架该如何技术选型? 本文主要针对初学者,如果有不明白的概念可了解之前的博客内容. 1.什么是Spark Streaming? 与其他大数据框架Storm.Flink一样,Spark Streaming是基于Spark Core基础之上用于处理实时计算业务的框架.其实

【转】Spark Streaming 实时计算在甜橙金融监控系统中的应用及优化

系统架构介绍 整个实时监控系统的架构是先由 Flume 收集服务器产生的日志 Log 和前端埋点数据, 然后实时把这些信息发送到 Kafka 分布式发布订阅消息系统,接着由 Spark Streaming 消费 Kafka 中的消息,同时消费记录由 Zookeeper 集群统一管理,这样即使 Kafka 宕机重启后也能找到上次的消费记录继而进行消费.在这里 Spark Streaming 首先从 MySQL 读取规则然后进行 ETL 清洗并计算多个聚合指标,最后将结果的一部分存储到 Hbase

Spark Streaming实时计算框架介绍

http://www.cnblogs.com/Leo_wl/p/3530464.html 随着大数据的发展,人们对大数据的处理要求也越来越高,原有的批处理框架MapReduce适合离线计算,却无法满足实时性要求较高的业务,如实时推荐.用户行为分析等. Spark Streaming是建立在Spark上的实时计算框架,通过它提供的丰富的API.基于内存的高速执行引擎,用户可以结合流式.批处理和交互试查询应用.本文将详细介绍Spark Streaming实时计算框架的原理与特点.适用场景. Spar

Spark Streaming实时计算海量用户UV

提出需求 实时统计业务系统(web,APP之类)的访问人数,即所谓UV,或者DAU指标. 这个需求怕是流计算最最最常见的需求了. 计算UV的关键点就在于去重,即同一个人访问两次是只计一个UV的.在离线计算中统计UV比较容易想到的方法就是用group或distinct机制来去重.但是在实时计算场景,还用group就不太科学了,一个是全量数据的group是比较费时的,第二个是全量数据的group是很费内存和CPU的.特别是当用户量巨大的时候,还要做到秒级更新就更难了. 总结起来,需求就是:海量用户场

实时计算平台

实时计算平台中的弹性集群资源管理 本文系微博运维数据平台(DIP)在实时计算平台的研发过程中集群资源管理方面的一些经验总结和运用,主要关注以下几个问题: 异构资源如何整合? 实时计算应用之间的物理资源如何隔离? 集群资源利用率如何提高? 集群运维成本如何降低? 1. 背景 这是我们初期的一个实时计算架构,大致划分为三个部分: (1)日志收集: 使用Rsynlog.Flume.Scribe汇聚各个业务方发送过来的日志数据:如果条件允许,业务方也可以直接将数据写入Kafka. (2)日志传输: 使用

实时计算平台中的弹性集群资源管理

本文系微博运维数据平台(DIP)在实时计算平台的研发过程中集群资源管理方面的一些经验总结和运用,主要关注以下几个问题: 异构资源如何整合? 实时计算应用之间的物理资源如何隔离? 集群资源利用率如何提高? 集群运维成本如何降低? 1. 背景 这是我们初期的一个实时计算架构,大致划分为三个部分: (1)日志收集: 使用Rsynlog.Flume.Scribe汇聚各个业务方发送过来的日志数据:如果条件允许,业务方也可以直接将数据写入Kafka. (2)日志传输: 使用Kafka作为日志收集组件与实时应

权威详解 | 阿里新一代实时计算引擎 Blink,每秒支持数十亿次计算

王峰,淘宝花名"莫问",2006年毕业后即加入阿里巴巴集团,长期从事搜索和大数据基础技术研发工作,目前在计算平台事业部,负责实时计算北京研发团队. 在阿里巴巴的11年工作期间,持续专注大数据计算与存储技术领域,基于Hadoop开源生态打造的数据基础设施一直服务于搜索.推荐等阿里核心电商业务场景,最近一年带领团队对Apache Flink进行了大量架构改进.功能完善和性能提升,打造出了阿里新一代实时计算引擎: Blink.目前数千台规模的Blink生产集群已经开始在线支持搜索.推荐.广告

细细品味架构(第1期)_实时计算在点评

Hi,博友: 欢迎查阅<细细品味架构>系列,本系列将和我一起学习,并慢慢走近架构师的神秘世界. 本期目录: 1.本期内容... 3 1.1 版权申明... 3 1.2 内容详情... 3 1.2.1 实时计算在点评的使用场景... 3 1.2.2 实时计算在业界的使用场景... 4 1.2.3 点评如何构建实时计算平台... 5 1.2.4 Storm基础知识简单介绍... 6 1.2.5 如何保证业务运行可靠性... 8 1.2.5 Storm日常使用经验分享... 10 1.2.6 关于计

Spark视频第5期:Spark SQL架构和案例深入实战

Spark SQL架构和案例深入实战 视频地址:http://pan.baidu.com/share/link?shareid=3629554384&uk=4013289088&fid=977951266414309 王家林老师(邮箱:[email protected] QQ: 1740415547) Spark亚太研究院院长和首席专家,中国目前唯一的移动互联网和云计算大数据集大成者. 在Spark.Hadoop.Android等方面有丰富的源码.实务和性能优化经验.彻底研究了Spark从