【转】Spark Streaming 实时计算在甜橙金融监控系统中的应用及优化

系统架构介绍

整个实时监控系统的架构是先由 Flume 收集服务器产生的日志 Log 和前端埋点数据, 然后实时把这些信息发送到 Kafka 分布式发布订阅消息系统,接着由 Spark Streaming 消费 Kafka 中的消息,同时消费记录由 Zookeeper 集群统一管理,这样即使 Kafka 宕机重启后也能找到上次的消费记录继而进行消费。在这里 Spark Streaming 首先从 MySQL 读取规则然后进行 ETL 清洗并计算多个聚合指标,最后将结果的一部分存储到 Hbase 中,另一部分重新发回到 Kafka 中再消费更新到 MySQL 中,监控前端实时获取指标进行展示。

主要问题

在上面的框架介绍中,下游监控系统的指标数据来源于 Spark Streaming 的实时计算,可见 Streaming 计算处于极为重要的环节,而计算性能不足就会成为整个系统的瓶颈。大部分时候我们 Spark 指标计算都能应付过来,但是在节日流量翻倍的情况下就力不从心了,为应对这种情况之前采取的措施一般是关闭一些非关键性日志接口把监控流量降下来。虽然此举能暂时解决问题,但仍需要治标更治本的方法。

首先来看看优化前 Streaming 的计算能力。

图一所示为每批次(30 秒)800W+ 日志流量下,Spark Streaming 计算大概需要 50 多秒。虽无明显延时,但计算能力很弱鸡 14w/s

随着流量不断的增大,如图 2 所示为每批次(时间 30 秒)1000W+ 条日志流量下,Spark 计算已严重超时,越来越多的 batch 加入到 queue 的队列等待处理,此时监控系统基本失效。

既然痛点已找到,那么剩下要做的就是想办法去优化。下文在讲如何优化前,先带大家认识下流式处理框架中的两个经典好搭档 Spark Streaming + Kafka。

3Spark Streaming + Kafka 流处理框架为什么选择 Spark Streaming 和 Kafka

  • Kafka 支持分布式及出色的吞吐量
  • Spark Streaming 流式处理框架已被各大公司广泛应用且成熟度高,支持大部分的数据源和存储,如下图所示其丰富生态圈

  • Kafka 与 Spark Streaming 集成度高

Spark Streaming 初识

Spark Streaming 接受实时输入数据并将数据切分成多个 batches, 然后由 Spark engine 进行计算并将结果输出到外部存储。

接下来看看 Spark Streaming 从 Kafka 中接受数据的两种方式。

基于 Receiver 方式

这种方式使用 Receiver 方式接受数据,实现是以 Kafka 高阶用户 API 接口,收到的数据会存到 Spark executor,之后 Spark Streaming 提交 Job 处理这些数据。为了保证数据不会丢失,需要开启 Write Ahead Logs,流程如下图所示:

基于 Direct 方式

在 Spark 1.3 之后,引入了 Direct 方式以提供更强的端到端的保证。不同于 Receiver 方式,其会周期性的获取 Kafka 每个 topic 中每个 Partition 最新的 offsets。之后 Spark job 会基于 Kafka simple API 读取 Kafka 相应 Offset 数据并进行处理,流程如下图所示:

该方式相对于 Receiver 方式具有以下优势:

  • 简化的并行度:基于 Receiver 的方式中要提高数据传输并行度我们需要创建多个 Receiver 实例之后再 Union 起来合并成一个 Dstream。而 Direct 方式中提供了更为简单的映射关系,Kafka 中的 partition 与 Spark RDD 中的 partition 是一一映射的,因而可以并行读取数据。
  • 高效性:在 Receiver 的方式中,为了达到零数据丢失需要将数据备份到 Write Ahead Log 中,这样系统中就保存了两份数据浪费资源。而 Direct 方式只要知道当前消费的 Offsets 就能恢复出相应的数据。
  • 精确一次的语义保证:基于 Receiver 的方式中,通过 Kafka 的高阶 API 接口从 Zookeeper 中获取 offset 值,这也是传统的从 Kafka 中读取数据的方式,但由于 Spark Streaming 消费的数据和 Zookeeper 中记录的 offset 不同步,这种方式偶尔会造成数据重复消费。而第二种方式,直接使用了简单的低阶 Kafka API,Offsets 可以利用 Spark Streaming 的 checkpoints 进行记录来消除这种不一致性。

以上翻译自官方文档。既然 Direct 方式有这么多优点,那么在我们的监控系统中理所当然也用了这种方式,同时为了能使基于 Zookeeper 的 Kafka monitor 工具生效,我们也实现了 Offset 的管理,具体流程如下:

  • Spark Streaming 任务启动后首先去 Zookeeper 中去读取特定 topic 中每个 Partition 的 offset 并组装 fromOffsets 变量;
  • Spark Streaming 获取到 fromOffsets 后通过 KafkaUtils.createDirectStream 去消费 Kafka 的数据;
  • 读取 Kafka 数据然后进行批的逻辑处理,如下图所示为该 Job 的 DAG,包括一些基本的 RDD 算子操作 (flatMap, reduceByKey, Map 等), 并将计算结果存储到 Hbase 和回吐到 Kafka 中,最后更新 offsets 到 Zookeeper 中。

4Spark Streaming 性能优化及任务监控

重点来了,那么说起优化,我们首先想到的就是最大限度利用集群资源,将硬件性能压榨到极致,先看看如何在用 spark-submit 提交命令的时候进行资源调优。

资源参数调优

增加 Driver 和 Executor 的内存(driver-memory、executor-memory)

通过增加 Driver 和 Executor 的内存数量,可以减小程序 Out of memory 和 意外崩溃 产生的概率,当然也不能无限制增加以免造成资源的浪费或者导致其它任务申请资源失败。

设置合理的 CPU 个数

--num-executors 和 --executor-cores 两个参数配合使用来调节计算资源占有情况。通常对于集群中一定量的 CPU Core,设置较多的 Executor 个数和较少的 Executor core 个数来达到资源最大使用率。

结合内存和 CPU 参数,我们来举个例子,看看怎么设置会比较合理。

假设在拥有 6 个节点,每个节点有 16 个 Core 和 64G 内存集群中提交 Job, 一种可能的配置参数如下:

--num-executors 6 –executor-cores 15 –executor-memory 63G

这种方式其实不太合理,原因如下:

  • 由于我们的 OS 以及 Hadoop daemons 要占用一定内存,因此 yarn.nodemanager.resources.memory-mb 和 yarn.nodemanager.resources.cpu-vcores 不可能占用 100% 资源,一般是 63 * 1024 和 15Core.
  • Application master 也会占用一个 core, 因此在 master 节点上也不可能设置为 15 个 core
  • 每个 executor 设置 15Core 会造成低效的 HDFS I/O 吞吐量

鉴于上面的原因,一种更为合理的的设置是:

--num-executors 17 –executor-cores 5 –executor-memory 19G

增加 parallelism:增加 Spark Partition 数量

Partition 即 Spark 中的数据分区,每个 task 在同一时间只能处理一个 Partition 的数据,这个值不能设置的太小也不能设置的太大。

  • 设置的太大,每个分区中的数据很少,因此会需要更多的 task 来处理这些数据,增加任务调度器的负担
  • 设置的太小,每个分区中的数据很多,也会对内存造成压力,executor 无法最大程度利用集群计算资源。

通过 spark.default.parallelism 可以设置 spark 默认的分区数量,在这里我们设置的 1000.

此外在 Spark Streaming + Kafka 的案例中,我们采用 Direct 方式从 Kafka 中获取数据,此时 Kafka partition 的数量和 Spark RDD 的分区数量是 1:1 映射的关系,而调优之前该 topic 创建时的分区数量是 64,并发度太小导致集群资源利用不够。我们一开始采取的优化方式是创建 InputDstream 之后先 Repartition 到一个更大的并行度,然后进行逻辑计算,结果证明该方式较之前性能上有一定提升但还是没有达到我们想要的理想结果,这是由于 repartition 会造成 Shuffle 操作,而 Shuffle 比较耗时,会引起大量的磁盘 IO, 序列化、反序列化、网络数据传输等操作,因此要尽量避免。之后我们直接从数据源头 Kafka 那边增加 Topic 分区数(240),从而极大的提升了处理效率。如图所示:

设置合理的批处理时间和 Kafka 数据拉取速率

在 StreamingContext 初始化的时候需要设置批处理时间,而这个值不能设置的太小,太小不仅会导致 SparkStreaming 频繁的提交作业增加系统调度的负担,如果处理不过来容易造成作业的积压发生阻塞。此外还要根据生产者写入 Kafka 的速率以及 Streaming 本身消费数据的速率设置合理的 Kafka 读取速率(spark.streaming.kafka.maxRatePerPartition),使得 Spark Streaming 的每个 task 都能在 Batch 时间内及时处理完 Partition 内的数据,使 Scheduling Delay 尽可能的小。

最后还可以设置 spark.steaming.backpressure.enabled 为 true,这就使得如果在某一时刻数据量突然增大导致处理时间远大于 Batch interval 的情况下,告诉 Kafka 你需要降低发送速率了。下图所示为理想的处理状态。

使用 Kryo 序列化

Spark Streaming 在传输、使用对象的时候要用到序列化和反序列化,而 Kryo 序列化方式比 Java 序列化机制性能高 10 倍,因此我们可在使用的时候注册自定义类型,如下函数所示:

设置 Streaming job 的并行度

这里的 job 主要由两个参数决定:

  • Spark.scheduler.mode(FIFO/FAIR)
  • Spark.streaming.concurrentjobs

在每个 batch 内,可能有一批 Streaming job, 默认是 1,这些 job 由 jobExecutor 执行并提交,而 JobExecutor 是一个默认池子大小为 1 的线程池,大小由参数 Spark.streaming 。concurrentjobs 控制。如果 concurrentjobs 设置为 2,那么只要资源允许,那么会同时提交执行两个 job,否则仍顺序执行。

开发调优

Hbase 输出操作

在我们的项目中,需要将 Spark Streaming 计算完的结果存入到 Hbase 中,这里我们采用的是批量 Put 数据到 Hbase 中,而非每次插入单条数据,参考如下事例:

输出到 Kafka

此外我们还会将计算结果回吐到 Kafka 中。通常你可能会 Google “Spark Streaming to kafka”来寻找案例,而大多数情况你会找到下面这样的例子,当然很大程度上你也会这么写。针对 Partition 中的每条数据建立一个 Kafka Producer, 然后再发送数据,这种做法不灵活且低效。

比较高效的做法有两种:

定义 Kafka producer 为 lazy 并广播到每个 executor 上,之后就可以用这个 producer 发送数据,事例如下:

使用也比较方便:

或者使用单例模式:

遇到的坑

经过上述调优方案后,Spark Streaming 实时处理能力较之前有了质的提高,但是我们也经常会发现一些异常现象。在流量逐步升高的情况下,会出现丢包的情况,Streaming 的计算性能也受到了很大的影响。通过使用 Zabbix 工具查看网卡流量,发现有时候 eth3 网卡出口流量能达到 638Mbps, 如下图所示,而我们的网卡是千兆网,并且在存在多个 kafka Consumer 的情况下就不难解释之前的丢包现象了,同样 spark 计算过程中需要传输数据,因为受到带宽的限制也会导致计算性能的下降。

随后我们将 Kafka 集群中的网卡换到万兆,重新提交 Spark Streaming 任务后发现计算性能提升数倍:上图为调优前约 15w/s 的处理量,下图为调优后每秒 50w/s 的处理量。

VS

当在一个 Batch 时间内输入数据达到 1000W 以上事件时,Streaming 仍能很好的 handle,计算性能仍是 50W+/s 的处理速率,相比调优前基本失效的状态也大大提高了稳定性。

任务监控

对于 Spark Streaming 任务的监控可以直观的通过 Spark Web UI ,该页面包括 Input Rate, Scheduling Delay、Processing Time 等,但是这种方法运维成本较高,需要人工不间断的巡视。

另一种推荐的方式可以通过 StreamingListener 接口获取 Scheduling Delay 和 Processing Time,事例如下:

除此之外你还可以自己写 Python 脚本在 yarn 管理界面解析该应用的 ApplicationMaster 的地址,之后再通过 Spark Streaming 的 UI 去获取相关参数。

原文地址:https://www.cnblogs.com/dtmobile-ksw/p/12160560.html

时间: 2024-10-13 16:38:43

【转】Spark Streaming 实时计算在甜橙金融监控系统中的应用及优化的相关文章

Spark Streaming实时计算框架介绍

http://www.cnblogs.com/Leo_wl/p/3530464.html 随着大数据的发展,人们对大数据的处理要求也越来越高,原有的批处理框架MapReduce适合离线计算,却无法满足实时性要求较高的业务,如实时推荐.用户行为分析等. Spark Streaming是建立在Spark上的实时计算框架,通过它提供的丰富的API.基于内存的高速执行引擎,用户可以结合流式.批处理和交互试查询应用.本文将详细介绍Spark Streaming实时计算框架的原理与特点.适用场景. Spar

【Streaming】30分钟概览Spark Streaming 实时计算

本文主要介绍四个问题: 什么是Spark Streaming实时计算? Spark实时计算原理流程是什么? Spark 2.X下一代实时计算框架Structured Streaming Spark Streaming相对其他实时计算框架该如何技术选型? 本文主要针对初学者,如果有不明白的概念可了解之前的博客内容. 1.什么是Spark Streaming? 与其他大数据框架Storm.Flink一样,Spark Streaming是基于Spark Core基础之上用于处理实时计算业务的框架.其实

Spark Streaming实时计算海量用户UV

提出需求 实时统计业务系统(web,APP之类)的访问人数,即所谓UV,或者DAU指标. 这个需求怕是流计算最最最常见的需求了. 计算UV的关键点就在于去重,即同一个人访问两次是只计一个UV的.在离线计算中统计UV比较容易想到的方法就是用group或distinct机制来去重.但是在实时计算场景,还用group就不太科学了,一个是全量数据的group是比较费时的,第二个是全量数据的group是很费内存和CPU的.特别是当用户量巨大的时候,还要做到秒级更新就更难了. 总结起来,需求就是:海量用户场

【慕课网实战】Spark Streaming实时流处理项目实战笔记八之铭文升级版

铭文一级: Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Spark Streaming个人的定义: 将不同的数据源的数据经过Spark Streaming处理之后将结果输出到外部文件系统 特点 低延时 能从错误中高效的恢复:fault-toler

Spark Streaming实时流处理项目实战

第1章 课程介绍   1-1 -导学-   1-2 -授课习惯和学习建议   1-3 -OOTB环境使用演示   1-4 -Linux环境及软件版本介绍   1-5 -Spark版本升级第2章 初识实时流处理   2-1 -课程目录   2-2 -业务现状分析   2-3 -实时流处理产生背景   2-4 -实时流处理概述   2-5 -离线计算和实时计算对比   2-6 -实时流处理框架对比   2-7 -实时流处理架构及技术选型   2-8 -实时流处理在企业中的应用第3章 分布式日志收集框

(版本定制)第5课:基于案例分析Spark Streaming流计算框架的运行源码

本期内容: 1.在线动态计算分类最热门商品案例回顾与演示 2.基于案例分析Spark Streaming的运行源码 第一部分案例: package com.dt.spark.sparkstreaming import com.robinspark.utils.ConnectionPoolimport org.apache.spark.SparkConfimport org.apache.spark.sql.Rowimport org.apache.spark.sql.hive.HiveConte

第5课:基于案例一节课贯通Spark Streaming流计算框架的运行源码

本期内容: 1 在线动态计算分类最热门商品案例回顾与演示 2 基于案例贯通Spark Streaming的运行源码 一.案例代码 在线动态计算电商中不同类别中最热门的商品排名,例如:手机类别中最热门的三种手机.电视类别中最热门的三种电视等 package com.dt.spark.sparkstreaming import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.

基于案例一节课贯通Spark Streaming流计算框架的运行源码

 在线动态计算分类最热门商品案例回顾与演示 基于案例贯通Spark Streaming的运行源码 使用Spark Streaming + Spark SQL来在线动态计算电商中不同类别中最热门的商品排名,例如手机这个类别下面最热门的三款手机. 是用mysql数据库作为元数据库,使用Hive作为存储引擎,使用Spark SQL作为查询引擎. 其中链接数据库代码如下: package com.dt.spark.com.dt.spark.streaming; import java.sql.Con

Spark 定制版:005~贯通Spark Streaming流计算框架的运行源码

本讲内容: a. 在线动态计算分类最热门商品案例回顾与演示 b. 基于案例贯通Spark Streaming的运行源码 注:本讲内容基于Spark 1.6.1版本(在2016年5月来说是Spark最新版本)讲解. 上节回顾 上节课主要从事务视角为大家探索Spark Streaming架构机制:Spark Streaming程序分成而部分,一部分是Driver,另外一部分是Executor.通过对Driver和Executor解析,洞察怎么才能完成完整的语义.事务一致性,并保证数据的零丢失,Exa