应用的场景
1)DAG中Lineage过长,如果要重新计算的,则开销会很大(如在PageRank中)。
2)在Shuffle Dependency上采用Lineage的话,由于子RDD分区中的一个分区可能依赖于父RDD的中所有分区,所以需要对父RDD中的每个区进行计算,因为在子RDD中有可能其它分区也依赖于父RDD中的多个分区,这样就会造成很大的冗余计算开销。
传统方式
在RDD计算中,通过检查点机制进行容错,传统做检查点有两种方式:
1)通过冗余数据。在RDD中的doCheckPoint方法相当于通过冗余数据来缓存数据;
2)通过日志记录。Lineage就是通过相当粗粒度的记录更新操作来实现容错的。
在Spark中的应用
在Spark中,通过RDD中的checkpoint()方法来做检查点。
def checkpoint():Unit
可以通过SparkContext.setCheckPointDir()设置检查点数据的存储路劲,进而将数据存储备份,然后Spark删除所有已经做检查点的RDD的祖先RDD依赖。这个操作需要在所有需要对这个RDD所做的操作完成之后再做,因为数据写入持久化存储造成I/O开销。官方建议,做检查点的RDD最好实在内存中已经缓存的RDD,否则保存这个RDD在持久化的文件中需要重新计算,产生I/O开销。
检查点(本质是通过将RDD写入Disk做检查点)是为了通过Lineage做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后节点出现问题而丢失分区,从做检查点的RDD开始重做lineage,就会减少开销。其实这种思想有点像开发程序时,加断点进行调试,哪块儿区域出现bug,就哪块儿区域开始处加断点,没必要从头开始找。
时间: 2024-10-10 13:32:21