SparkStreaming整合kafka的补充

(1)SparkStreaming 整合 kafka 两种方式对比

Direct 方式的优缺点分析

  • 优点:

    • 简化并行(Simplified Parallelism)。不现需要创建以及 union 多输入源,Kafka topic 的partition 与 RDD 的 partition 一一对应。
    • 高效(Efficiency)。基于 Receiver-based 的方式保证数据零丢失(zero-data loss)需要配置 spark.streaming.receiver.writeAheadLog.enable=true,此种方式需要保存两份数据,浪费存储空间也影响效率。而 Direct 方式则不存在这个问题。
    • 强一致语义(Exactly-once semantics)。High-level 数据由 Spark Streaming 消费,但是Offsets 则是由 Zookeeper 保存。通过参数配置,可以实现 at-least once 消费,此种情况有重复消费数据的可能。
    • 降低资源。Direct 不需要 Receivers,其申请的 Executors 全部参与到计算任务中;而Receiver-based 则需要专门的 Receivers 来读取 Kafka 数据且不参与计算。因此相同的资源申请,Direct 能够支持更大的业务。
    • 降低内存。Receiver-based 的 Receiver 与其他 Exectuor 是异步的,并持续不断接收数据,对于小业务量的场景还好,如果遇到大业务量时,需要提高 Receiver 的内存,但是参与计算的 Executor 并无需那么多的内存。而 Direct 因为没有 Receiver,而是在计算时读取数据,然后直接计算,所以对内存的要求很低。
  • 缺点:
    • 提高成本。Direct 需要用户采用 checkpoint 或者第三方存储来维护 offsets,而不像Receiver-based 那样,通过 ZooKeeper 来维护 Offsets,此提高了用户的开发成本。
    • 监控可视化。Receiver-based 方式指定 topic 指定 consumer 的消费情况均能通过ZooKeeper 来监控,而 Direct 则没有这种便利,不能自动保存 offset 到 zookeeper,如果做到监控并可视化,则需要投入人力开发。
      Receiver 方式的优缺点分析
  • 优点:
    • 专注计算。Kafka 的 high-level 数据读取方式让用户可以专注于所读数据,而不用关注或维护 consumer 的 offsets,这减少用户的工作量以及代码量而且相对比较简单。
  • 缺点:
    • 防数据丢失。做 checkpoint 操作以及配置 spark.streaming.receiver.writeAheadLog.enable参数,配置 spark.streaming.receiver.writeAheadLog.enable 参数,每次处理之前需要将该batch 内的日志备份到 checkpoint 目录中,这降低了数据处理效率,反过来又加重了Receiver 端的压力;另外由于数据备份机制,会受到负载影响,负载一高就会出现延迟的风险,导致应用崩溃。
    • 单 Receiver 内存。由于 receiver 也是属于 Executor 的一部分,那么为了提高吞吐量
    • 重复消费。在程序失败恢复时,有可能出现数据部分落地,但是程序失败,未更新 offset的情况,这导致数据重复消费。
    • Receiver 和计算的 Executor的异步的,那么遇到网络等因素原因,导致计算出现延迟,计算队列一直在增加,而Receiver 则在一直接收数据,这非常容易导致程序崩溃。

      (2)对kafka消费的offset的管理

  • spark自带的checkpoint:
    • 启用spark streaming的checkpoint是存储偏移量的最简单方法
    • 流式checkpoint专门保存用户应用程序的状态
    • 但是checkpoint的目录是不能共享的,无法跨越应用程序进行恢复
    • 一般不使用checkpoint管理offset
  • 使用zookeeper管理offset
    • 如果Zookeeper中未保存offset,根据kafkaParam的配置使用最新或者最旧的offset
    • 如果 zookeeper中有保存offset,我们会利用这个offset作为kafkaStream 的起始位置
  • 使用hbase保存offset
    • Rowkey的设计:topic名称 + groupid + streaming的batchtime.milliSeconds
  • 使用hdfs管理offset:当然这种情况不推荐使用,因为在hdfs中会生成大量的小文件,导致,hdfs的性能急剧下降

    (3)Driver的HA

      介绍:他能够在driver失败的时候,通过读取checkpoint目录下的元数据,恢复当前streamingContext对象的状态;它能够察觉到driver进程异常退出之后,自动重启。
      具体流程:当第一次运行程序时,发现checkpoint中没有数据,则根据定义的函数来第一次创建StreamingContext对象,当程序异常退出的时候,此时会根据checkpoint中的元数据恢复一个StreamingContext对象,达到异常退出之前的状态,而实现异常退出并自动启动则是sparkStreaming应用程序对driver进行监控,并且在他失败的时候感知,并进行重启。
      必要条件
      - spark-submit提交作业的时候,必须是集群模式(cluster),并且必须在spark-standalong下。

    spark-submit --class com.aura.mazh.spark.streaming.kafka.SparkStreamDemo_Direct //这里只能使用spark的standalong模式,所以配置为spark集群
    --master spark://hadoop02:7077,hadoop04:7077 --driver-memory 512m --total-executor-cores 3 --executor-memory 512m #这句代码一定要加,他可以使异常退出的driver程序,重新启动
    --supervise \
    --name SparkStreamDemo_Direct --jars /home/hadoop/lib/kafka_2.11-0.8.2.1.jar,/home/hadoop/lib/metrics-core-2.2.0.jar,/home/hadoop/lib/spark-streaming_2.11-2.3.2.jar,/home/hadoop/lib/spark-streaming-kafka-0-8_2.11-2.3.2.jar,/home/hadoop/lib/zkclient-0.3.jar /home/hadoop/original-spark-1.0-SNAPSHOT.jar spark://hadoop02:7077,hadoop04:7077

      - 需要添加--supervise \,才能实现失败自启动
      - 需要配置checkpoint目录,并且是存储在hdfs上,jar也要放置在hdfs上

原文地址:http://blog.51cto.com/14048416/2339933

时间: 2024-11-05 14:52:17

SparkStreaming整合kafka的补充的相关文章

scala spark-streaming整合kafka (spark 2.3 kafka 0.10)

Maven组件如下: <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.3.0</version></dependency> 官网代码如下: pasting /* * Licensed to the Apache Software

大数据学习——SparkStreaming整合Kafka完成网站点击流实时统计

1.安装并配置zk 2.安装并配置Kafka 3.启动zk 4.启动Kafka 5.创建topic [[email protected] kafka]# bin/kafka-console-producer.sh --broker-list mini1:9092 --topic cyf-test 程序代码 package org.apache.spark import java.net.InetSocketAddress import org.apache.spark.HashPartition

Kafuka面试(整合Kafka两种模式区别)

整合Kafka两种模式说明 ★面试题:Receiver & Direct 开发中我们经常会利用SparkStreaming实时地读取kafka中的数据然后进行处理,在spark1.3版本后,kafkaUtils里面提供了两种创建DStream的方法: 1.Receiver接收方式: KafkaUtils.createDstream(开发中不用,了解即可,但是面试可能会问) Receiver作为常驻的Task运行在Executor等待数据,但是一个Receiver效率低,需要开启多个,再手动合并数

SparkStreaming与Kafka整合遇到的问题及解决方案

前言 最近工作中是做日志分析的平台,采用了sparkstreaming+kafka,采用kafka主要是看中了它对大数据量处理的高性能,处理日志类应用再好不过了,采用了sparkstreaming的流处理框架 主要是考虑到它本身是基于spark核心的,以后的批处理可以一站式服务,并且可以提供准实时服务到elasticsearch中,可以实现准实时定位系统日志. 实现 Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式. 一. 基于Receiver方式

SparkStreaming消费kafka数据

概要:本例子为SparkStreaming消费kafka消息的例子,实现的功能是将数据实时的进行抽取.过滤.转换,然后存储到HDFS中. 实例代码 package com.fwmagic.test import com.alibaba.fastjson.{JSON, JSONException} import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf imp

Spark Streaming整合Kafka

0)摘要 主要介绍了Spark Streaming整合Kafka,两种整合方式:Receiver-based和Direct方式.这里使用的是Kafka broker version 0.8.2.1,官方文档地址:(http://spark.apache.org/docs/2.2.0/streaming-kafka-0-8-integration.html). 1)Kafka准备 启动zookeeper ./zkServer.sh start 启动kafka ./kafka-server-star

sparkStreaming 读kafka的数据

目标:sparkStreaming每2s中读取一次kafka中的数据,进行单词计数. topic:topic1 broker list:192.168.1.126:9092,192.168.1.127:9092,192.168.1.128:9092 1.首先往一个topic中实时生产数据. 代码如下: 代码功能:每秒向topic1发送一条消息,一条消息里包含4个单词,单词之间用空格隔开. 1 package kafkaProducer 2 3 import java.util.HashMap 4

整合Kafka到Spark Streaming——代码示例和挑战

作者Michael G. Noll是瑞士的一位工程师和研究员,效力于Verisign,是Verisign实验室的大规模数据分析基础设施(基础Hadoop)的技术主管.本文,Michael详细的演示了如何将Kafka整合到Spark Streaming中. 期间, Michael还提到了将Kafka整合到 Spark Streaming中的一些现状,非常值得阅读,虽然有一些信息在Spark 1.2版本中已发生了一些变化,比如HA策略: 通过Spark Contributor.Spark布道者陈超我

flume 整合 kafka

flume 整合 kafka: flume 采集业务日志,发送到kafka 安装部署Kafka Download 1.0.0 is the latest release. The current stable version is 1.0.0. You can verify your download by following these procedures and using these KEYS. 1.0.0 Released November 1, 2017 Source downloa