Producer program
import java.util

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import scala.util.Random

// Renamed from "KafkaProducer": an object with that name would shadow the imported client class.
object KafkaProducerDemo {
  def main(args: Array[String]): Unit = {
    // Console equivalent:
    // kafka-console-producer.sh --broker-list master:9092,master:9093 --topic mykafka2
    val brokers = "master:9092,master:9093"
    val topic = "mykafka2"

    val props = new util.HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    val msgPerSec = 2   // messages sent per second
    val wordsPerMsg = 3 // random "words" per message

    val producer = new KafkaProducer[String, String](props)
    while (true) {
      for (_ <- 1 to msgPerSec) {
        // Build a message of random numbers separated by spaces, e.g. "42 7 93".
        val str = (1 to wordsPerMsg).map(_ => Random.nextInt(100).toString).mkString(" ")
        println(str)
        val msg = new ProducerRecord[String, String](topic, null, str)
        producer.send(msg)
      }
      Thread.sleep(1000)
    }
  }
}
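Note that producer.send as used above is fire-and-forget: the call is asynchronous and any broker error is silently dropped. A minimal sketch of the callback overload of send (part of the same Kafka client API), useful when verifying delivery during testing:

import org.apache.kafka.clients.producer.{Callback, RecordMetadata}

// Sketch: send with a delivery callback so failures are logged instead of silently ignored.
producer.send(msg, new Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    if (exception != null)
      exception.printStackTrace() // delivery failed
    else
      println(s"sent to ${metadata.topic()}-${metadata.partition()} @ offset ${metadata.offset()}")
  }
})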
Producer program output (screenshot in the original post)
Consumer program
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka._

object KafkaWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaWordCount")
    val ssc = new StreamingContext(conf, Seconds(10)) // 10-second batches
    ssc.sparkContext.setLogLevel("WARN")

    val zkQuorum = "master:12181/kafka0.11" // ZooKeeper address (with chroot path)
    val group = "888"                       // consumer group id
    val topics = "mykafka2"                 // space-separated list of topics
    val numThreads = 3                      // receiver threads per topic
    val topicMap = topics.split(" ").map((_, numThreads)).toMap

    // Receiver-based stream: each record is a (key, value) pair; only the value is needed.
    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
    val words = lines.map(_._2).flatMap(_.split(" "))
    val pairs = words.map((_, 1))
    val wordCounts = pairs.reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
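createStream is the older receiver-based integration from spark-streaming-kafka-0-8, which consumes through ZooKeeper. As a sketch, the direct (receiver-less) alternative in the same artifact reads partitions from the brokers directly and tracks offsets itself; the broker list below is assumed to be the one from the producer above:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Direct stream: connects to the Kafka brokers rather than ZooKeeper.
val kafkaParams = Map("metadata.broker.list" -> "master:9092,master:9093")
val topicsSet = Set("mykafka2")
val directLines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)
// The word count itself is unchanged:
directLines.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()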
Consumer program output (screenshot in the original post)
Original post: https://www.cnblogs.com/BrentBoys/p/10802730.html