spark与flume整合

spark-streaming与flume整合  push

package cn.my.sparkStream

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._

/**

  */
object SparkFlumePush {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(
        "Usage: FlumeEventCount <host> <port>")
      System.exit(1)
    }
    LogLevel.setStreamingLogLevels()
    val Array(host, port) = args
    val batchInterval = Milliseconds(2000)
    // Create the context and set the batch size
    val sparkConf = new SparkConf().setAppName("FlumeEventCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, batchInterval)
    // Create a flume stream
    val stream = FlumeUtils.createStream(ssc, host, port.toInt, StorageLevel.MEMORY_ONLY_SER_2)
    // Print out the count of events received from this server in each batch
    stream.count().map(cnt => "Received " + cnt + " flume events.").print()
    //拿到消息中的event,从event中拿出body,body是真正的消息体
    stream.flatMap(t=>{new String(t.event.getBody.array()).split(" ")}).map((_,1)).reduceByKey(_+_).print

    ssc.start()
    ssc.awaitTermination()
  }
}

package cn.my.sparkStream

import java.net.InetSocketAddress

import org.apache.spark.SparkConfimport org.apache.spark.storage.StorageLevelimport org.apache.spark.streaming._import org.apache.spark.streaming.flume._

/**  *  */object SparkFlumePull {  def main(args: Array[String]) {    if (args.length < 2) {      System.err.println(        "Usage: FlumeEventCount <host> <port>")      System.exit(1)    }    LogLevel.setStreamingLogLevels()    val Array(host, port) = args    val batchInterval = Milliseconds(2000)    // Create the context and set the batch size    val sparkConf = new SparkConf().setAppName("FlumeEventCount").setMaster("local[2]")    val ssc = new StreamingContext(sparkConf, batchInterval)    // Create a flume stream

//val stream = FlumeUtils.createStream(ssc, host, port.toInt, StorageLevel.MEMORY_ONLY_SER_2)    // val flumeStream = FlumeUtils.createPollingStream(ssc, host, port.toInt)    /*      def createPollingStream(      jssc: JavaStreamingContext,      addresses: Array[InetSocketAddress],      storageLevel: StorageLevel    ):     */    //当sink有多个的时候    val flumesinklist = Array[InetSocketAddress](new InetSocketAddress("mini1", 8888))    val flumeStream = FlumeUtils.createPollingStream(ssc, flumesinklist, StorageLevel.MEMORY_ONLY_2)

flumeStream.count().map(cnt => "Received " + cnt + " flume events.").print()    flumeStream.flatMap(t => {      new String(t.event.getBody.array()).split(" ")    }).map((_, 1)).reduceByKey(_ + _).print()

// Print out the count of events received from this server in each batch    //stream.count().map(cnt => "Received " + cnt + " flume events.").print()    //拿到消息中的event,从event中拿出body,body是真正的消息体    //stream.flatMap(t=>{new String(t.event.getBody.array()).split(" ")}).map((_,1)).reduceByKey(_+_).print

ssc.start()    ssc.awaitTermination()  }}
 

http://spark.apache.org/docs/1.6.3/streaming-flume-integration.html

时间: 2024-08-02 09:52:37

spark与flume整合的相关文章

2016年大数据Spark“蘑菇云”行动之flume整合spark streaming

近期,听了王家林老师的2016年大数据Spark"蘑菇云"行动,需要将flume,kafka和Spark streaming进行整合. 感觉一时难以上手,还是先从简单着手吧:我的思路是这样的,flume产生数据,然后输出到spark streaming,flume的源数据是netcat(地址:localhost,端口22222),输出是avro(地址:localhost,端口是11111).Spark streaming的处理是直接输出有几个events. 一.配置文件 Flume 配

找不到org.apache.spark.streaming.flume.sink.SparkFlumeProtocol$Callback

java.lang.NoClassDefFoundError: org/apache/spark/streaming/flume/sink/SparkFlumeProtocol$Callback at org.apache.spark.streaming.flume.FlumePollingReceiver$$anonfun$onStart$1.apply(FlumePollingInputDStream.scala:84) at org.apache.spark.streaming.flume

&lt;Spark Streaming&gt;&lt;Flume&gt;&lt;Integration&gt;

Overview Flume:一个分布式的,可靠的,可用的服务,用于有效地收集.聚合.移动大规模日志数据 我们搭建一个flume + Spark Streaming的平台来从Flume获取数据,并处理它. 有两种方法实现:使用flume-style的push-based方法,或者使用自定义的sink来实现pull-based方法. Approach 1: Flume-style Push-based Approach flume被设计用来在Flume agents之间推信息,在这种方式下,Spa

flume 整合 kafka

flume 整合 kafka: flume 采集业务日志,发送到kafka 安装部署Kafka Download 1.0.0 is the latest release. The current stable version is 1.0.0. You can verify your download by following these procedures and using these KEYS. 1.0.0 Released November 1, 2017 Source downloa

Flume整合Kafka完成实时数据采集

agent选择 agent1 exec source + memory channel + avro sink agent2 avro source + memory channel 模拟实际工作中的场景,agent1 为A机器,agent2 为B机器. avro source: 监听avro端口,并且接收来自外部avro信息, avro sink:一般用于跨节点传输,主要绑定数据移动目的地的ip和port 在创建agent2配置文件 cd /app/flume/flume/conf vi te

中国移动实时数据分析-基于spark+kafka+flume

这两天主要是做了中国移动的实时数据分析一个小项目(可以说是demo了),这里记录下来整个过程里面遇到的坑,首先安装好flume,kafka,spark(基于代码本地运行可以不安装),redis,zookeeper 主要是为了熟悉一下整个的一个spark-streaming的一个整个流程,还有就是了解调优的地方. 上述假设已经安装好了相应的组件,然后就开始正式的踩坑之路: 1.编写一个java程序去读取原始数据文件,模拟1s进行文件的插入一行,原始的数据文件格式如下: 坑a .整个的数据格式是js

flume 整合kafka

背景:系统的数据量越来越大,日志不能再简单的文件的保存,如此日志将会越来越大,也不方便查找与分析,综合考虑下使用了flume来收集日志,收集日志后向kafka传递消息,下面给出具体的配置 # The configuration file needs to define the sources, # the channels and the sinks. # Sources, channels and sinks are defined per agent, # in this case call

Spark学习视频整合

1.<Scala深入浅出实战经典>http://pan.baidu.com/s/1pJnAUr5 2.<Spark纯实战公益大讲坛>http://pan.baidu.com/s/1sLeVk 3.<Docker公益大讲坛>http://pan.baidu.com/s/1hq0GztU 4.<spark亚太研究院spark公益大讲堂>http://pan.baidu.com/s/1i30Ewsd 5.DT大数据梦工厂spark和scala的所有视频.PPT和代

Spark 实时计算整合案例

1.概述 最近有同学问道,除了使用 Storm 充当实时计算的模型外,还有木有其他的方式来实现实时计算的业务.了解到,在使用 Storm 时,需要编写基于编程语言的代码.比如,要实现一个流水指标的统计,需要去编写相应的业务代码,能不能有一种简便的方式来实现这一需求.在解答了该同学的疑惑后,整理了该实现方案的一个案例,供后面的同学学习参考. 2.内容 实现该方案,整体的流程是不变的,我这里只是替换了其计算模型,将 Storm 替换为 Spark,原先的数据收集,存储依然可以保留. 2.1 Spar