Spark Integration with Kafka 0.10.0: New Features (Part 2)

Continuing from Spark Integration with Kafka 0.10.0: New Features (Part 1), we pick up with the canonical direct-stream example:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("topicA", "topicB")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams)
)

stream.map(record => (record.key, record.value))

Having analyzed the location strategies and consumer strategies, let's now look at the concrete implementation of org.apache.spark.streaming.kafka010.KafkaUtils$#createDirectStream:

  @Experimental
  def createDirectStream[K, V](
      ssc: StreamingContext,
      locationStrategy: LocationStrategy,
      consumerStrategy: ConsumerStrategy[K, V]): InputDStream[ConsumerRecord[K, V]] = {
    val ppc = new DefaultPerPartitionConfig(ssc.sparkContext.getConf)
    createDirectStream[K, V](ssc, locationStrategy, consumerStrategy, ppc)
  }

It returns an InputDStream[ConsumerRecord[K, V]]. Let's first look at the ConsumerRecord type:

/**
 * A key/value pair to be received from Kafka. This consists of a topic name and a partition number, from which the
 * record is being received and an offset that points to the record in a Kafka partition.
 */
public final class ConsumerRecord<K, V> {
    public static final long NO_TIMESTAMP = Record.NO_TIMESTAMP;
    public static final int NULL_SIZE = -1;
    public static final int NULL_CHECKSUM = -1;

    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private final TimestampType timestampType;
    private final long checksum;
    private final int serializedKeySize;
    private final int serializedValueSize;
    private final K key;
    private final V value;

	// ... remaining fields, constructors and accessors omitted
}
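
Since ConsumerRecord itself is not serializable (the persist override further down logs an error telling you to map out the fields first), in practice you extract plain values right after creating the stream. A minimal sketch reusing the stream from the example above (the tuple shape is just an illustration):

val parsed = stream.map { record =>
  // Pull out serializable fields before any caching, windowing or shuffling.
  (record.topic, record.partition, record.offset, record.key, record.value)
}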

We will skip the details of InputDStream itself; from its class hierarchy, the concrete type returned by createDirectStream is DirectKafkaInputDStream.

Inside createDirectStream a DefaultPerPartitionConfig is created. DefaultPerPartitionConfig simply holds the maximum rate at which messages are fetched from each partition, configured via spark.streaming.kafka.maxRatePerPartition. Its source is as follows:

package org.apache.spark.streaming.kafka010

import org.apache.kafka.common.TopicPartition

import org.apache.spark.SparkConf
import org.apache.spark.annotation.Experimental

/**
 * :: Experimental ::
 * Interface for user-supplied configurations that can't otherwise be set via Spark properties,
 * because they need tweaking on a per-partition basis.
 *
 * (A user-facing configuration hook: these values cannot be expressed as ordinary Spark
 * properties because they must be tuned per partition; a rate can still be supplied
 * through SparkConf, which is what the default implementation below does.)
 */
@Experimental
abstract class PerPartitionConfig extends Serializable {
  /**
   *  Maximum rate (number of records per second) at which data will be read
   *  from each Kafka partition.
   */
  def maxRatePerPartition(topicPartition: TopicPartition): Long
}

/**
 * Default per-partition configuration
 */
private class DefaultPerPartitionConfig(conf: SparkConf)
    extends PerPartitionConfig {
  val maxRate = conf.getLong("spark.streaming.kafka.maxRatePerPartition", 0)
  // Maximum rate (records per second) read from each Kafka partition; 0, the default, effectively means no cap
  def maxRatePerPartition(topicPartition: TopicPartition): Long = maxRate
}
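
A quick sketch of how this cap is normally configured (the application name, batch interval and rate below are made-up values):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf()
  .setAppName("kafka-direct-demo")                             // hypothetical app name
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")    // at most 1000 records/sec per partition
val ssc = new StreamingContext(sparkConf, Seconds(5))
// With a 5-second batch this caps each partition at roughly 1000 * 5 = 5000 records
// per batch; maxMessagesPerPartition and clamp below perform that multiplication.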

After the PerPartitionConfig has been created, the overloaded createDirectStream is called:

  def createDirectStream[K, V](
      ssc: StreamingContext,
      locationStrategy: LocationStrategy,
      consumerStrategy: ConsumerStrategy[K, V],
      perPartitionConfig: PerPartitionConfig
    ): InputDStream[ConsumerRecord[K, V]] = {
    new DirectKafkaInputDStream[K, V](ssc, locationStrategy, consumerStrategy, perPartitionConfig)
  }
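
Because PerPartitionConfig is a public (if @Experimental) abstract class, you can also pass your own implementation to this four-argument overload, for example to give different topics different caps. A sketch under that assumption (topic names and rates are invented):

import org.apache.kafka.common.TopicPartition

// Hypothetical per-topic rate caps.
class PerTopicRateConfig extends PerPartitionConfig {
  override def maxRatePerPartition(tp: TopicPartition): Long = tp.topic match {
    case "topicA" => 2000L   // busier topic, allow more records per second
    case _        => 500L
  }
}

val customStream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](topics, kafkaParams),
  new PerTopicRateConfig)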

Next, let's focus on the DirectKafkaInputDStream constructor (note: in Scala, everything in the class body, from the opening { to the closing }, belongs to the primary constructor).
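
As a tiny, Spark-unrelated illustration of that rule:

class Greeter(name: String) {              // everything between { and } is the primary constructor
  private val greeting = s"Hello, $name"   // evaluated at construction time
  println(greeting)                        // also runs at construction time
}
new Greeter("Kafka")                       // prints "Hello, Kafka"

With that in mind, here is the (annotated) DirectKafkaInputDStream source: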

package org.apache.spark.streaming.kafka010

import java.{util => ju}
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicReference

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.kafka.clients.consumer._
import org.apache.kafka.common.{PartitionInfo, TopicPartition}

import org.apache.spark.SparkException
import org.apache.spark.internal.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
import org.apache.spark.streaming.scheduler.rate.RateEstimator

/**
  * A DStream where each given Kafka topic/partition corresponds to an RDD partition.
  * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
  * of messages
  * per second that each '''partition''' will accept.
  *
  * @param locationStrategy    In most cases, pass in [[PreferConsistent]],
  *                            see [[LocationStrategy]] for more details.
  * @param executorKafkaParams Kafka
  *                            <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
  *                            configuration parameters</a>.
  *                            Requires  "bootstrap.servers" to be set with Kafka broker(s),
  *                            NOT zookeeper servers, specified in host1:port1,host2:port2 form.
  * @param consumerStrategy    In most cases, pass in [[Subscribe]],
  *                            see [[ConsumerStrategy]] for more details
  * @tparam K type of Kafka message key
  * @tparam V type of Kafka message value
  */
private[spark] class DirectKafkaInputDStream[K, V](
                                                    _ssc: StreamingContext,
                                                    locationStrategy: LocationStrategy,
                                                    consumerStrategy: ConsumerStrategy[K, V],
                                                    ppc: PerPartitionConfig
                                                  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets {

  val executorKafkaParams = {
    val ekp = new ju.HashMap[String, Object](consumerStrategy.executorKafkaParams)
    // Adjust the params for the executor-side consumers so nothing misbehaves on the executors
    KafkaUtils.fixKafkaParams(ekp)
    ekp
  }
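
  // Annotation (not part of the Spark source): as far as I can tell, fixKafkaParams
  // overrides a handful of settings for the executor-side consumers -- it forces
  // enable.auto.commit to false, sets auto.offset.reset to none and prefixes the
  // group.id with "spark-executor-" -- so that executors never manage offsets or
  // group membership on their own.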

  // Current offset for each TopicPartition, advanced by compute() on every batch
  protected var currentOffsets = Map[TopicPartition, Long]()

  // Driver-side Kafka consumer, created lazily by consumer() below
  @transient private var kc: Consumer[K, V] = null

  def consumer(): Consumer[K, V] = this.synchronized {
    if (null == kc) {
      kc = consumerStrategy.onStart(currentOffsets.mapValues(l => new java.lang.Long(l)).asJava)
    }
    kc
  }

  override def persist(newLevel: StorageLevel): DStream[ConsumerRecord[K, V]] = {
    logError("Kafka ConsumerRecord is not serializable. " +
      "Use .map to extract fields before calling .persist or .window")
    super.persist(newLevel)
  }

  // Build a TopicPartition -> leader-host map for the current assignment (used by the PreferBrokers strategy)
  protected def getBrokers = {
    val c = consumer
    val result = new ju.HashMap[TopicPartition, String]()
    val hosts = new ju.HashMap[TopicPartition, String]()
    // assignment() returns the Set<TopicPartition> currently assigned to this consumer
    val assignments = c.assignment().iterator()
    // Two nested while loops build the mapping
    while (assignments.hasNext()) {
      val tp: TopicPartition = assignments.next()

      // If we do not yet know this TopicPartition's host, fetch it from the Kafka cluster
      if (null == hosts.get(tp)) {
        // partitionsFor returns metadata for the given topic, issuing an RPC if it is not cached locally
        val infos = c.partitionsFor(tp.topic).iterator()
        while (infos.hasNext()) {
          val i = infos.next()
          // TopicPartition overrides equals/hashCode, so it works as a map key
          hosts.put(new TopicPartition(i.topic(), i.partition()), i.leader.host())
        }
      }
      // Because TopicPartition overrides equals/hashCode, hosts.get(tp) finds the entry put above;
      // at this point we have each partition and the host of its leader
      result.put(tp, hosts.get(tp))
    }
    result
  }

  protected def getPreferredHosts: ju.Map[TopicPartition, String] = {
    locationStrategy match {
      case PreferBrokers => getBrokers
      case PreferConsistent => ju.Collections.emptyMap[TopicPartition, String]()
      case PreferFixed(hostMap) => hostMap
    }
  }

  // Keep this consistent with how other streams are named (e.g. "Flume polling stream [2]")
  private[streaming] override def name: String = s"Kafka 0.10 direct stream [$id]"

  protected[streaming] override val checkpointData =
    new DirectKafkaInputDStreamCheckpointData

  /**
    * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
    */
  override protected[streaming] val rateController: Option[RateController] = {
    if (RateController.isBackPressureEnabled(ssc.conf)) {
      Some(new DirectKafkaRateController(id,
        RateEstimator.create(ssc.conf, context.graph.batchDuration)))
    } else {
      None
    }
  }

  protected[streaming] def maxMessagesPerPartition(
                                                    offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
    val estimatedRateLimit = rateController.map(_.getLatestRate())

    // calculate a per-partition rate limit based on current lag
    val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match {
      case Some(rate) =>
        val lagPerPartition = offsets.map { case (tp, offset) =>
          tp -> Math.max(offset - currentOffsets(tp), 0)
        }
        val totalLag = lagPerPartition.values.sum

        lagPerPartition.map { case (tp, lag) =>
          val maxRateLimitPerPartition = ppc.maxRatePerPartition(tp)
          val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
          tp -> (if (maxRateLimitPerPartition > 0) {
            Math.min(backpressureRate, maxRateLimitPerPartition)
          } else backpressureRate)
        }
      case None => offsets.map { case (tp, offset) => tp -> ppc.maxRatePerPartition(tp) }
    }

    if (effectiveRateLimitPerPartition.values.sum > 0) {
      val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
      Some(effectiveRateLimitPerPartition.map {
        case (tp, limit) => tp -> (secsPerBatch * limit).toLong
      })
    } else {
      None
    }
  }

  /**
    * The concern here is that poll might consume messages despite being paused,
    * which would throw off consumer position.  Fix position if this happens.
    */
  private def paranoidPoll(c: Consumer[K, V]): Unit = {
    val msgs = c.poll(0)
    if (!msgs.isEmpty) {
      // position should be minimum offset per topicpartition
      msgs.asScala.foldLeft(Map[TopicPartition, Long]()) { (acc, m) =>
        val tp = new TopicPartition(m.topic, m.partition)
        val off = acc.get(tp).map(o => Math.min(o, m.offset)).getOrElse(m.offset)
        acc + (tp -> off)
      }.foreach { case (tp, off) =>
        logInfo(s"poll(0) returned messages, seeking $tp to $off to compensate")
        c.seek(tp, off)
      }
    }
  }

  /**
    * Returns the latest (highest) available offsets, taking new partitions into account.
    */
  protected def latestOffsets(): Map[TopicPartition, Long] = {
    val c = consumer
    paranoidPoll(c)
    val parts = c.assignment().asScala

    // make sure new partitions are reflected in currentOffsets
    val newPartitions = parts.diff(currentOffsets.keySet)
    // position for new partitions determined by auto.offset.reset if no commit
    currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
    // don't want to consume messages, so pause
    c.pause(newPartitions.asJava)
    // find latest available offsets
    c.seekToEnd(currentOffsets.keySet.asJava)
    parts.map(tp => tp -> c.position(tp)).toMap
  }

  // limits the maximum number of messages per partition
  protected def clamp(
                       offsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {

    maxMessagesPerPartition(offsets).map { mmp =>
      mmp.map { case (tp, messages) =>
        val uo = offsets(tp)
        tp -> Math.min(currentOffsets(tp) + messages, uo)
      }
    }.getOrElse(offsets)
  }
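
  // Worked example (annotation, not part of the Spark source): suppose two partitions
  // have lags of 800 and 200, the backpressure estimate is 500 records/sec and
  // maxRatePerPartition is 300. maxMessagesPerPartition then yields
  //   min(800/1000 * 500, 300) = 300 and min(200/1000 * 500, 300) = 100 records/sec,
  // i.e. 600 and 200 records for a 2-second batch, and clamp() additionally makes sure
  // the resulting until-offset never goes past the latest available offset.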

  override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
    val untilOffsets = clamp(latestOffsets())
    val offsetRanges = untilOffsets.map { case (tp, uo) =>
      val fo = currentOffsets(tp)
      OffsetRange(tp.topic, tp.partition, fo, uo)
    }
    val rdd = new KafkaRDD[K, V](
      context.sparkContext, executorKafkaParams, offsetRanges.toArray, getPreferredHosts, true)

    // Report the record number and metadata of this batch interval to InputInfoTracker.
    val description = offsetRanges.filter { offsetRange =>
      // Don't display empty ranges.
      offsetRange.fromOffset != offsetRange.untilOffset
    }.map { offsetRange =>
      s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
        s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
    }.mkString("\n")
    // Copy offsetRanges to immutable.List to prevent from being modified by the user
    val metadata = Map(
      "offsets" -> offsetRanges.toList,
      StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
    val inputInfo = StreamInputInfo(id, rdd.count, metadata)
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

    currentOffsets = untilOffsets
    commitAll()
    Some(rdd)
  }

  override def start(): Unit = {
    val c = consumer
    paranoidPoll(c)
    if (currentOffsets.isEmpty) {
      currentOffsets = c.assignment().asScala.map { tp =>
        tp -> c.position(tp)
      }.toMap
    }

    // don't actually want to consume any messages, so pause all partitions
    c.pause(currentOffsets.keySet.asJava)
  }

  override def stop(): Unit = this.synchronized {
    if (kc != null) {
      kc.close()
    }
  }

  protected val commitQueue = new ConcurrentLinkedQueue[OffsetRange]
  protected val commitCallback = new AtomicReference[OffsetCommitCallback]

  /**
    * Queue up offset ranges for commit to Kafka at a future time.  Threadsafe.
    *
    * @param offsetRanges The maximum untilOffset for a given partition will be used at commit.
    */
  def commitAsync(offsetRanges: Array[OffsetRange]): Unit = {
    commitAsync(offsetRanges, null)
  }

  /**
    * Queue up offset ranges for commit to Kafka at a future time.  Threadsafe.
    *
    * @param offsetRanges The maximum untilOffset for a given partition will be used at commit.
    * @param callback     Only the most recently provided callback will be used at commit.
    */
  def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit = {
    commitCallback.set(callback)
    commitQueue.addAll(ju.Arrays.asList(offsetRanges: _*))
  }

  protected def commitAll(): Unit = {
    val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]()
    var osr = commitQueue.poll()
    while (null != osr) {
      val tp = osr.topicPartition
      val x = m.get(tp)
      val offset = if (null == x) {
        osr.untilOffset
      } else {
        Math.max(x.offset, osr.untilOffset)
      }
      m.put(tp, new OffsetAndMetadata(offset))
      osr = commitQueue.poll()
    }
    if (!m.isEmpty) {
      consumer.commitAsync(m, commitCallback.get)
    }
  }

  private[streaming]
  class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
    def batchForTime: mutable.HashMap[Time, Array[(String, Int, Long, Long)]] = {
      data.asInstanceOf[mutable.HashMap[Time, Array[OffsetRange.OffsetRangeTuple]]]
    }

    override def update(time: Time): Unit = {
      batchForTime.clear()
      generatedRDDs.foreach { kv =>
        val a = kv._2.asInstanceOf[KafkaRDD[K, V]].offsetRanges.map(_.toTuple).toArray
        batchForTime += kv._1 -> a
      }
    }

    override def cleanup(time: Time): Unit = {}

    override def restore(): Unit = {
      batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
        logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
        generatedRDDs += t -> new KafkaRDD[K, V](
          context.sparkContext,
          executorKafkaParams,
          b.map(OffsetRange(_)),
          getPreferredHosts,
          // during restore, it's possible the same partition will be consumed from multiple
          // threads, so don't use cache
          false
        )
      }
    }
  }

  /**
    * A RateController to retrieve the rate from RateEstimator.
    */
  private[streaming] class DirectKafkaRateController(id: Int, estimator: RateEstimator)
    extends RateController(id, estimator) {
    override def publish(rate: Long): Unit = ()
  }

}


This completes the creation of the DirectKafkaInputDStream. Everything up to this point is still a transformation; no action (output operation) has been triggered yet, so nothing actually executes.
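
Once an output operation is attached, jobs do start running. Because the stream mixes in CanCommitOffsets, that is also where user code usually hands offset ranges back so the commitAll() call inside compute can commit them to Kafka. The standard pattern from the Spark documentation, as a sketch with the actual processing omitted:

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process this batch's records here ...
  // Queue the offsets; they are committed asynchronously by commitAll() on a later batch.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}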


DirectKafkaInputDStream's compute method (shown above) is what StreamingContext's start method ultimately ends up calling, indirectly, for every batch. Note in particular the KafkaRDD constructor call inside compute: it passes getPreferredHosts, which is where the location strategy actually takes effect.

I have not yet fully worked through the location strategy, so I will come back to org.apache.spark.streaming.kafka010.DirectKafkaInputDStream#getPreferredHosts another day.
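
For reference, getPreferredHosts only translates the chosen LocationStrategy into a TopicPartition -> host map that KafkaRDD uses as preferred locations: PreferBrokers looks the partition leaders up via getBrokers, PreferConsistent returns an empty map, and PreferFixed simply returns the map the user supplied, which would be built roughly like this sketch (broker host names are made up):

import java.{util => ju}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.LocationStrategies

val hostMap = new ju.HashMap[TopicPartition, String]()
hostMap.put(new TopicPartition("topicA", 0), "kafka-broker-1")   // hypothetical hosts
hostMap.put(new TopicPartition("topicA", 1), "kafka-broker-2")
val fixedLocation = LocationStrategies.PreferFixed(hostMap)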




Addendum: both the generation and the execution of Streaming jobs start from StreamingContext's start method.
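
In code, that is simply:

streamingContext.start()            // triggers job generation, which in turn ends up calling compute()
streamingContext.awaitTermination()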
