spark动态资源调整其实也就是说的executor数目支持动态增减,动态增减是根据spark应用的实际负载情况来决定。
开启动态资源调整需要(on yarn情况下)
1.将spark.dynamicAllocation.enabled设置为true。意思就是启动动态资源功能
2.将spark.shuffle.service.enabled设置为true。 在每个nodeManager上设置外部shuffle服务
2.1 将spark-<version>-yarn-shuffle.jar拷贝到每台nodeManager的${HADOOP_HOME}/share/hadoop/yarn/lib/下。
2.2 配置yarn-site.xml
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
<value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
<property>
<name>spark.shuffle.service.port</name>
<value>7337</value>
</property>
2.3 重启所有nodeManager
关于资源(executor)的Request与Remove策略
Request策略
当有被挂起的任务(pending task)的时候,也就表示当前的executor数量还不足够所有的task并行运行,这时候spark会申请增加资源,
但是并不是出现pending task就立刻请求增加executor。由下面两个参数决定,如下:
- 1.spark.dynamicAllocation.schedulerBacklogTimeout:
如果启用了动态资源分配功能,如果有pending task并且等待了一段时间(默认1秒),则增加executor
- 2.spark.dynamicAllocation.sustainedSchedulerBacklogTimeout:
随后每隔N秒(默认1秒),再检测pending task,如果仍然存在,增加executor。
此外每轮请求的executor数量是指数增长的。 比如,在第一轮中添加1个executor,然后在随后的轮中添加2、4、8,依此类推。
Remove策略
如果某executor空闲超过了一段时间,则remove此executor,由下面参数决定:
spark.dynamicAllocation.executorIdleTimeout:默认60秒
此外关于动态资源分配还有以下相关参数
- spark.dynamicAllocation.initialExecutors:
初始executor数量,如果--num-executors设置的值比这个值大,那么将使用--num-executors设置的值作为初始executor数量。
- spark.dynamicAllocation.maxExecutors:
executor数量的上限,默认是无限制的。
- spark.dynamicAllocation.minExecutors:
executor数量的下限,默认是0个
- spark.dynamicAllocation.cachedExecutorIdleTimeout:
如果executor内有缓存数据(cache data),并且空闲了N秒。则remove该executor。默认值无限制。也就是如果有缓存数据,则不会remove该executor
为什么?比如在写shuffle数据时候,executor可能会写到磁盘也可能会保存在内存中,如果保存在内存中,该executor又remove掉了,那么数据也就丢失了。
spark动态资源分配机制的应用
使用spark thriftserver将spark作为一个长期运行的服务。用户通过JDBC来提交sql查询:
$SPARK_HOME/sbin/start-thriftserver.sh --executor-memory 20g --executor-cores 5 --driver-memory 10g --driver-cores 5 --conf spark.dynamicAllocation.enabled=true --conf spark.shuffle.service.enabled=true --conf spark.dynamicAllocation.initialExecutors=20 --conf spark.dynamicAllocation.minExecutors=20 --conf spark.dynamicAllocation.maxExecutors=400 --conf spark.dynamicAllocation.executorIdleTimeout=300s --conf spark.dynamicAllocation.schedulerBacklogTimeout=10s \
官网关于动态资源分配的文档:
http://spark.apache.org/docs/2.3.1/job-scheduling.html#dynamic-resource-allocation
http://spark.apache.org/docs/2.3.1/configuration.html#dynamic-allocation
1
原文地址:https://www.cnblogs.com/zz-ksw/p/12228608.html