SparkStreaming整合flume

SparkStreaming整合flume

在实际开发中push会丢数据,因为push是由flume将数据发给程序,程序出错,丢失数据。所以不会使用不做讲解,这里讲解poll,拉去flume的数据,保证数据不丢失。

1.首先你得有flume

比如你有:【如果没有请走这篇:搭建flume集群(待定)

这里使用的flume的版本是apache1.6 cdh公司集成

这里需要下载

(1).我这里是将spark-streaming-flume-sink_2.11-2.0.2.jar放入到flume的lib目录下

cd /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/lib

  (ps:我的flume安装目录,使用ftp工具上传上去,我使用的是finalShell支持ssh也支持ftp(需要的小伙伴,点我下载))

(2)修改flume/lib下的scala依赖包(保证版本一致)

我这里是将spark中jar安装路径的scala-library-2.11.8.jar替换掉flume下的scala-library-2.10.5.jar

删除scala-library-2.10.5.jar

rm -rf /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/lib/scala-library-2.10.5.jar 

复制scala-library-2.11.8.jar

cp /export/servers/spark-2.0.2/jars/scala-library-2.11.8.jar /export/servers/apache-flume-1.6.0-cdh5.14.0-bin/lib/

(3)编写flume-poll.conf文件

创建目录

mkdir /export/data/flume

创建配置文件

vim /export/logs/flume-poll.conf

编写配置,标注发绿光的地方需要注意更改为自己本机的(flume是基于配置执行任务)

a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/data/flume
a1.sources.r1.fileHeader = true
#channel
a1.channels.c1.type =memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity=5000
#sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname=192.168.52.110
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize= 2000 

底行模式wq保存退出

执行flume

flume-ng agent -n a1 -c /opt/bigdata/flume/conf -f /export/logs/flume-poll.conf -Dflume.root.logger=INFO,console

在监视的/export/data/flume下放入文件                    (黄色对应的是之前创建的配置文件)

执行成功

代表你flume配置没有问题,接下来开始编写代码

1.导入相关依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>2.0.2</version>
</dependency>

2.编码

package SparkStreaming

import SparkStreaming.DefinedFunctionAdds.updateFunc
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}

object SparkStreamingFlume {
  def main(args: Array[String]): Unit = {
    //创建sparkContext
    val conf: SparkConf = new SparkConf().setAppName("DefinedFunctionAdds").setMaster("local[2]")
    val sc = new SparkContext(conf)

    //去除多余的log,提高可视率
    sc.setLogLevel("WARN")

    //创建streamingContext
    val scc = new StreamingContext(sc,Seconds(5))

    //设置备份
    scc.checkpoint("./flume")

    //receive(task)拉取数据
    val num1: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(scc,"192.168.52.110",8888)
    //获取flume中的body
    val value: DStream[String] = num1.map(x=>new String(x.event.getBody.array()))
    //切分处理,并附上数值1
    val result: DStream[(String, Int)] = value.flatMap(_.split(" ")).map((_,1))

    //结果累加
    val result1: DStream[(String, Int)] = result.updateStateByKey(updateFunc)

    result1.print()
    //启动并阻塞
    scc.start()
    scc.awaitTermination()
  }

  def updateFunc(currentValues:Seq[Int], historyValues:Option[Int]):Option[Int] = {
    val newValue: Int = currentValues.sum+historyValues.getOrElse(0)
    Some(newValue)
  }

}

运行

加入新的文档到监控目录  结果

成功结束!

原文地址:https://www.cnblogs.com/BigDataBugKing/p/11228784.html

时间: 2024-09-29 23:40:36

SparkStreaming整合flume的相关文章

SparkStreaming整合Flume的pull方式之启动报错解决方案

Flume配置文件: simple-agent.sources = netcat-source simple-agent.sinks = spark-sink simple-agent.channels = memory-channel #Describe/configure the source simple-agent.sources.netcat-source.type = netcat simple-agent.sources.netcat-source.bind = centos si

Spark 系列(十五)—— Spark Streaming 整合 Flume

一.简介 Apache Flume 是一个分布式,高可用的数据收集系统,可以从不同的数据源收集数据,经过聚合后发送到分布式计算框架或者存储系统中.Spark Straming 提供了以下两种方式用于 Flume 的整合. 二.推送式方法 在推送式方法 (Flume-style Push-based Approach) 中,Spark Streaming 程序需要对某台服务器的某个端口进行监听,Flume 通过 avro Sink 将数据源源不断推送到该端口.这里以监听日志文件为例,具体整合方式如

SparkStreaming整合kafka的补充

(1)SparkStreaming 整合 kafka 两种方式对比 Direct 方式的优缺点分析 : 优点: 简化并行(Simplified Parallelism).不现需要创建以及 union 多输入源,Kafka topic 的partition 与 RDD 的 partition 一一对应. 高效(Efficiency).基于 Receiver-based 的方式保证数据零丢失(zero-data loss)需要配置 spark.streaming.receiver.writeAhea

Spark Streaming整合Flume

1 目的 Spark Streaming整合Flume.参考官方整合文档(http://spark.apache.org/docs/2.2.0/streaming-flume-integration.html) 2 整合方式一:基于推 2.1 基本要求 flume和spark一个work节点要在同一台机器上,flume会在本机器上通过配置的端口推送数据 streaming应用必须先启动,receive必须要先监听推送数据的端口后,flume才能推送数据 添加如下依赖 groupId = org.

第86课:SparkStreaming数据源Flume实际案例分享

一.什么是Flume? flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用.Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera.但随着 FLume 功能的扩展,Flume OG 代码工程臃肿.核心组件设计不合理.核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重,为了解决这些问题,2011 年 10 月 22 号,c

第86讲:SparkStreaming数据源Flume实际案例分享

一.什么是Flume? flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用.Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera.但随着 FLume 功能的扩展,Flume OG 代码工程臃肿.核心组件设计不合理.核心配置不标准等缺点暴露出来,尤其是在 Flume OG 的最后一个发行版本 0.94.0 中,日志传输不稳定的现象尤为严重,为了解决这些问题,2011 年 10 月 22 号,c

大数据学习——SparkStreaming整合Kafka完成网站点击流实时统计

1.安装并配置zk 2.安装并配置Kafka 3.启动zk 4.启动Kafka 5.创建topic [[email protected] kafka]# bin/kafka-console-producer.sh --broker-list mini1:9092 --topic cyf-test 程序代码 package org.apache.spark import java.net.InetSocketAddress import org.apache.spark.HashPartition

Log4j整合Flume

1.环境 CDH 5.16.1 Spark 2.3.0 cloudera4 Kafka 2.1.0+kafka4.0.0 2.Log4j-->Flume 2.1 Log4j 产生日志 import org.apache.log4j.Logger; /** * @ClassName LoggerGenerator * @Author wuning * @Date: 2020/2/3 10:54 * @Description: 模拟日志输出 */ public class LoggerGenerat

scala spark-streaming整合kafka (spark 2.3 kafka 0.10)

Maven组件如下: <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.3.0</version></dependency> 官网代码如下: pasting /* * Licensed to the Apache Software