// scalastyle:off println package org.apache.spark.examples.streaming import kafka.serializer.StringDecoder import org.apache.spark.SparkConf import org.apache.spark.streaming._ import org.apache.spark.streaming.kafka._ import org.apache.spark.streaming.scheduler.StreamingListener import scala.util.parsing.json.JSON /** * Consumes messages from one or more topics to analysis log * calaculate the threadhold under certain time window */ object LogAnalysisB { def main(args: Array[String]) { if (args.length < 2) { System.err.println(s""" |Usage: DirectKafkaWordCount <brokers> <topics> | <brokers> is a list of one or more Kafka brokers | <topics> is a list of one or more kafka topics to consume from | """.stripMargin) System.exit(1) } val WINDOW_LENGTH = new Duration(30 * 1000) val SLIDE_INTERVAL = new Duration(10 * 1000) StreamingExamples.setStreamingLogLevels() val Array(brokers, topics) = args val sparkConf = new SparkConf().setAppName("ELK Log Analysis windows Threhold") val ssc = new StreamingContext(sparkConf,SLIDE_INTERVAL) ssc.addStreamingListener(new RuleFileListenerB()) // Create direct kafka stream with brokers and topics val topicsSet = topics.split(",").toSet val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topicsSet) // Get the lines, split them into words, count the words and print val lines = messages.map(_._2).map(HostAppLog.parseLogLine) val windowDStream = lines.window(WINDOW_LENGTH,SLIDE_INTERVAL) windowDStream.foreachRDD( logs=> { val topChar = logs .map(log => (log.msg, 1)) .reduceByKey(_ + _) .top(3)(OrderingUtils.SecondValueOrdering) println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$") println( s"""Top Endpoints: ${topChar.mkString("[", ",", "]")}""") val topTest = logs .map(log =>(log.host+log.app,if (log.msg.contains("A")) 1 else 0)) .reduceByKey(_+_) .filter(_._2 > 5) .take(10) println( s"""A > 5 times: ${topTest.mkString("[", ",", "]")}""") } ) // Start the computation ssc.start() ssc.awaitTermination() } def wc(ssc:StreamingContext,map:Map[Any,Any]): Unit = { if( map.get("message").toString().contains("A")) println("find A in message:" + map.toString()) } } class RuleFileListenerB extends StreamingListener { override def onBatchStarted(batchStarted : org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted) { println("---------------------------------------------------------------------------------------------------------------------------------------------") println("check whether the file‘s modified date is change, if change then reload the configuration file") //val source = scala.io.Source.fromFile("D:/code/scala/test") //val lines = try source.mkString finally source.close() //println(lines) println("---------------------------------------------------------------------------------------------------------------------------------------------") } } // scalastyle:on println
时间: 2025-01-04 06:00:50