A New Way to Integrate Kafka with Spark 1.3.0

  • Simplified Parallelism: No need to create multiple input Kafka streams and union them. With directStream, Spark Streaming will create as many RDD partitions as there are Kafka partitions to consume, which will all read data from Kafka in parallel. So there is one-to-one mapping between Kafka and RDD partitions, which is easier to understand and tune.
  • Simplified parallelism: there is no longer any need to create multiple Kafka input streams and then union them. With directStream, Spark Streaming creates RDD partitions that match the Kafka partitions to be consumed, and all of the data is read from Kafka in parallel. In short, RDD partitions and Kafka partitions are mapped one-to-one.
  • Efficiency: Achieving zero-data loss in the first approach required the data to be stored in a Write Ahead Log, which further replicated the data. This is actually inefficient as the data effectively gets replicated twice - once by Kafka, and a second time by the Write Ahead Log. This second approach eliminates the problem as there is no receiver, and hence no need for Write Ahead Logs.
  • Efficiency: to achieve zero data loss in Spark 1.2, the data had to be stored in a Write Ahead Log, which means the data was effectively copied twice - once when it was replicated from Kafka to Spark, and a second time when it was written ahead to HDFS storage - which is inefficient. The approach provided in Spark 1.3 eliminates this problem: there is no receiver, so no Write Ahead Log is needed.
  • Exactly-once semantics: The first approach uses Kafka’s high level API to store consumed offsets in Zookeeper. This is traditionally the way to consume data from Kafka. While this approach (in combination with write ahead logs) can ensure zero data loss (i.e. at-least once semantics), there is a small chance some records may get consumed twice under some failures. This occurs because of inconsistencies between data reliably received by Spark Streaming and offsets tracked by Zookeeper. Hence, in this second approach, we use the simple Kafka API that does not use Zookeeper; offsets are tracked only by Spark Streaming within its checkpoints. This eliminates inconsistencies between Spark Streaming and Zookeeper/Kafka, and so each record is received by Spark Streaming effectively exactly once despite failures.
  • Exactly-once semantics: in versions before Spark 1.3, Spark Streaming read data through Kafka's high-level consumer API and stored the consumed offsets in ZooKeeper. This is the traditional way to consume data from Kafka. Although this approach (combined with the Write Ahead Log) guarantees zero data loss (i.e. at-least-once semantics), there is still a small chance that some records are consumed twice after a failure, because the offsets tracked by ZooKeeper can become inconsistent with the data actually received by Spark Streaming. In Spark 1.3, therefore, the simple Kafka API is used instead of ZooKeeper (the high-level API), and the offsets are tracked by Spark Streaming through its checkpoints. This removes the inconsistency between Spark Streaming and ZooKeeper/Kafka, so each record is received by Spark Streaming effectively exactly once even when failures occur.

Note that one disadvantage of this approach is that it does not update offsets in Zookeeper, hence Zookeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed by this approach in each batch and update Zookeeper yourself.

Note that the one disadvantage of this approach is that ZooKeeper no longer stores the Kafka offsets, so monitoring tools that rely on ZooKeeper will not show consumption progress correctly. You can, however, manually write the offsets processed by Spark Streaming back to ZooKeeper yourself, as sketched below.
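As an illustration (not part of the original article), the following sketch shows one way to read the per-batch offsets from the generated RDDs via HasOffsetRanges. The broker address and topic name ("broker1:9092", "logs") are placeholder assumptions, and the actual ZooKeeper update is left as a comment because the client used for it is up to you:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

object DirectKafkaOffsetsDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("DirectKafkaOffsetsDemo")
    val ssc = new StreamingContext(conf, Seconds(5))

    // "metadata.broker.list" points at Kafka brokers, NOT ZooKeeper
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("logs"))

    stream.foreachRDD { rdd =>
      // Each batch's RDD is backed by a KafkaRDD and exposes its offset ranges
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      offsetRanges.foreach { o =>
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
        // Here you could write o.untilOffset back to ZooKeeper with the
        // ZooKeeper client of your choice so that monitoring tools see progress.
      }
      // Normal processing of the batch
      println(s"Batch record count: ${rdd.count()}")
    }

    ssc.start()
    ssc.awaitTermination()
  }
}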

  /**
   * :: Experimental ::
   * Create an input stream that directly pulls messages from Kafka Brokers
   * without using any receiver. This stream can guarantee that each message
   * from Kafka is included in transformations exactly once (see points below).
   *
   * Points to note:
   *  - No receivers: This stream does not use any receiver. It directly queries Kafka
   *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
   *    by the stream itself. For interoperability with Kafka monitoring tools that depend on
   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
   *    You can access the offsets used in each batch from the generated RDDs (see
   *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
   *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
   *    in the [[StreamingContext]]. The information on consumed offset can be
   *    recovered from the checkpoint. See the programming guide for details (constraints, etc.).
   *  - End-to-end semantics: This stream ensures that every record is effectively received and
   *    transformed exactly once, but gives no guarantees on whether the transformed data are
   *    outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
   *    that the output operation is idempotent, or use transactions to output records atomically.
   *    See the programming guide for more details.
   *
   * @param ssc StreamingContext object
   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
   *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
   *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in
   *    host1:port1,host2:port2 form.
   * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive)
   *    starting point of the stream
   * @param messageHandler Function for translating each message and metadata into the desired type
   */
  @Experimental
  def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag,
    R: ClassTag] (
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      fromOffsets: Map[TopicAndPartition, Long],
      messageHandler: MessageAndMetadata[K, V] => R
  ): InputDStream[R] = {
    new DirectKafkaInputDStream[K, V, KD, VD, R](
      ssc, kafkaParams, fromOffsets, messageHandler)
  }
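For reference, here is a hedged usage sketch of this overload (not from the original source): it resumes reading from offsets you previously saved somewhere yourself. The broker list, topic name, partition numbers and offsets are invented placeholders, and the snippet is written for a spark-shell style session with the spark-streaming-kafka artifact on the classpath.

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("DirectKafkaFromOffsets")
val ssc = new StreamingContext(conf, Seconds(5))

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")

// Offsets you saved yourself (e.g. in ZooKeeper or a database) after the last run;
// the stream starts reading from exactly these positions (inclusive).
val fromOffsets = Map(
  TopicAndPartition("logs", 0) -> 12345L,
  TopicAndPartition("logs", 1) -> 67890L)

// messageHandler decides what each record becomes; here: (key, value) pairs.
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

val stream = KafkaUtils.createDirectStream[
  String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets, messageHandler)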
  /**
   * :: Experimental ::
   * Create an input stream that directly pulls messages from Kafka Brokers
   * without using any receiver. This stream can guarantee that each message
   * from Kafka is included in transformations exactly once (see points below).
   *
   * Points to note:
   *  - No receivers: This stream does not use any receiver. It directly queries Kafka
   *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
   *    by the stream itself. For interoperability with Kafka monitoring tools that depend on
   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
   *    You can access the offsets used in each batch from the generated RDDs (see
   *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
   *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
   *    in the [[StreamingContext]]. The information on consumed offset can be
   *    recovered from the checkpoint. See the programming guide for details (constraints, etc.).
   *  - End-to-end semantics: This stream ensures that every record is effectively received and
   *    transformed exactly once, but gives no guarantees on whether the transformed data are
   *    outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
   *    that the output operation is idempotent, or use transactions to output records atomically.
   *    See the programming guide for more details.
   *
   * @param ssc StreamingContext object
   * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
   *   configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"
   *   to be set with Kafka broker(s) (NOT zookeeper servers), specified in
   *   host1:port1,host2:port2 form.
   *   If not starting from a checkpoint, "auto.offset.reset" may be set to "largest" or "smallest"
   *   to determine where the stream starts (defaults to "largest")
   * @param topics Names of the topics to consume
   */
  @Experimental
  def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag] (
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Set[String]
  ): InputDStream[(K, V)] = {
    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
    val kc = new KafkaCluster(kafkaParams)
    val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
    (for {
      topicPartitions <- kc.getPartitions(topics).right
      leaderOffsets <- (if (reset == Some("smallest")) {
        kc.getEarliestLeaderOffsets(topicPartitions)
      } else {
        kc.getLatestLeaderOffsets(topicPartitions)
      }).right
    } yield {
      val fromOffsets = leaderOffsets.map { case (tp, lo) =>
        (tp, lo.offset)
      }
      new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
        ssc, kafkaParams, fromOffsets, messageHandler)
    }).fold(
      errs => throw new SparkException(errs.mkString("\n")),
      ok => ok
    )
  }
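Again purely as an illustration (not part of the quoted source), the sketch below uses this overload and sets "auto.offset.reset" to "smallest", so the KafkaCluster lookup above resolves the earliest leader offsets rather than the latest ones. The broker address and topic name are placeholders, and the snippet is written for a spark-shell style session.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("DirectKafkaTopics")
val ssc = new StreamingContext(conf, Seconds(2))

val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092",
  // start from the beginning of each partition instead of the latest offsets
  "auto.offset.reset" -> "smallest")

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("logs"))

// Simple per-batch count over the message values
stream.map(_._2).count().print()

ssc.start()
ssc.awaitTermination()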