Spark Streaming整合Flume

1 目的

  Spark Streaming整合Flume。参考官方整合文档(http://spark.apache.org/docs/2.2.0/streaming-flume-integration.html

2 整合方式一:基于推

2.1 基本要求

  • flume和spark一个work节点要在同一台机器上,flume会在本机器上通过配置的端口推送数据
  • streaming应用必须先启动,receive必须要先监听推送数据的端口后,flume才能推送数据
  • 添加如下依赖
 groupId = org.apache.spark
 artifactId = spark-streaming-flume_2.11
 version = 2.2.0

2.2 配置Flume

  我们知道flume 的使用就是如何配置它的配置文件,使用本地的netcat source来模拟数据,本次配置如下:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop
a1.sources.r1.port = 5900

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop
a1.sinks.k1.port = 5901
#a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
#a1.channels.c1.capacity = 1000
#a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2.3 在服务器上运行

思路如下:

  • 用maven打包工程
  • 使用saprk-submit提交
  • 开启flume
  • 发送模拟数据
  • 验证

验证代码如下:功能简单的做一个单词统计:

 1 package flume_streaming
 2
 3 import org.apache.spark.SparkConf
 4 import org.apache.spark.streaming.flume.FlumeUtils
 5 import org.apache.spark.streaming.{Durations, StreamingContext}
 6
 7 /**
 8  * @Author: SmallWild
 9  * @Date: 2019/11/2 9:42
10  * @Desc: 基于flumePushWordCount
11  */
12 object flumePushWordCount {
13   def main(args: Array[String]): Unit = {
14     if (args.length != 2) {
15       System.err.println("错误参数,用法:flumePushWordCount <hostname> <port>")
16       System.exit(1)
17     }
18     //传入参数
19     val Array(hostname, port) = args
20     //一定不能使用local[1]
21     val sparkConf = new SparkConf() //.setMaster("local[2]").setAppName("kafkaDirectWordCount")
22     val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
23     //设置日志级别
24     ssc.sparkContext.setLogLevel("WARN")
25     //TODO 简单的进行单词统计
26     val flumeStream = FlumeUtils.createStream(ssc, hostname, port.toInt)
27     flumeStream.map(x => new String(x.event.getBody.array()).trim)
28       .flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
29     ssc.start()
30     ssc.awaitTermination()
31   }
32 }

验证具体步骤如下:

 1  1)打包工程
 2  mvn clean package -DskipTest
 3  2)spark-submit提交(这里使用local模式)
 4  ./spark-submit --class flume_streaming.flumePushWordCount /
 5  --master local[2] /
 6  --packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 /
 7  /smallwild/app/SparkStreaming-1.0.jar hadoop 5901
 8  3)开启flume
 9  flume-ng agent --name simple-agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf -Dflume.root.logger=INFO,console
10  4)发送模式数据
11  这里使用本地5900端口发送数据
12  telnet hadoop 5900
13  5)验证
14  查看streaming应用程序是否能出现对应的单词计数字样

验证结果:能正确统计从端口发送过来的某一批次的单词的数量

3 整合方式二:基于拉(常用)

这种方式和上面基本一致

3.1 注意事项

  • 先启动flume
  • 使用自定义的sink,streaming主动去拉取数据,数据会先存放在缓冲区
  • 事务保障机制,副本机制和数据被接收(Transactions succeed only after data is received and replicated by Spark Streaming.)
  • 高容错保证
  • 添加如下依赖

     groupId = org.apache.spark
     artifactId = spark-streaming-flume-sink_2.11
     version = 2.2.0
    
     groupId = org.scala-lang
     artifactId = scala-library
     version = 2.11.8
    
     groupId = org.apache.commons
     artifactId = commons-lang3
     version = 3.5

3.2 配置Flume

和前面差别在配置sink,需要使用自定义的sink

 1 # Name the components on this agent
 2 a1.sources = r1
 3 a1.sinks = k1
 4 a1.channels = c1
 5
 6 # Describe/configure the source
 7 a1.sources.r1.type = netcat
 8 a1.sources.r1.bind = hadoop
 9 a1.sources.r1.port = 5900
10
11 # Describe the sink
12 a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
13 a1.sinks.k1.hostname = hadoop
14 a1.sinks.k1.port = 5901
15 #a1.sinks.k1.type = logger
16
17 # Use a channel which buffers events in memory
18 a1.channels.c1.type = memory
19 #a1.channels.c1.capacity = 1000
20 #a1.channels.c1.transactionCapacity = 100
21
22 # Bind the source and sink to the channel
23 a1.sources.r1.channels = c1
24 a1.sinks.k1.channel = c1

3.3 在服务上运行

  业务逻辑大致和前面一样,这里使用下面的类

 import org.apache.spark.streaming.flume._

 val flumeStream = FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sink port])

3.4 提交验证

思路如下:

  • 用maven打包工程
  • 开启flume
  • 使用saprk-submit提交
  • 发送模拟数据
  • 验证

和前面基本一致

4 总结

  整理两种整合flume的实践。

原文地址:https://www.cnblogs.com/truekai/p/11759162.html

时间: 2024-08-10 22:27:57

Spark Streaming整合Flume的相关文章

Spark 系列(十五)—— Spark Streaming 整合 Flume

一.简介 Apache Flume 是一个分布式,高可用的数据收集系统,可以从不同的数据源收集数据,经过聚合后发送到分布式计算框架或者存储系统中.Spark Straming 提供了以下两种方式用于 Flume 的整合. 二.推送式方法 在推送式方法 (Flume-style Push-based Approach) 中,Spark Streaming 程序需要对某台服务器的某个端口进行监听,Flume 通过 avro Sink 将数据源源不断推送到该端口.这里以监听日志文件为例,具体整合方式如

2016年大数据Spark“蘑菇云”行动之spark streaming消费flume采集的kafka数据Directf方式

王家林老师的课程:2016年大数据Spark"蘑菇云"行动之spark streaming消费flume采集的kafka数据Directf方式作业.     一.基本背景 Spark-Streaming获取kafka数据的两种方式Receiver与Direct的方式,本文介绍Direct的方式.具体的流程是这样的: 1.Direct方式是直接连接到kafka的节点上获取数据了. 2.基于Direct的方式:周期性地查询Kafka,来获得每个topic+partition的最新的offs

Spark学习七:spark streaming与flume集成

Spark学习七:spark streaming与flume集成 标签(空格分隔): Spark 一,启动flume flume-conf.properties文件 agent002.sources = sources002 agent002.channels = channels002 agent002.sinks = sinks002 ## define sources agent002.sources.sources002.type = exec agent002.sources.sour

Spark学习八:spark streaming与flume和kafka集成

Spark学习八:spark streaming与flume和kafka集成 标签(空格分隔): Spark Spark学习八spark streaming与flume和kafka集成 一Kafka 二flume和kafka的集成 三kafka和spark streaming的集成方式一kafka推送 四kafka和spark streaming的集成方式一spark streaam主动获取 五spark stream的高级应用updateStateByKey实现累加功能 六spark stre

Spark 系列(十六)—— Spark Streaming 整合 Kafka

一.版本说明 Spark 针对 Kafka 的不同版本,提供了两套整合方案:spark-streaming-kafka-0-8 和 spark-streaming-kafka-0-10,其主要区别如下: spark-streaming-kafka-0-8 spark-streaming-kafka-0-10 Kafka 版本 0.8.2.1 or higher 0.10.0 or higher AP 状态 Deprecated从 Spark 2.3.0 版本开始,Kafka 0.8 支持已被弃用

Spark Streaming整合Kafka

0)摘要 主要介绍了Spark Streaming整合Kafka,两种整合方式:Receiver-based和Direct方式.这里使用的是Kafka broker version 0.8.2.1,官方文档地址:(http://spark.apache.org/docs/2.2.0/streaming-kafka-0-8-integration.html). 1)Kafka准备 启动zookeeper ./zkServer.sh start 启动kafka ./kafka-server-star

第88课:Spark Streaming从Flume Pull数据案例实战及内幕源码解密

本节课分成二部分讲解: 一.Spark Streaming on Pulling from Flume实战 二.Spark Streaming on Pulling from Flume源码解析 先简单介绍下Flume的两种模式:推模式(Flume push to Spark Streaming)和 拉模式(Spark Streaming pull from Flume ) 采用推模式:推模式的理解就是Flume作为缓存,存有数据.监听对应端口,如果服务可以连接,就将数据push过去.(简单,耦

cdh环境下,spark streaming与flume的集成问题总结

如何做集成,其实特别简单,网上其实就是教程. http://blog.csdn.net/fighting_one_piece/article/details/40667035  看这里就成. 我用的是第一种集成.. 做的时候,出现了各种问题.    大概从从2014.12.17 早晨5点搞到2014.12.17晚上18点30 总结起来其实很简单,但做的时候搞了许久啊啊啊!!!!   这样的事情,吃一堑长一智吧 问题1.  需要引用各种包,这些包要打入你的JAR中, 因为用的是spark on y

Spark Streaming和Flume集成指南V1.4.1

Apache Flume是一个用来有效地收集,聚集和移动大量日志数据的分布式的,有效的服务.这里我们解释一下怎样配置Flume和Spark Streaming来从Flume获取数据.这里有两个方法. Python API:Flume现在还不支持PythonAPI 方法1:Flume风格的推方法 Flume被设计用来在Flume代理之间推送数据.在这种方法中,Spark Streaming本质上设置了一个接收器作为Flume的一个Avro代理,Flume把数据推送到接收器上.下面是配置的步骤. 一