Spark Streaming integration with Flume: push mode (SparkFlumePush) and pull mode (SparkFlumePull)
package cn.my.sparkStream

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._

/**
 * Word count over Flume events delivered in *push* mode: Flume's Avro sink
 * pushes events to a receiver that Spark Streaming binds on `<host>:<port>`.
 *
 * Args: host and port the Flume receiver should listen on.
 */
object SparkFlumePush {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(
        "Usage: FlumeEventCount <host> <port>")
      System.exit(1)
    }

    // Quieten Spark's noisy default logging (project-local helper).
    LogLevel.setStreamingLogLevels()

    val Array(bindHost, bindPort) = args
    val interval = Milliseconds(2000)

    // local[2]: one thread runs the receiver, the other processes batches.
    val conf = new SparkConf().setAppName("FlumeEventCount").setMaster("local[2]")
    val streamingCtx = new StreamingContext(conf, interval)

    // Receiver-based (push) Flume stream, stored serialized and replicated.
    val events = FlumeUtils.createStream(streamingCtx, bindHost, bindPort.toInt, StorageLevel.MEMORY_ONLY_SER_2)

    // Report how many events arrived in each batch.
    events.count().map(cnt => "Received " + cnt + " flume events.").print()

    // The event body carries the actual message; split into words and count.
    events
      .flatMap(t => new String(t.event.getBody.array()).split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    streamingCtx.start()
    streamingCtx.awaitTermination()
  }
}
package cn.my.sparkStream

import java.net.InetSocketAddress

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._

/**
 * Word count over Flume events delivered in *pull* (polling) mode: Flume
 * buffers events in a Spark sink and Spark Streaming polls `<host>:<port>`
 * to fetch them transactionally.
 *
 * Args: host and port of the Flume Spark sink to poll.
 *
 * Fixes vs. the original:
 *  - the import lines were fused together (`...SparkConfimport...`), which
 *    does not compile; they are now separated properly;
 *  - `host`/`port` were validated and destructured but then ignored in
 *    favor of a hard-coded `mini1:8888`; the sink address is now built
 *    from the command-line arguments, matching the usage message.
 */
object SparkFlumePull {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(
        "Usage: FlumeEventCount <host> <port>")
      System.exit(1)
    }

    // Quieten Spark's noisy default logging (project-local helper).
    LogLevel.setStreamingLogLevels()

    val Array(host, port) = args
    val batchInterval = Milliseconds(2000)

    // local[2]: one thread polls the sink, the other processes batches.
    val sparkConf = new SparkConf().setAppName("FlumeEventCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, batchInterval)

    // createPollingStream accepts several sink addresses; add more entries
    // here when multiple Flume Spark sinks should be polled.
    val flumeSinks = Array[InetSocketAddress](new InetSocketAddress(host, port.toInt))
    val flumeStream = FlumeUtils.createPollingStream(ssc, flumeSinks, StorageLevel.MEMORY_ONLY_2)

    // Report how many events arrived in each batch.
    flumeStream.count().map(cnt => "Received " + cnt + " flume events.").print()

    // The event body carries the actual message; split into words and count.
    flumeStream
      .flatMap(t => new String(t.event.getBody.array()).split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
http://spark.apache.org/docs/1.6.3/streaming-flume-integration.html
时间: 2024-08-02 09:52:37