分享一下spark streaming与flume集成的scala代码。

object LogicHandle {
  def main(args: Array[String]) {
    //添加这个不会报执行错误
    val path = new File(".").getCanonicalPath()
    System.getProperties().put("hadoop.home.dir", path);
    new File("./bin").mkdirs();
    new File("./bin/winutils.exe").createNewFile();

    //val sparkConf = new SparkConf().setAppName("SensorRealTime").setMaster("local[2]")
    val sparkConf = new SparkConf().setAppName("SensorRealTime")

    val ssc = new StreamingContext(sparkConf, Seconds(20))

    val hostname = "localhost"
    val port = 2345
    val storageLevel = StorageLevel.MEMORY_ONLY
    val flumeStream = FlumeUtils.createStream(ssc, hostname, port, storageLevel)

    val lhc = new LogicHandleClass();

    //日志格式化模板
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    val sdfHour = new SimpleDateFormat("HH");
    val sdfMinute = new SimpleDateFormat("mm")

    //存储数据的hash对象  key/value存储  根据文档规则,使用各统计指标的key/value
    var redisMap = new HashMap[String, String]
      
    flumeStream.foreachRDD(rdd => {
      val events = rdd.collect()
      //println("event count:" + events.length)
      var i = 1
      for (event <- events) {
        val sensorInfo = new String(event.event.getBody.array()) //单行记录
        //单行记录格式化
        val arrayFileds = sensorInfo.split(",")
        if (arrayFileds.length == 6) {
          val shopId = arrayFileds(0) //店内编号

          val floorId = shopId.substring(0, 5) //楼层编号
          val mac = arrayFileds(1)
          val ts = arrayFileds(2).toLong //时间戳
          val time = sdf.format(ts * 1000)
          var hour = sdfHour.format(ts * 1000)
          var minute = sdfMinute.format(ts * 1000)
          var allMinute = hour.toInt * 60 + minute.toInt

          val x = arrayFileds(3)
          val y = arrayFileds(4)
          val level = arrayFileds(5)

          //后边就是我的业务代码了,省略了
        }
      }

      //存储至redis中
      lhc.SetAll(redisMap)
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
时间: 2024-10-29 05:04:52

分享一下spark streaming与flume集成的scala代码。的相关文章

Spark学习七:spark streaming与flume集成

Spark学习七:spark streaming与flume集成 标签(空格分隔): Spark 一,启动flume flume-conf.properties文件 agent002.sources = sources002 agent002.channels = channels002 agent002.sinks = sinks002 ## define sources agent002.sources.sources002.type = exec agent002.sources.sour

Spark Streaming和Flume集成指南V1.4.1

Apache Flume是一个用来有效地收集,聚集和移动大量日志数据的分布式的,有效的服务.这里我们解释一下怎样配置Flume和Spark Streaming来从Flume获取数据.这里有两个方法. Python API:Flume现在还不支持PythonAPI 方法1:Flume风格的推方法 Flume被设计用来在Flume代理之间推送数据.在这种方法中,Spark Streaming本质上设置了一个接收器作为Flume的一个Avro代理,Flume把数据推送到接收器上.下面是配置的步骤. 一

Spark学习八:spark streaming与flume和kafka集成

Spark学习八:spark streaming与flume和kafka集成 标签(空格分隔): Spark Spark学习八spark streaming与flume和kafka集成 一Kafka 二flume和kafka的集成 三kafka和spark streaming的集成方式一kafka推送 四kafka和spark streaming的集成方式一spark streaam主动获取 五spark stream的高级应用updateStateByKey实现累加功能 六spark stre

2016年大数据Spark“蘑菇云”行动之spark streaming消费flume采集的kafka数据Directf方式

王家林老师的课程:2016年大数据Spark"蘑菇云"行动之spark streaming消费flume采集的kafka数据Directf方式作业.     一.基本背景 Spark-Streaming获取kafka数据的两种方式Receiver与Direct的方式,本文介绍Direct的方式.具体的流程是这样的: 1.Direct方式是直接连接到kafka的节点上获取数据了. 2.基于Direct的方式:周期性地查询Kafka,来获得每个topic+partition的最新的offs

Spark 系列(十五)—— Spark Streaming 整合 Flume

一.简介 Apache Flume 是一个分布式,高可用的数据收集系统,可以从不同的数据源收集数据,经过聚合后发送到分布式计算框架或者存储系统中.Spark Straming 提供了以下两种方式用于 Flume 的整合. 二.推送式方法 在推送式方法 (Flume-style Push-based Approach) 中,Spark Streaming 程序需要对某台服务器的某个端口进行监听,Flume 通过 avro Sink 将数据源源不断推送到该端口.这里以监听日志文件为例,具体整合方式如

Spark Streaming整合Flume

1 目的 Spark Streaming整合Flume.参考官方整合文档(http://spark.apache.org/docs/2.2.0/streaming-flume-integration.html) 2 整合方式一:基于推 2.1 基本要求 flume和spark一个work节点要在同一台机器上,flume会在本机器上通过配置的端口推送数据 streaming应用必须先启动,receive必须要先监听推送数据的端口后,flume才能推送数据 添加如下依赖 groupId = org.

cdh环境下,spark streaming与flume的集成问题总结

如何做集成,其实特别简单,网上其实就是教程. http://blog.csdn.net/fighting_one_piece/article/details/40667035  看这里就成. 我用的是第一种集成.. 做的时候,出现了各种问题.    大概从从2014.12.17 早晨5点搞到2014.12.17晚上18点30 总结起来其实很简单,但做的时候搞了许久啊啊啊!!!!   这样的事情,吃一堑长一智吧 问题1.  需要引用各种包,这些包要打入你的JAR中, 因为用的是spark on y

第88课:Spark Streaming从Flume Pull数据案例实战及内幕源码解密

本节课分成二部分讲解: 一.Spark Streaming on Pulling from Flume实战 二.Spark Streaming on Pulling from Flume源码解析 先简单介绍下Flume的两种模式:推模式(Flume push to Spark Streaming)和 拉模式(Spark Streaming pull from Flume ) 采用推模式:推模式的理解就是Flume作为缓存,存有数据.监听对应端口,如果服务可以连接,就将数据push过去.(简单,耦

第88课:Spark Streaming从Flume Poll数据案例实战和内幕源码解密

本节课分成二部分讲解: 一.Spark Streaming on Polling from Flume实战 二.Spark Streaming on Polling from Flume源码 第一部分: 推模式(Flume push SparkStreaming) VS 拉模式(SparkStreaming poll Flume) 采用推模式:推模式的理解就是Flume作为缓存,存有数据.监听对应端口,如果服务可以链接,就将数据push过去.(简单,耦合要低),缺点是SparkStreaming