1. First, start ZooKeeper
2. Then start Kafka (example startup commands below)
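With a standard Kafka 0.8.x-era distribution, both services can be started from the Kafka installation directory roughly as follows. The script paths, config file names, and the optional topic pre-creation step are assumptions based on the default layout; brokers with auto-creation enabled will also create `top1`/`top2` on first use:

```bash
# Start ZooKeeper with the bundled default config (assumed default paths)
bin/zookeeper-server-start.sh config/zookeeper.properties

# In another terminal, start the Kafka broker
bin/kafka-server-start.sh config/server.properties

# Optionally pre-create the two topics used below
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic top1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic top2
```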
3. Core code
- Producer-side Java code that generates the words to be counted
```java
package streaming;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class MyProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("metadata.broker.list", "localhost:9092");
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        ProducerConfig config = new ProducerConfig(props);
        // Create the producer object
        Producer<String, String> producer = new Producer<String, String>(config);
        // Build the messages to send
        KeyedMessage<String, String> data1 = new KeyedMessage<String, String>("top1", "test kafka");
        KeyedMessage<String, String> data2 = new KeyedMessage<String, String>("top2", "hello world");
        try {
            int i = 1;
            while (i < 100) {
                // Send one message to each topic, once per second
                producer.send(data1);
                producer.send(data2);
                i++;
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.close();
    }
}
```
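The class above uses the legacy `kafka.javaapi.producer` client, which matches the Kafka 0.8.x setup this example targets. On newer Kafka versions, the same one-message-per-second loop would be written against the `org.apache.kafka.clients.producer` API instead; a minimal sketch, assuming the same broker address and topics (the class name `MyNewApiProducer` is just illustrative):

```java
package streaming;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MyNewApiProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        // bootstrap.servers replaces metadata.broker.list in the new client
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        try {
            for (int i = 1; i < 100; i++) {
                // Send one message per second to each topic, as in the original loop
                producer.send(new ProducerRecord<String, String>("top1", "test kafka"));
                producer.send(new ProducerRecord<String, String>("top2", "hello world"));
                Thread.sleep(1000);
            }
        } finally {
            producer.close();
        }
    }
}
```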
- Receive data from the specified topics in Spark Streaming and count the words
```java
package streaming;

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;
import com.google.common.collect.Lists;

public class KafkaStreamingWordCount {
    public static void main(String[] args) {
        // Pattern used to split each message into words on spaces
        final Pattern SPACE = Pattern.compile(" ");
        // ZooKeeper connection string (host:port)
        String zkQuorum = "localhost:2181";
        // Consumer group this job belongs to
        String group = "1";
        // Topic names, separated by ","
        String topics = "top1,top2";
        // Number of consumer threads per topic
        int numThreads = 2;

        SparkConf sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
        // jssc.checkpoint("checkpoint"); // enable checkpointing if stateful operators are added

        // Map each topic to its number of consumer threads
        Map<String, Integer> topicmap = new HashMap<>();
        String[] topicsArr = topics.split(",");
        for (int i = 0; i < topicsArr.length; i++) {
            topicmap.put(topicsArr[i], numThreads);
        }

        // Create an input DStream that receives data from Kafka
        JavaPairReceiverInputDStream<String, String> lines =
                KafkaUtils.createStream(jssc, zkQuorum, group, topicmap);

        // Split each message value into words
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
            @Override
            public Iterable<String> call(Tuple2<String, String> tuple) throws Exception {
                return Lists.newArrayList(SPACE.split(tuple._2));
            }
        });

        // Count the words: map to (word, 1) pairs, then reduce by key
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(String s) {
                        return new Tuple2<String, Integer>(s, 1);
                    }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer i1, Integer i2) {
                        return i1 + i2;
                    }
                });

        // Print the counts for each batch
        wordCounts.print();

        jssc.start();
        jssc.awaitTermination();
    }
}
```
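`KafkaUtils.createStream` is the receiver-based integration mode, which tracks offsets in ZooKeeper. The same spark-streaming-kafka 0.8 connector also offers a receiver-less "direct" mode that reads from the brokers itself and gives one RDD partition per Kafka partition; a minimal sketch of how the input stream would be created in that mode, assuming the same broker and topics (the class name `KafkaDirectStreamExample` is just illustrative):

```java
package streaming;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

public class KafkaDirectStreamExample {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("KafkaDirectWordCount").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));

        // The direct API connects to the brokers directly instead of going through ZooKeeper
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");
        Set<String> topicsSet = new HashSet<>(Arrays.asList("top1", "top2"));

        JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(
                jssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
                kafkaParams, topicsSet);

        // `lines` can be fed into the same flatMap / mapToPair / reduceByKey pipeline as above
        lines.print();

        jssc.start();
        jssc.awaitTermination();
    }
}
```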