zeppelin中运行spark streaming kakfa & 实时可视化

notebook方式运行spark程序是一种比较agile的方式,一方面可以体验像spark-shell那样repl的便捷,同时可以借助notebook的作图能力实现快速数据可视化,非常方便快速验证和demo。notebook有两种选择,一种是ipython notebook,主要针对pyspark;另一种是zeppelin,可以执行scala spark,pyspark以及其它执行引擎,包括hive等。比较而言,ipython notebook的可视化能力更强,zeppelin的功能更强。这里主要介绍基于zeppelin的方式。

spark standalone 部署

本地搭建端到端环境可以采用spark standalone部署方案。
从spark官方网站下载压缩包spark-2.2.1-bin-hadoop2.7.tgz,解压后执行

#start cluster
./sbin/start-all.sh
# check with spark shell
spark-shell --master spark://localhost:7077
# check the web UI
http://localhost:8080

kafka 演示部署

kafka在spark streaming应用场景中使用非常广泛,它有很多优秀的特性,横向扩展、持久化、有序性、API支持三种一致性语义等。
官方网站下载kafka_2.11-0.8.2.0.tar,并解压。
这里简单启动单节点:

#start zookeeper
./bin/zookeeper-server-start.sh config/zookeeper.properties
#start kafka borker
./bin/kafka-server-start.sh config/server.properties

zeppelin部署及示例

官方网站下载zeppelin-0.7.3-bin-all.tgz,解压。
为了避免端口冲突,先指定zeppelin的web端口:export ZEPPELIN_PORT=8088.
启动:

# start daemon
./bin/zeppelin-daemon.sh start
# check status
./bin/zeppelin-daemon.sh status

访问localhost:8088:

创建一个notebook并尝试运行几个快速示例:

python或者pyspark数据可视化可以使用matplotlib也可以直接将数据打印出来加上table头的注解%table {column name1}\t{column name2}\t...

spark-streaming + direct kafka

kafka0.10.0的API跟之前版本变化较大,参照http://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html,总结如下:

LocationStrategy

kafka partition跟spark executor之间对应关系
-LocationStrategies.PreferConsistent partition被均匀地对应到executor;
-PreferBrokers partition被分配给本地的executor,适合kafka跟spark集群部署在相同节点上的情况;
-PreferFixed 指定partition跟executor的映射关系

ConsumerStrategies

可以subscribe到过个topic

Offset保存

0.10之前的版本中我们需要自己在代码中保存offset,以防止spark程序异常退出,在重启自后能够从failure point开始重新处理数据。新版本的kafka consumer API自身支持了offset commit,周期地commit。示例代码中没有使用自动commit,因为从kafka中成功获取数据后就commit offset存在一些问题。数据成功被读取并不能保证数据被spark成功处理完。在之前的项目中我们的方案也是自己保存offset,例如保存在zookeeper中。

官网表示spark和kafka 0.10.0的集成目前依然是experimental状态。所以我们将基于0.8版本kafka开发。http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html

spark-streaming + kafka + zeppelin

在zeppelin中执行streaming程序并将结果创建成temporary table,进而用于实时数据可视化

准备依赖

zeppelin有类似maven的依赖解决方法,paragraph如下:

%dep

z.reset()
z.load("org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.1")
//z.load("org.apache.kafka:kafka_2.11:0.8.2.0")
z.load("org.apache.kafka:kafka-clients:0.8.2.0")

单词统计代码

读取kafka数据,分词,统计单词数量,并将统计结果创建成temporary table counts

%spark
import _root_.kafka.serializer.DefaultDecoder
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._

// prevent INFO logging from pollution output
sc.setLogLevel("INFO")

// creating the StreamingContext with 5 seconds interval

val ssc = new StreamingContext(sc, Seconds(5))

val kafkaConf = Map(
    "metadata.broker.list" -> "localhost:9092",
    "zookeeper.connect" -> "localhost:2181",
    "group.id" -> "kafka-streaming-example",
    "zookeeper.connection.timeout.ms" -> "1000"
)

val lines = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
    ssc,
    kafkaConf,
    Map("test" -> 1),   // subscripe to topic and partition 1
    StorageLevel.MEMORY_ONLY
)

val words = lines.flatMap{ case(x, y) => y.split(" ")}

import spark.implicits._

val w=words.map(x=> (x,1L)).reduceByKey(_+_)
w.foreachRDD(rdd => rdd.toDF.registerTempTable("counts"))

ssc.start()

数据展示

从上面的temporary table counts 中查询每小批量的数据中top 10 的单词值。

%sql
select * from counts order by _2 desc limit 10

端到端演示

为了快速搭建端到端的数据流分析,我们可以在上述各个步骤的基础上再创建一个restful service,有很多方式,例如jetty + jersery,或者直接使用nifi连接到kafka。

时间: 2024-10-30 08:25:53

zeppelin中运行spark streaming kakfa & 实时可视化的相关文章

使用 Kafka 和 Spark Streaming 构建实时数据处理系统(转)

原文链接:http://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice2/index.html?ca=drs-&utm_source=tuicool 引言 在很多领域,如股市走向分析, 气象数据测控,网站用户行为分析等,由于数据产生快,实时性强,数据量大,所以很难统一采集并入库存储后再做处理,这便导致传统的数据处理架构不能满足需要.流计算的出现,就是为了更好地解决这类数据在处理过程中遇到的问题.与传统架构不同,流计算模型

如何在idea里面直接运行spark streaming程序

在windows环境下,虽然控制台报了一大堆错误,但是spark streaming还是按照它的逻辑跑着,也能得到正确的结果,并且能够打断点调试!!! 由于报了一大坨的错误在控制台,导致我想看到的信息老是被刷屏出去,于是把代码放进linux的idea中去跑,发现streaming程序根本启动不起来!报如下错误: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 16/05/03 14:1

如何在 Kubernetes 环境中运行 Spark 集群

处理这么大量的数据,背后的机器可能是数以千计,无法通过人工来监控机器的状态.因此,本文将介绍用 Kubernetes 容器管理工具,并通过简单示例,告诉你如何建立一个 Spark 集群. 准备阶段 1.需要拥有正在运行的 Kubernetes 集群,并使用 Kubectl 为其配置访问权限.如果你还没有可用的 Kubernetes 集群,则可以使用 Minikube 在本地计算机上设置测试集群 . 我们建议将 Minikube 更新为最新版本(编写本文档时为0.19.0),因为某些早期版本可能无

Windows下IntelliJ IDEA中运行Spark Standalone

ZHUAN http://www.cnblogs.com/one--way/archive/2016/08/29/5818989.html http://www.cnblogs.com/one--way/p/5814148.html 前提条件: 1.Spark Standalone 集群部署完成 2.Intellij Idea 能够运行 Spark local 模式的程序. 源码: 1 import org.apache.spark.{SparkContext, SparkConf} 2 imp

在IntelliJ中运行Spark Demo时遇到的问题及解决办法

这一阶段主要是在学习Scala,知乎上说推荐先学习一下Haskell再学习Scala,但我觉得不一定要曲线救国.不过在学习过程中遇到的困难的确不少,好歹Scala是公认的其特性的复杂程度是要超过C++的嘛:-) 我学习Scala的主要动机是想研究Spark,尽管Python和Java等都可以用来开发Spark应用,但是Spark本身就是一个Scala项目,而且Spark也不能算是一个成熟的产品,也许在我遇到问题的时候用Scala可以更加高效的解决问题 今天初步看了部分Spark的官方文档,在In

Idea中运行spark消除控制台的info日志输出的方法

这个样子看的很烦 方法: 找到自己的log4j.properties.template文件,把它放到项目的src/main/resources/下 log4j.properties.template的话,一般在spark的conf下面将第一行的log4j.rootCategory=INFO, console改成log4j.rootCategory=ERROR, console,只显示ERROR级别的日志. 拷贝出来直接放至/src/main/resources/下 不想下的话,这是我的 链接:h

【转】Spark Streaming 实时计算在甜橙金融监控系统中的应用及优化

系统架构介绍 整个实时监控系统的架构是先由 Flume 收集服务器产生的日志 Log 和前端埋点数据, 然后实时把这些信息发送到 Kafka 分布式发布订阅消息系统,接着由 Spark Streaming 消费 Kafka 中的消息,同时消费记录由 Zookeeper 集群统一管理,这样即使 Kafka 宕机重启后也能找到上次的消费记录继而进行消费.在这里 Spark Streaming 首先从 MySQL 读取规则然后进行 ETL 清洗并计算多个聚合指标,最后将结果的一部分存储到 Hbase

Spark Streaming实时计算框架介绍

http://www.cnblogs.com/Leo_wl/p/3530464.html 随着大数据的发展,人们对大数据的处理要求也越来越高,原有的批处理框架MapReduce适合离线计算,却无法满足实时性要求较高的业务,如实时推荐.用户行为分析等. Spark Streaming是建立在Spark上的实时计算框架,通过它提供的丰富的API.基于内存的高速执行引擎,用户可以结合流式.批处理和交互试查询应用.本文将详细介绍Spark Streaming实时计算框架的原理与特点.适用场景. Spar

.Spark Streaming(上)--实时流计算Spark Streaming原理介

Spark入门实战系列--7.Spark Streaming(上)--实时流计算Spark Streaming原理介绍 http://www.cnblogs.com/shishanyuan/p/4747735.html 1.Spark Streaming简介 1.1 概述 Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的.具备容错机制的实时流数据的处理.支持从多种数据源获取数据,包括Kafk.Flume.Twitter.ZeroMQ.Kinesis 以及TCP