调整 Spark 应用程序的内存使用情况和 GC behavior 已经有很多的讨论在 Tuning Guide 中.我们强烈建议您阅读一下.在本节中, 我们将在 Spark Streaming applications 的上下文中讨论一些 tuning parameters (调优参数).
Spark Streaming application 所需的集群内存量在很大程度上取决于所使用的 transformations 类型.例如, 如果要在最近 10 分钟的数据中使用 window operation (窗口操作), 那么您的集群应该有足够的内存来容纳内存中 10 分钟的数据.或者如果要使用大量 keys 的 updateStateByKey
, 那么必要的内存将会很高.相反, 如果你想做一个简单的 map-filter-store 操作, 那么所需的内存就会很低.
一般来说, 由于通过 receivers (接收器)接收的数据与 StorageLevel.MEMORY_AND_DISK_SER_2 一起存储, 所以不适合内存的数据将会 spill over (溢出)到磁盘上.这可能会降低 streaming application (流式应用程序)的性能, 因此建议您提供足够的 streaming application (流量应用程序)所需的内存.最好仔细查看内存使用量并相应地进行估算.
memory tuning (内存调优)的另一个方面是 garbage collection (垃圾收集).对于需要低延迟的 streaming application , 由 JVM Garbage Collection 引起的大量暂停是不希望的.
有几个 parameters (参数)可以帮助您调整 memory usage (内存使用量)和 GC 开销:
- Persistence Level of DStreams (DStreams 的持久性级别): 如前面在 Data Serialization 部分中所述, input data 和 RDD 默认保持为 serialized bytes (序列化字节).与 deserialized persistence (反序列化持久性)相比, 这减少了内存使用量和 GC 开销.启用 Kryo serialization 进一步减少了 serialized sizes (序列化大小)和 memory usage (内存使用).可以通过 compression (压缩)来实现内存使用的进一步减少(参见Spark配置
spark.rdd.compress
), 代价是 CPU 时间. - Clearing old data (清除旧数据): 默认情况下, DStream 转换生成的所有 input data 和 persisted RDDs 将自动清除. Spark Streaming 决定何时根据所使用的 transformations (转换)来清除数据.例如, 如果您使用 10 分钟的 window operation (窗口操作), 则 Spark Streaming 将保留最近 10 分钟的数据, 并主动丢弃旧数据. 数据可以通过设置
streamingContext.remember
保持更长的持续时间(例如交互式查询旧数据). - CMS Garbage Collector (CMS垃圾收集器): 强烈建议使用 concurrent mark-and-sweep GC , 以保持 GC 相关的暂停始终如一.即使 concurrent GC 已知可以减少 系统的整体处理吞吐量, 其使用仍然建议实现更多一致的 batch processing times (批处理时间).确保在 driver (使用
--driver-java-options
在spark-submit
中 )和 executors (使用 Spark configurationspark.executor.extraJavaOptions
)中设置 CMS GC. - Other tips (其他提示): 为了进一步降低 GC 开销, 以下是一些更多的提示.
- 使用
OFF_HEAP
存储级别的保持 RDDs .在 Spark Programming Guide 中查看更多详细信息. - 使用更小的 heap sizes 的 executors.这将降低每个 JVM heap 内的 GC 压力.
- 使用
Important points to remember(要记住的要点):
- DStream 与 single receiver (单个接收器)相关联.为了获得读取并行性, 需要创建多个 receivers , 即 multiple DStreams .receiver 在一个 executor 中运行.它占据一个 core (内核).确保在 receiver slots are booked 后有足够的内核进行处理, 即
spark.cores.max
应该考虑 receiver slots . receivers 以循环方式分配给 executors . - 当从 stream source 接收到数据时, receiver 创建数据 blocks (块).每个 blockInterval 毫秒生成一个新的数据块.在 N = batchInterval/blockInterval 的 batchInterval 期间创建 N 个数据块.这些块由当前 executor 的 BlockManager 分发给其他执行程序的 block managers .之后, 在驱动程序上运行的 Network Input Tracker (网络输入跟踪器)通知有关进一步处理的块位置
- 在驱动程序中为在 batchInterval 期间创建的块创建一个 RDD .在 batchInterval 期间生成的块是 RDD 的 partitions .每个分区都是一个 spark 中的 task. blockInterval == batchinterval 意味着创建 single partition (单个分区), 并且可能在本地进行处理.
- 除非 non-local scheduling (非本地调度)进行, 否则块上的 map tasks (映射任务)将在 executors (接收 block, 复制块的另一个块)中进行处理.具有更大的 block interval (块间隔)意味着更大的块.
spark.locality.wait
的高值增加了处理 local node (本地节点)上的块的机会.需要在这两个参数之间找到平衡, 以确保在本地处理较大的块. - 而不是依赖于 batchInterval 和 blockInterval , 您可以通过调用
inputDstream.repartition(n)
来定义 number of partitions (分区数).这样可以随机重新组合 RDD 中的数据, 创建 n 个分区.是的, 为了更大的 parallelism (并行性).虽然是 shuffle 的代价. RDD 的处理由 driver’s jobscheduler 作为一项工作安排.在给定的时间点, 只有一个 job 是 active 的.因此, 如果一个作业正在执行, 则其他作业将排队. - 如果您有两个 dstream , 将会有两个 RDD 形成, 并且将创建两个将被安排在另一个之后的作业.为了避免这种情况, 你可以联合两个 dstream .这将确保为 dstream 的两个 RDD 形成一个 unionRDD .这个 unionRDD 然后被认为是一个 single job (单一的工作).但 RDD 的 partitioning (分区)不受影响.
- 如果 batch processing time (批处理时间)超过 batchinterval (批次间隔), 那么显然 receiver 的内存将会开始填满, 最终会抛出 exceptions (最可能是 BlockNotFoundException ).目前没有办法暂停 receiver .使用 SparkConf 配置
spark.streaming.receiver.maxRate
, receiver 的 rate 可以受到限制.