spark streaming kafka example

// scalastyle:off println
package org.apache.spark.examples.streaming

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.scheduler.StreamingListener
import scala.util.parsing.json.JSON

/**
 * Consumes messages from one or more topics to analysis log
 * calaculate the threadhold under certain time window
 */
object LogAnalysisB {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: DirectKafkaWordCount <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }
    val WINDOW_LENGTH = new Duration(30 * 1000)
    val SLIDE_INTERVAL = new Duration(10 * 1000)
    StreamingExamples.setStreamingLogLevels()
    val Array(brokers, topics) = args
    val sparkConf = new SparkConf().setAppName("ELK Log Analysis windows Threhold")
    val ssc = new StreamingContext(sparkConf,SLIDE_INTERVAL)
    ssc.addStreamingListener(new RuleFileListenerB())
    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_._2).map(HostAppLog.parseLogLine)
    val windowDStream = lines.window(WINDOW_LENGTH,SLIDE_INTERVAL)
    windowDStream.foreachRDD( logs=>
    {
      val topChar = logs
        .map(log => (log.msg, 1))
        .reduceByKey(_ + _)
        .top(3)(OrderingUtils.SecondValueOrdering)
      println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$")
      println( s"""Top Endpoints: ${topChar.mkString("[", ",", "]")}""")

      val topTest = logs
      .map(log =>(log.host+log.app,if (log.msg.contains("A")) 1 else 0))
      .reduceByKey(_+_)
      .filter(_._2 > 5)
      .take(10)
      println( s"""A > 5 times: ${topTest.mkString("[", ",", "]")}""")
    }
    )

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
   def wc(ssc:StreamingContext,map:Map[Any,Any]): Unit =
   {
     if( map.get("message").toString().contains("A"))
       println("find A in message:" + map.toString())

   }

}

class RuleFileListenerB extends StreamingListener {

  override def onBatchStarted(batchStarted : org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted) {
    println("---------------------------------------------------------------------------------------------------------------------------------------------")
        println("check whether the file‘s modified date is change, if change then reload the configuration file")
    //val source = scala.io.Source.fromFile("D:/code/scala/test")
    //val lines = try source.mkString finally source.close()
    //println(lines)
    println("---------------------------------------------------------------------------------------------------------------------------------------------")
  }

}
// scalastyle:on println
时间: 2024-08-26 18:17:15

spark streaming kafka example的相关文章

第99课:使用Spark Streaming+Kafka实战对论坛网站动态行为的多维度分析及java.lang.NoClassDefFoundError问题解决完整内幕版本解密

第99课:使用Spark Streaming 实战对论坛网站动态行为的多维度分析 /* 王家林老师授课http://weibo.com/ilovepains  每天晚上20:00YY频道现场授课频道68917580*/ /** * *第99课:使用Spark Streaming 实战对论坛网站动态行为的多维度分析 * 论坛数据自动生成代码,该生成的数据会作为Producer的方式发送给Kafka,然后SparkStreaming程序会从 * Kafka中在线Pull到论坛或者网站的用户在线行为信

Spark streaming + Kafka 流式数据处理,结果存储至MongoDB、Solr、Neo4j(自用)

KafkaStreaming.scala文件 import kafka.serializer.StringDecoder import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.kafka.{KafkaManagerAdd, KafkaUtils} import org.json4s.Defau

160728、Spark Streaming kafka 实现数据零丢失的几种方式

定义 问题开始之前先解释下流处理中的一些概念: At most once - 每条数据最多被处理一次(0次或1次) At least once - 每条数据最少被处理一次 (1次或更多) Exactly once - 每条数据只会被处理一次(没有数据会丢失,并且没有数据会被多次处理) High Level API   如果不做容错,将会带来数据丢失因为receiver一直在接收数据,在其没有处理的时候(已通知zk数据接收到),executor突然挂掉(或是driver挂掉通知executor关闭

下载基于大数据技术推荐系统实战教程(Spark ML Spark Streaming Kafka Hadoop Mahout Flume Sqoop Redis)

地址:http://pan.baidu.com/s/1c2tOtwc  密码:yn2r 82课高清完整版,转一播放码. 互联网行业是大数据应用最前沿的阵地,目前主流的大数据技术,包括 hadoop,spark等,全部来自于一线互联网公司.从应用角度讲,大数据在互联网领域主要有三类应用:搜索引擎(比如百度,谷歌等),广告系统(比如百度凤巢,阿里妈妈等)和推荐系统(比如阿里巴巴天猫推荐,优酷视频推荐等). 本次培训以商业实战项目作为驱动来学习大数据技术在推荐系统项目中的应用.使得学员能够亲身体会大数

spark streaming kafka

SparkStreaming+Kafka •kafka是什么,有哪些特点 •SparkStreaming+Kafka有什么好处 –解耦 –缓冲 消息列队的特点 生产者消费者模式 •可靠性保证 –自己不丢数据 –消费者不丢数据:“至少一次,严格一次” broker n. 经纪人,掮客 vt. 以中间人等身分安排... vi. 作为权力经纪人进行谈判 kafka部署 node2,3,4 基于zookeeper 启动 三台 zookeeper /opt/sxt/zookeeper-3.4.6/bin/

Spark Streaming从Kafka中获取数据,并进行实时单词统计,统计URL出现的次数

1.创建Maven项目 创建的过程参考:http://blog.csdn.net/tototuzuoquan/article/details/74571374 2.启动Kafka A:安装kafka集群:http://blog.csdn.net/tototuzuoquan/article/details/73430874 B:创建topic等:http://blog.csdn.net/tototuzuoquan/article/details/73430874 3.编写Pom文件 <?xml v

关于IDEA开发环境下的Kafka+Spark Streaming的classpath配置方式

一.前言 在使用Spark Streaming中的Kafka Direct API进行Kafka消费的过程中,通过spark-submit的方式提交jar包,会出现如下错误信息,提示无法找到KafkaUtils. Exceptionin thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$ at com.zhkmxx.scala.app.KafkaStream

SBT 构建 spark streaming集成kafka (scala版本)

前言: 最近在研究spark 还有 kafka , 想通过kafka端获取的数据,利用spark streaming进行一些计算,但搭建整个环境着实不易,故特此写下该过程,分享给大家,希望大家可以少走点弯路,能帮到大家! 环境准备:    操作系统 : ubuntu14.04 LTS hadoop 2.7.1   伪分布式搭建 sbt-0.13.9 kafka_2.11-0.8.2.2 spark-1.3.1-bin-hadoop2.6 scala 版本 : 2.10.4 注: 请重视版本问题,

2016年大数据Spark“蘑菇云”行动之spark streaming消费flume采集的kafka数据Directf方式

王家林老师的课程:2016年大数据Spark"蘑菇云"行动之spark streaming消费flume采集的kafka数据Directf方式作业.     一.基本背景 Spark-Streaming获取kafka数据的两种方式Receiver与Direct的方式,本文介绍Direct的方式.具体的流程是这样的: 1.Direct方式是直接连接到kafka的节点上获取数据了. 2.基于Direct的方式:周期性地查询Kafka,来获得每个topic+partition的最新的offs