Spark Streaming和Flume-NG对接实验(好文转发)

 转发自玖疯的博客

 http://www.cnblogs.com/lxf20061900/p/3866252.html

Spark Streaming是一个新的实时计算的利器,而且还在快速的发展。它将输入流切分成一个个的DStream转换为RDD,从而可以使用Spark来处理。它直接支持多种数据源:Kafka, Flume, Twitter, ZeroMQ , TCP sockets等,有一些可以操作的函数:mapreducejoinwindow等。

  本文将Spark Streaming和Flume-NG进行对接,然后以官方内置的JavaFlumeEventCount作参考,稍作修改然后放到集群上去运行。

  一、下载spark streaming的flume插件包,我们这里的spark版本是1.0.0(standlone),这个插件包的版本选择spark-streaming-flume_2.10-1.0.1.jar,这个版本修复了一个重要的bug,参考下面参考中的7。

  二、把spark的编译后的jar包以及上面flume的插件,放入工程,编写如下类(参考8中的例子修改而来),代码如下:

 1 package com.spark_streaming;
 2
 3 import org.apache.spark.SparkConf;
 4 import org.apache.spark.api.java.function.Function;
 5 import org.apache.spark.streaming.*;
 6 import org.apache.spark.streaming.api.java.*;
 7 import org.apache.spark.streaming.flume.FlumeUtils;
 8 import org.apache.spark.streaming.flume.SparkFlumeEvent;
 9
10 public final class JavaFlumeEventCount {
11   private JavaFlumeEventCount() {
12   }
13
14   public static void main(String[] args) {
15
16     String host = args[0];
17     int port = Integer.parseInt(args[1]);
18
19     Duration batchInterval = new Duration(Integer.parseInt(args[2]));
20     SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
21     JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
22     JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, host, port);
23
24     flumeStream.count();
25
26     flumeStream.count().map(new Function<Long, String>() {
27       @Override
28       public String call(Long in) {
29         return "Received " + in + " flume events.";
30       }
31     }).print();
32
33     ssc.start();
34     ssc.awaitTermination();
35   }
36 }

  这个和官方的区别是删除了参数个数检查和增加了自定义时间间隔(分割流),也就是第三个参数。这个类并没有做太多处理,入门为主。

  三、打包这个类到ifeng_spark.jar,连同spark-streaming-flume_2.10-1.0.1.jar一起上传到spark集群中的节点上。

  四、启动flume,这个flume的sink要用avro,指定要发送到的spark集群中的一个节点,我们这里是10.32.21.165:11000。

  五、在spark安装根目录下执行如下命令:

  ./bin/spark-submit  --master spark://10.32.21.165:8070  --driver-memory 4G  --executor-memory 4G --jars /usr/lib/spark-1.0.0-cdh4/lib/spark-streaming-flume_2.10-1.0.1.jar,/usr/lib/flume-ng-1.4-cdh4.6.0/lib/flume-ng-sdk-1.4.0-cdh6.0.jar  /usr/lib/spark-1.0.0-cdh4/ifeng_spark.jar   --class com.spark_streaming.JavaFlumeEventCount 10.32.21.165 11000 2000

  这个命令中的参数解释请参考下面参考3中的解释,也可以自己增加一些参数,需要注意的是配置内存,自己根据需要自行增加内存(driver、executor)防止OOM。另外jars可以同时加载多个jar包,逗号分隔。记得指定类后需要指定3个参数。

  如果没有指定Flume的sdk包,会爆如下错误:

  java.lang.NoClassDefFoundError: Lorg/apache/flume/source/avro/AvroFlumeEvent;没有找到类。这个类在flume的sdk包内,在jars参数中指定jar包位置就可以。

  还有就是要将自己定义的业务类的jar单独列出,不要放在jars参数指定,否则也会有错误抛出。

  运行后可以看到大量的输出信息,然后可以看到有数据的RDD会统计出这个RDD有多少行,截图如下,最后的部分就是这2秒(上面命令最后的参数设定的)统计结果:

 至此,flume-ng与spark的对接成功,这只是一个入门实验。可根据需要灵活编写相关的业务类来实现实时处理Flume传输的数据。

  spark streaming和一些数据传输工具对接可以达到实时处理的目的。

  参考:

  1、https://spark.apache.org/docs/0.9.0/streaming-programming-guide.html

  2、http://www.cnblogs.com/cenyuhai/p/3577204.html

  3、http://blog.csdn.net/book_mmicky/article/details/25714545 , 重要的参数解释

  4、http://blog.csdn.net/lskyne/article/details/37561235 , 这是一个例子

  5、http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.spark%22%20 , spark-flume插件下载

  6、http://outofmemory.cn/spark/configuration , spark一些可配置参数说明

  7、https://issues.apache.org/jira/browse/SPARK-1916  ,这是1.0.1之前版本中spark streaming与flume对接的一个bug信息

  8、https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples/streaming , 这是java版本的spark streaming的一些例子,里面有flume的一个

时间: 2024-10-18 15:30:13

Spark Streaming和Flume-NG对接实验(好文转发)的相关文章

Spark Streaming整合Flume

1 目的 Spark Streaming整合Flume.参考官方整合文档(http://spark.apache.org/docs/2.2.0/streaming-flume-integration.html) 2 整合方式一:基于推 2.1 基本要求 flume和spark一个work节点要在同一台机器上,flume会在本机器上通过配置的端口推送数据 streaming应用必须先启动,receive必须要先监听推送数据的端口后,flume才能推送数据 添加如下依赖 groupId = org.

2016年大数据Spark“蘑菇云”行动之spark streaming消费flume采集的kafka数据Directf方式

王家林老师的课程:2016年大数据Spark"蘑菇云"行动之spark streaming消费flume采集的kafka数据Directf方式作业.     一.基本背景 Spark-Streaming获取kafka数据的两种方式Receiver与Direct的方式,本文介绍Direct的方式.具体的流程是这样的: 1.Direct方式是直接连接到kafka的节点上获取数据了. 2.基于Direct的方式:周期性地查询Kafka,来获得每个topic+partition的最新的offs

Spark学习七:spark streaming与flume集成

Spark学习七:spark streaming与flume集成 标签(空格分隔): Spark 一,启动flume flume-conf.properties文件 agent002.sources = sources002 agent002.channels = channels002 agent002.sinks = sinks002 ## define sources agent002.sources.sources002.type = exec agent002.sources.sour

Spark学习八:spark streaming与flume和kafka集成

Spark学习八:spark streaming与flume和kafka集成 标签(空格分隔): Spark Spark学习八spark streaming与flume和kafka集成 一Kafka 二flume和kafka的集成 三kafka和spark streaming的集成方式一kafka推送 四kafka和spark streaming的集成方式一spark streaam主动获取 五spark stream的高级应用updateStateByKey实现累加功能 六spark stre

Spark 系列(十五)—— Spark Streaming 整合 Flume

一.简介 Apache Flume 是一个分布式,高可用的数据收集系统,可以从不同的数据源收集数据,经过聚合后发送到分布式计算框架或者存储系统中.Spark Straming 提供了以下两种方式用于 Flume 的整合. 二.推送式方法 在推送式方法 (Flume-style Push-based Approach) 中,Spark Streaming 程序需要对某台服务器的某个端口进行监听,Flume 通过 avro Sink 将数据源源不断推送到该端口.这里以监听日志文件为例,具体整合方式如

第88课:Spark Streaming从Flume Pull数据案例实战及内幕源码解密

本节课分成二部分讲解: 一.Spark Streaming on Pulling from Flume实战 二.Spark Streaming on Pulling from Flume源码解析 先简单介绍下Flume的两种模式:推模式(Flume push to Spark Streaming)和 拉模式(Spark Streaming pull from Flume ) 采用推模式:推模式的理解就是Flume作为缓存,存有数据.监听对应端口,如果服务可以连接,就将数据push过去.(简单,耦

cdh环境下,spark streaming与flume的集成问题总结

如何做集成,其实特别简单,网上其实就是教程. http://blog.csdn.net/fighting_one_piece/article/details/40667035  看这里就成. 我用的是第一种集成.. 做的时候,出现了各种问题.    大概从从2014.12.17 早晨5点搞到2014.12.17晚上18点30 总结起来其实很简单,但做的时候搞了许久啊啊啊!!!!   这样的事情,吃一堑长一智吧 问题1.  需要引用各种包,这些包要打入你的JAR中, 因为用的是spark on y

Spark Streaming和Flume集成指南V1.4.1

Apache Flume是一个用来有效地收集,聚集和移动大量日志数据的分布式的,有效的服务.这里我们解释一下怎样配置Flume和Spark Streaming来从Flume获取数据.这里有两个方法. Python API:Flume现在还不支持PythonAPI 方法1:Flume风格的推方法 Flume被设计用来在Flume代理之间推送数据.在这种方法中,Spark Streaming本质上设置了一个接收器作为Flume的一个Avro代理,Flume把数据推送到接收器上.下面是配置的步骤. 一

第88课:Spark Streaming从Flume Poll数据案例实战和内幕源码解密

本节课分成二部分讲解: 一.Spark Streaming on Polling from Flume实战 二.Spark Streaming on Polling from Flume源码 第一部分: 推模式(Flume push SparkStreaming) VS 拉模式(SparkStreaming poll Flume) 采用推模式:推模式的理解就是Flume作为缓存,存有数据.监听对应端口,如果服务可以链接,就将数据push过去.(简单,耦合要低),缺点是SparkStreaming

第88讲:Spark Streaming从Flume Poll数据

本节课分成二部分讲解: 一.Spark Streaming on Polling from Flume实战 二.Spark Streaming on Polling from Flume源码 第一部分: 推模式(Flume push SparkStreaming) VS 拉模式(SparkStreaming poll Flume) 采用推模式:推模式的理解就是Flume作为缓存,存有数据.监听对应端口,如果服务可以链接,就将数据push过去.(简单,耦合要低),缺点是SparkStreaming