【译】Yarn上常驻Spark-Streaming程序调优

作者从容错、性能等方面优化了长时间运行在yarn上的spark-Streaming作业

对于长时间运行的Spark Streaming作业,一旦提交到YARN群集便需要永久运行,直到有意停止。任何中断都会引起严重的处理延迟,并可能导致数据丢失或重复。YARN和Apache Spark都不是为了执行长时间运行的服务而设计的。但是,它们已经成功地满足了近实时数据处理作业的常驻需求。成功并不一定意味着没有技术挑战。

这篇博客总结了在安全的YARN集群上,运行一个关键任务且长时间的Spark Streaming作业的经验。您将学习如何将Spark Streaming应用程序提交到YARN群集,以避免在值班时候的不眠之夜。

Fault tolerance

在YARN集群模式下,Spark驱动程序与Application Master(应用程序分配的第一个YARN容器)在同一容器中运行。此过程负责从YARN 驱动应用程序和请求资源(Spark执行程序)。重要的是,Application Master消除了在应用程序生命周期中运行的任何其他进程的需要。即使一个提交Spark Streaming作业的边缘Hadoop节点失败,应用程序也不会受到影响。

要以集群模式运行Spark Streaming应用程序,请确保为spark-submit命令提供以下参数:

spark-submit --master yarn --deploy-mode cluster

由于Spark驱动程序和Application Master共享一个JVM,Spark驱动程序中的任何错误都会阻止我们长期运行的工作。幸运的是,可以配置重新运行应用程序的最大尝试次数。设置比默认值2更高的值是合理的(从YARN集群属性yarn.resourcemanager.am.max尝试中导出)。对我来说,4工作相当好,即使失败的原因是永久性的,较高的值也可能导致不必要的重新启动。

spark-submit --master yarn --deploy-mode cluster     --conf spark.yarn.maxAppAttempts=4

如果应用程序运行数天或数周,而不重新启动或重新部署在高度使用的群集上,则可能在几个小时内耗尽4次尝试。为了避免这种情况,尝试计数器应该在每个小时都重置。

spark-submit --master yarn --deploy-mode cluster     --conf spark.yarn.maxAppAttempts=4     --conf spark.yarn.am.attemptFailuresValidityInterval=1h

另一个重要的设置是在应用程序发生故障之前executor失败的最大数量。默认情况下是max(2 * num executors,3),非常适合批处理作业,但不适用于长时间运行的作业。该属性具有相应的有效期间,也应设置。

spark-submit --master yarn --deploy-mode cluster     --conf spark.yarn.maxAppAttempts=4     --conf spark.yarn.am.attemptFailuresValidityInterval=1h     --conf spark.yarn.max.executor.failures={8 * num_executors}     --conf spark.yarn.executor.failuresValidityInterval=1h

对于长时间运行的作业,您也可以考虑在放弃作业之前提高任务失败的最大数量。默认情况下,任务将重试4次,然后作业失败。

spark-submit --master yarn --deploy-mode cluster     --conf spark.yarn.maxAppAttempts=4     --conf spark.yarn.am.attemptFailuresValidityInterval=1h     --conf spark.yarn.max.executor.failures={8 * num_executors}     --conf spark.yarn.executor.failuresValidityInterval=1h     --conf spark.task.maxFailures=8

Performance

当Spark Streaming应用程序提交到集群时,必须定义运行作业的YARN队列。我强烈建议使用YARN Capacity Scheduler并将长时间运行的作业提交到单独的队列。没有一个单独的YARN队列,您的长时间运行的工作迟早将被的大量Hive查询抢占。

spark-submit --master yarn --deploy-mode cluster     --conf spark.yarn.maxAppAttempts=4     --conf spark.yarn.am.attemptFailuresValidityInterval=1h     --conf spark.yarn.max.executor.failures={8 * num_executors}     --conf spark.yarn.executor.failuresValidityInterval=1h     --conf spark.task.maxFailures=8     --queue realtime_queue

Spark Streaming工作的另一个重要问题是保持处理时间的稳定性和高度可预测性。处理时间应保持在批次持续时间以下以避免延误。我发现Spark的推测执行有很多帮助,特别是在繁忙的群集中。当启用推测性执行时,批处理时间更加稳定。只有当Spark操作是幂等时,才能启用推测模式。

spark-submit --master yarn --deploy-mode cluster     --conf spark.yarn.maxAppAttempts=4     --conf spark.yarn.am.attemptFailuresValidityInterval=1h     --conf spark.yarn.max.executor.failures={8 * num_executors}     --conf spark.yarn.executor.failuresValidityInterval=1h     --conf spark.task.maxFailures=8     --queue realtime_queue     --conf spark.speculation=true

Security

在安全的HDFS群集上,长时间运行的Spark Streaming作业由于Kerberos票据到期而失败。没有其他设置,当Spark Streaming作业提交到集群时,会发布Kerberos票证。当票证到期时Spark Streaming作业不能再从HDFS写入或读取数据。

在理论上(基于文档),应该将Kerberos主体和keytab作为spark-submit命令传递:

spark-submit --master yarn --deploy-mode cluster      --conf spark.yarn.maxAppAttempts=4      --conf spark.yarn.am.attemptFailuresValidityInterval=1h      --conf spark.yarn.max.executor.failures={8 * num_executors}      --conf spark.yarn.executor.failuresValidityInterval=1h      --conf spark.task.maxFailures=8      --queue realtime_queue      --conf spark.speculation=true      --principal user/[email protected]      --keytab /path/to/foo.keytab

实际上,由于几个错误(HDFS-9276SPARK-11182)必须禁用HDFS缓存。如果没有,Spark将无法从HDFS上的文件读取更新的令牌。

spark-submit --master yarn --deploy-mode cluster      --conf spark.yarn.maxAppAttempts=4      --conf spark.yarn.am.attemptFailuresValidityInterval=1h      --conf spark.yarn.max.executor.failures={8 * num_executors}      --conf spark.yarn.executor.failuresValidityInterval=1h      --conf spark.task.maxFailures=8      --queue realtime_queue      --conf spark.speculation=true      --principal user/[email protected]      --keytab /path/to/foo.keytab      --conf spark.hadoop.fs.hdfs.impl.disable.cache=true

Mark Grover指出,这些错误只影响在HA模式下配置了NameNodes的HDFS集群。谢谢,马克

Logging

访问Spark应用程序日志的最简单方法是配置Log4j控制台追加程序,等待应用程序终止并使用yarn logs -applicationId [applicationId]命令。不幸的是终止长时间运行的Spark Streaming作业来访问日志是不可行的。

我建议安装和配置Elastic,Logstash和Kibana(ELK套装)。ELK的安装和配置是超出了这篇博客的范围,但请记住记录以下上下文字段:

  • YARN application id
  • YARN container hostname
  • Executor id (Spark driver is always 000001, Spark executors start from 000002)
  • YARN attempt (to check how many times Spark driver has been restarted)

Log4j配置使用Logstash特定的appender和布局定义应该传递给spark-submit命令:

spark-submit --master yarn --deploy-mode cluster      --conf spark.yarn.maxAppAttempts=4      --conf spark.yarn.am.attemptFailuresValidityInterval=1h      --conf spark.yarn.max.executor.failures={8 * num_executors}      --conf spark.yarn.executor.failuresValidityInterval=1h      --conf spark.task.maxFailures=8      --queue realtime_queue      --conf spark.speculation=true      --principal user/[email protected]      --keytab /path/to/foo.keytab      --conf spark.hadoop.fs.hdfs.impl.disable.cache=true      --conf spark.driver.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties      --conf spark.executor.extraJavaOptions=-Dlog4j.configuration=file:log4j.properties      --files /path/to/log4j.properties

最后,Spark Job的Kibana仪表板可能如下所示:

Monitoring

长时间运行的工作全天候运行,所以了解历史指标很重要。Spark UI仅在有限数量的批次中保留统计信息,并且在重新启动后,所有度量标准都消失了。再次,需要外部工具。我建议安装Graphite用于收集指标和Grafana来建立仪表板。

首先,Spark需要配置为将指标报告给Graphite,准备metrics.properties文件:

*.sink.graphite.class=org.apache.spark.metrics.sink.GraphiteSink
*.sink.graphite.host=[hostname]
*.sink.graphite.port=[port]
*.sink.graphite.prefix=some_meaningful_name

driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource

Graceful stop

最后一个难题是如何以优雅的方式停止部署在YARN上的Spark Streaming应用程序。停止(甚至杀死)YARN应用程序的标准方法是使用命令yarn application -kill [applicationId]。这个命令会停止Spark Streaming应用程序,但这可能发生在批处理中。因此,如果该作业是从Kafka读取数据然后在HDFS上保存处理结果,并最终提交Kafka偏移量,当作业在提交偏移之前停止工作时,您应该预见到HDFS会有重复的数据。

解决优雅关机问题的第一个尝试是在关闭程序时回调Spark Streaming Context的停止方法。

sys.addShutdownHook {
    streamingContext.stop(stopSparkContext = true, stopGracefully = true)
}

令人失望的是,由于Spark应用程序几乎立即被杀死,一个退出回调函数来不及完成已启动的批处理任务。此外,不能保证JVM会调用shutdown hook。

在撰写本博客文章时,唯一确认的YARN Spark Streaming应用程序的确切方法是通知应用程序关于计划关闭,然后以编程方式停止流式传输(但不是关闭挂钩)。命令yarn application -kill 如果通知应用程序在定义的超时后没有停止,则应该仅用作最后手段。

可以使用HDFS上的标记文件(最简单的方法)或使用驱动程序上公开的简单Socket / HTTP端点(复杂方式)通知应用程序。

因为我喜欢KISS原理,下面你可以找到shell脚本伪代码,用于启动/停止Spark Streaming应用程序使用标记文件:

start() {
    hdfs dfs -touchz /path/to/marker/my_job_unique_name
    spark-submit ...
}

stop() {
    hdfs dfs -rm /path/to/marker/my_job_unique_name
    force_kill=true
    application_id=$(yarn application -list | grep -oe "application_[0-9]*_[0-9]*"`)
    for i in `seq 1 10`; do
        application_status=$(yarn application -status ${application_id} | grep "State : \(RUNNING\|ACCEPTED\)")
        if [ -n "$application_status" ]; then
            sleep 60s
        else
            force_kill=false
            break
        fi
    done
    $force_kill && yarn application -kill ${application_id}
}

在Spark Streaming应用程序中,后台线程应该监视标记文件,当文件消失时停止上下文调用

streamingContext.stop(stopSparkContext = true, stopGracefully = true).

Summary

可以看到,部署在YARN上的关键任务Spark Streaming应用程序的配置相当复杂。以上提出的技术,由一些非常聪明的开发人员经过漫长而冗长乏味的迭代学习。最终,部署在高可用的YARN集群上的长期运行的Spark Streaming应用非常稳定。

原文地址:http://mkuthan.github.io/blog/2016/09/30/spark-streaming-on-yarn/

时间: 2024-07-31 10:34:51

【译】Yarn上常驻Spark-Streaming程序调优的相关文章

Spark Streaming性能调优详解(转)

原文链接:Spark Streaming性能调优详解 Spark Streaming提供了高效便捷的流式处理模式,但是在有些场景下,使用默认的配置达不到最优,甚至无法实时处理来自外部的数据,这时候我们就需要对默认的配置进行相关的修改.由于现实中场景和数据量不一样,所以我们无法设置一些通用的配置(要不然Spark Streaming开发者就不会弄那么多参数,直接写死不得了),我们需要根据数据量,场景的不同设置不一样的配置,这里只是给出建议,这些调优不一定试用于你的程序,一个好的配置是需要慢慢地尝试

7.Spark Streaming(上)--Spark Streaming原理介绍

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.Spark Streaming简介 1.1 概述 Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的.具备容错机制的实时流数据的处理.支持从多种数据源获取数据,包括Kafk.Flume.Twitter.ZeroMQ.Kinesis 以及TCP sockets,从数据源获取数据之后,可以使用诸如map.reduce.join和window等高级函数进行复杂算法的处

【Spark深入学习 -14】Spark应用经验与程序调优

----本节内容------- 1.遗留问题解答 2.Spark调优初体验 2.1 利用WebUI分析程序瓶颈 2.2 设置合适的资源 2.3 调整任务的并发度 2.4 修改存储格式 3.Spark调优经验 3.1 Spark原理及调优工具 3.2 运行环境优化 3.2.1 防止不必要的分发 3.2.2 提高数据本地性 3.2.3 存储格式选择 3.2.4 选择高配机器 3.3 优化操作符 3.3.1 过滤操作导致多小任务 3.3.2 降低单条记录开销 3.3.3 处理数据倾斜或者任务倾斜 3.

Spark的性能调优

下面这些关于Spark的性能调优项,有的是来自官方的,有的是来自别的的工程师,有的则是我自己总结的. Data Serialization,默认使用的是Java Serialization,这个程序员最熟悉,但是性能.空间表现都比较差.还有一个选项是Kryo Serialization,更快,压缩率也更高,但是并非支持任意类的序列化. Memory Tuning,Java对象会占用原始数据2~5倍甚至更多的空间.最好的检测对象内存消耗的办法就是创建RDD,然后放到cache里面去,然后在UI 上

揭秘Spark应用性能调优

引言:在多台机器上分布数据以及处理数据是Spark的核心能力,即我们所说的大规模的数据集处理.为了充分利用Spark特性,应该考虑一些调优技术.本文每一小节都是关于调优技术的,并给出了如何实现调优的必要步骤.本文选自<Spark GraphX实战>. 1 用缓存和持久化来加速 Spark 我们知道Spark 可以通过 RDD 实现计算链的原理 :转换函数包含在 RDD 链中,但仅在调用 action 函数后才会触发实际的求值过程,执行分布式运算,返回运算结果.要是在 同一 RDD 上重复调用

spark 资源参数调优

资源参数调优 了解完了Spark作业运行的基本原理之后,对资源相关的参数就容易理解了.所谓的Spark资源参数调优,其实主要就是对Spark运行过程中各个使用资源的地方,通过调节各种参数,来优化资源使用的效率,从而提升Spark作业的执行性能.以下参数就是Spark中主要的资源参数,每个参数都对应着作业运行原理中的某个部分,我们同时也给出了一个调优的参考值. num-executors 参数说明:该参数用于设置Spark作业总共要用多少个Executor进程来执行.Driver在向YARN集群管

柯南君:教你如何对待大型网站平台的性能优化? 之 二--- 应用程序调优 (长篇总结)

柯南君:教你如何对待大型网站平台的性能优化? 之 "二"--- 应用程序调优(长篇总结) 柯南君 上一章 <柯南君:教你如何对待大型电商平台的性能优化?之 一 (方法.指标.工具.定位)>讲到了一些测试方法.测试指标.以及测试工具.稍微讲了一些如何定位的方法?这一章主要讲一下"如何优化应用程序,将其性能提升". 一.基本知识  1.下面讲一些JAVA 程序性能方面的一些看法,首先给大家讲一下应用程序调优,需要调优哪些项? ① 运算的性能 : 看哪一个算法

如何在idea里面直接运行spark streaming程序

在windows环境下,虽然控制台报了一大堆错误,但是spark streaming还是按照它的逻辑跑着,也能得到正确的结果,并且能够打断点调试!!! 由于报了一大坨的错误在控制台,导致我想看到的信息老是被刷屏出去,于是把代码放进linux的idea中去跑,发现streaming程序根本启动不起来!报如下错误: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 16/05/03 14:1

Apache Spark Jobs 性能调优

当你开始编写 Apache Spark 代码或者浏览公开的 API 的时候,你会遇到各种各样术语,比如transformation,action,RDD 等等. 了解到这些是编写 Spark 代码的基础. 同样,当你任务开始失败或者你需要透过web界面去了解自己的应用为何如此费时的时候,你需要去了解一些新的名词: job, stage, task.对于这些新术语的理解有助于编写良好 Spark 代码.这里的良好主要指更快的 Spark 程序.对于 Spark 底层的执行模型的了解对于写出效率更高