Spark Streaming with Flume-NG: A Worked Example

Spark Streaming is a high-throughput, fault-tolerant stream processing system for real-time data streams. It can consume many kinds of data sources (such as Kafka, Flume, Twitter, ZeroMQ, and TCP sockets), apply map-, reduce-, join-, and window-style operations to them, and save the results to an external file system or database, or push them to a live dashboard.

The main characteristics of Spark Streaming are:

  • It decomposes the streaming computation into a series of short batch jobs (micro-batches)
  • Failed or slow-running tasks are re-executed in parallel on other nodes
  • Strong fault tolerance, based on RDD lineage
  • The same semantics as ordinary RDD operations
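The first bullet — chopping a continuous stream into short batch jobs — can be illustrated without Spark at all. The sketch below (plain Java, class and method names invented for illustration) splits an incoming sequence of records into fixed-size batches and runs an ordinary batch computation, a count, on each one:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MicroBatchSketch {
    // Split a stream of records into fixed-size batches and run an
    // ordinary batch computation (here: a count) on each batch.
    static List<Integer> batchCounts(List<String> records, int batchSize) {
        List<Integer> counts = new ArrayList<>();
        for (int start = 0; start < records.size(); start += batchSize) {
            int end = Math.min(start + batchSize, records.size());
            counts.add(end - start); // the per-batch "job"
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> stream = Arrays.asList("a", "b", "c", "d", "e");
        System.out.println(batchCounts(stream, 2)); // [2, 2, 1]
    }
}
```

In real Spark Streaming the split is by time (the batch interval) rather than by record count, and each batch becomes an RDD, but the decomposition idea is the same.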

In this article we connect Spark Streaming to Flume-NG: taking JavaFlumeEventCount from the Spark sources as a reference, we build a Maven project, package it, and run it on a Spark standalone cluster.

I. Steps

1. Create the Maven project and write pom.xml

We need Spark Streaming's Flume connector jar; its Maven coordinates are as follows — add them to pom.xml:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.10</artifactId>
    <version>1.1.0</version>
</dependency>

The complete pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>test</groupId>
    <artifactId>hq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <build>
    <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                    <compilerVersion>1.6</compilerVersion>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>.</classpathPrefix>
                            <mainClass>JavaFlumeEventCount</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                  <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                  </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume_2.10</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>
</project>

2. Write the code and package it

Java code:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

public final class JavaFlumeEventCount {
    private JavaFlumeEventCount() {
    }

    public static void main(String[] args) {

        String host = args[0];
        int port = Integer.parseInt(args[1]);

        Duration batchInterval = new Duration(Integer.parseInt(args[2]));
        SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf,
                batchInterval);
        JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils
                .createStream(ssc, host, port);

        flumeStream.count().map(new Function<Long, String>() {
            private static final long serialVersionUID = -572435064083746235L;

            public String call(Long in) {
                return "Received " + in + " flume events.";
            }
        }).print();

        ssc.start();
        ssc.awaitTermination();
    }
}
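Taken on its own, the map transformation above just formats each batch's event count into a message. That formatting logic can be checked without a cluster — a minimal sketch, with an invented helper name:

```java
public class CountFormatter {
    // Mirrors the body of call(Long in) in the anonymous Function above.
    static String format(long in) {
        return "Received " + in + " flume events.";
    }

    public static void main(String[] args) {
        System.out.println(format(35300L)); // Received 35300 flume events.
    }
}
```

This is exactly the message that shows up in the driver output for each batch.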

Maven command: in Eclipse, Run As -> Maven assembly:assembly (equivalently, mvn assembly:assembly on the command line).

This produces the jar under the project's target directory: hq-0.0.1-SNAPSHOT.jar

3. Upload the three jars to the server

Besides the jar we just built, the job needs two more jars at run time: spark-streaming-flume_2.10-1.1.0.jar and flume-ng-sdk-1.4.0.jar (the Flume-NG version I use is 1.4.0).

Upload all three jars to the ~/spark/test/ directory on the server.

4. Submit the job from the command line and run it

[ebupt@eb174 test]$ spark-submit --master spark://eb174:7077 --name FlumeStreaming --class JavaFlumeEventCount --executor-memory 1G --total-executor-cores 2 --jars spark-streaming-flume_2.10-1.1.0.jar,flume-ng-sdk-1.4.0.jar hq.jar eb174 11000 5000

Note: run spark-submit --help for an explanation of the options. Adjust the memory settings to your needs to avoid OOM. --jars can load several jars at once, comma-separated. Three positional arguments must follow the application jar.
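The three positional arguments after hq.jar (eb174 11000 5000 in the command above) are consumed by JavaFlumeEventCount as the receiver host, the receiver port, and the batch interval in milliseconds. A small standalone sketch of that parsing (helper name invented):

```java
public class SubmitArgsSketch {
    // args[0] = host the Flume avro sink sends to, args[1] = its port,
    // args[2] = the Spark Streaming batch interval in milliseconds.
    static String describe(String[] args) {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        long batchMs = Long.parseLong(args[2]);
        return "listen on " + host + ":" + port + ", batch every " + batchMs + " ms";
    }

    public static void main(String[] args) {
        System.out.println(describe(new String[] { "eb174", "11000", "5000" }));
    }
}
```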

5. Start flume-ng to feed the data source

Write the Flume agent configuration file spark-flumeng.conf as follows:

#Agent5
#List the sources, sinks and channels for the agent
agent5.sources =  source1
agent5.sinks =  hdfs01
agent5.channels = channel1

#set channel for sources and sinks
agent5.sources.source1.channels = channel1
agent5.sinks.hdfs01.channel = channel1

#properties of someone source
agent5.sources.source1.type = spooldir
agent5.sources.source1.spoolDir = /home/hadoop/huangq/spark-flumeng-data/
agent5.sources.source1.ignorePattern = .*(\\.index|\\.tmp|\\.xml)$
agent5.sources.source1.fileSuffix = .1
agent5.sources.source1.fileHeader = true
agent5.sources.source1.fileHeaderKey = filename

# set interceptors
agent5.sources.source1.interceptors = i1 i2
agent5.sources.source1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent5.sources.source1.interceptors.i1.preserveExisting = false
agent5.sources.source1.interceptors.i1.hostHeader = hostname
agent5.sources.source1.interceptors.i1.useIP=false
agent5.sources.source1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

#properties of mem-channel-1
agent5.channels.channel1.type = memory
agent5.channels.channel1.capacity = 100000
agent5.channels.channel1.transactionCapacity = 100000
agent5.channels.channel1.keep-alive = 30

#properties of sink
agent5.sinks.hdfs01.type = avro
agent5.sinks.hdfs01.hostname = eb174
agent5.sinks.hdfs01.port = 11000

Start flume-ng (from the Flume install directory): $ bin/flume-ng agent -n agent5 -c conf -f conf/spark-flumeng.conf

Notes:

① The Flume sink must be of type avro, pointed at the node in the Spark cluster where the streaming receiver runs; here that is eb174:11000.

② If the Flume SDK jar is not supplied, you get java.lang.NoClassDefFoundError: Lorg/apache/flume/source/avro/AvroFlumeEvent; — the class was not found. It lives in the Flume SDK jar, so passing that jar via --jars fixes it.

③ Pass your own application jar separately, as the last spark-submit argument, not inside --jars; otherwise errors are also thrown.

6. Results

On the client that submitted the Spark job you see a large amount of log output; for every batch that contains data, the job reports how many events that batch's RDD holds. Sample output:

Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
14/10/13 19:00:44 INFO SecurityManager: Changing view acls to: ebupt,
14/10/13 19:00:44 INFO SecurityManager: Changing modify acls to: ebupt,
14/10/13 19:00:44 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ebupt, ); users with modify permissions: Set(ebupt, )
14/10/13 19:00:45 INFO Slf4jLogger: Slf4jLogger started
14/10/13 19:00:45 INFO Remoting: Starting remoting
14/10/13 19:00:45 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@eb174:51147]
14/10/13 19:00:45 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@eb174:51147]
14/10/13 19:00:45 INFO Utils: Successfully started service 'sparkDriver' on port 51147.
14/10/13 19:00:45 INFO SparkEnv: Registering MapOutputTracker
14/10/13 19:00:45 INFO SparkEnv: Registering BlockManagerMaster
....
.....
14/10/13 19:09:21 INFO DAGScheduler: Missing parents: List()
14/10/13 19:09:21 INFO DAGScheduler: Submitting Stage 145 (MappedRDD[291] at map at MappedDStream.scala:35), which has no missing parents
14/10/13 19:09:21 INFO MemoryStore: ensureFreeSpace(3400) called with curMem=13047, maxMem=278302556
14/10/13 19:09:21 INFO MemoryStore: Block broadcast_110 stored as values in memory (estimated size 3.3 KB, free 265.4 MB)
14/10/13 19:09:21 INFO MemoryStore: ensureFreeSpace(2020) called with curMem=16447, maxMem=278302556
14/10/13 19:09:21 INFO MemoryStore: Block broadcast_110_piece0 stored as bytes in memory (estimated size 2020.0 B, free 265.4 MB)
14/10/13 19:09:21 INFO BlockManagerInfo: Added broadcast_110_piece0 in memory on eb174:41187 (size: 2020.0 B, free: 265.4 MB)
14/10/13 19:09:21 INFO BlockManagerMaster: Updated info of block broadcast_110_piece0
14/10/13 19:09:21 INFO DAGScheduler: Submitting 1 missing tasks from Stage 145 (MappedRDD[291] at map at MappedDStream.scala:35)
14/10/13 19:09:21 INFO TaskSchedulerImpl: Adding task set 145.0 with 1 tasks
14/10/13 19:09:21 INFO TaskSetManager: Starting task 0.0 in stage 145.0 (TID 190, eb175, PROCESS_LOCAL, 1132 bytes)
14/10/13 19:09:21 INFO BlockManagerInfo: Added broadcast_110_piece0 in memory on eb175:57696 (size: 2020.0 B, free: 519.6 MB)
14/10/13 19:09:21 INFO TaskSetManager: Finished task 0.0 in stage 145.0 (TID 190) in 25 ms on eb175 (1/1)
14/10/13 19:09:21 INFO DAGScheduler: Stage 145 (take at DStream.scala:608) finished in 0.026 s
14/10/13 19:09:21 INFO TaskSchedulerImpl: Removed TaskSet 145.0, whose tasks have all completed, from pool
14/10/13 19:09:21 INFO SparkContext: Job finished: take at DStream.scala:608, took 0.036589357 s
-------------------------------------------
Time: 1413198560000 ms
-------------------------------------------
Received 35300 flume events.

14/10/13 19:09:55 INFO JobScheduler: Finished job streaming job 1413198595000 ms.0 from job set of time 1413198595000 ms
14/10/13 19:09:55 INFO JobScheduler: Total delay: 0.126 s for time 1413198595000 ms (execution: 0.112 s)
14/10/13 19:09:55 INFO MappedRDD: Removing RDD 339 from persistence list
14/10/13 19:09:55 INFO BlockManager: Removing RDD 339
14/10/13 19:09:55 INFO MappedRDD: Removing RDD 338 from persistence list
14/10/13 19:09:55 INFO BlockManager: Removing RDD 338
14/10/13 19:09:55 INFO MappedRDD: Removing RDD 337 from persistence list
14/10/13 19:09:55 INFO BlockManager: Removing RDD 337
14/10/13 19:09:55 INFO ShuffledRDD: Removing RDD 336 from persistence list
14/10/13 19:09:55 INFO BlockManager: Removing RDD 336
14/10/13 19:09:55 INFO UnionRDD: Removing RDD 335 from persistence list
14/10/13 19:09:55 INFO BlockManager: Removing RDD 335
14/10/13 19:09:55 INFO MappedRDD: Removing RDD 333 from persistence list
14/10/13 19:09:55 INFO BlockManager: Removing RDD 333
14/10/13 19:09:55 INFO BlockRDD: Removing RDD 332 from persistence list
14/10/13 19:09:55 INFO BlockManager: Removing RDD 332
...
...
14/10/13 19:10:00 INFO TaskSchedulerImpl: Adding task set 177.0 with 1 tasks
14/10/13 19:10:00 INFO TaskSetManager: Starting task 0.0 in stage 177.0 (TID 215, eb175, PROCESS_LOCAL, 1132 bytes)
14/10/13 19:10:00 INFO BlockManagerInfo: Added broadcast_134_piece0 in memory on eb175:57696 (size: 2021.0 B, free: 530.2 MB)
14/10/13 19:10:00 INFO TaskSetManager: Finished task 0.0 in stage 177.0 (TID 215) in 24 ms on eb175 (1/1)
14/10/13 19:10:00 INFO DAGScheduler: Stage 177 (take at DStream.scala:608) finished in 0.024 s
14/10/13 19:10:00 INFO TaskSchedulerImpl: Removed TaskSet 177.0, whose tasks have all completed, from pool
14/10/13 19:10:00 INFO SparkContext: Job finished: take at DStream.scala:608, took 0.033844743 s
-------------------------------------------
Time: 1413198600000 ms
-------------------------------------------
Received 0 flume events.

II. Conclusions

  • Flume-NG integrates successfully with Spark; you can write whatever classes you need to process, in real time, the data Flume-NG delivers.
  • Spark Streaming can be combined with many kinds of data sources to achieve real-time computation over them.

III. References

  1. An experiment connecting Spark Streaming with Flume-NG
  2. Integrating Spark with Flume-NG
  3. Flume sink configuration manual
Date: 2024-12-20 20:19:12
