spark JavaDirectKafkaWordCount 例子分析:
1、
KafkaUtils.createDirectStream( jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet );后面参数意思: 源码是这样
@param ssc StreamingContext object * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> * configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" * to be set with Kafka broker(s) (NOT zookeeper servers) specified in * host1:port1,host2:port2 form. * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive) * starting point of the stream * @param messageHandler Function for translating each message and metadata into the desired type * @tparam K type of Kafka message key * @tparam V type of Kafka message value * @tparam KD type of Kafka message key decoder * @tparam VD type of Kafka message value decoder * @tparam R type returned by messageHandler * @return DStream of R */def createDirectStream[ K: ClassTag, V: ClassTag, KD <: Decoder[K]: ClassTag, VD <: Decoder[V]: ClassTag, R: ClassTag] ( ssc: StreamingContext, kafkaParams: Map[String, String], fromOffsets: Map[TopicAndPartition, Long], messageHandler: MessageAndMetadata[K, V] => R): InputDStream[R] = { val cleanedHandler = ssc.sc.clean(messageHandler) new DirectKafkaInputDStream[K, V, KD, VD, R]( ssc, kafkaParams, fromOffsets, cleanedHandler)}
2、数据在输入到输出经历几个阶段:先map返回JavaDStream<String>类型
然后flatMap 返回JavaDStream<String>类型
在 然后mapToPair返回JavaPairDStream<String, Integer>
最后reduceByKey 获得两数之和
完整例子请看尾部完整代码
import java.util.HashMap;import java.util.HashSet;import java.util.Arrays;import java.util.regex.Pattern; import scala.Tuple2; import com.google.common.collect.Lists;import kafka.serializer.StringDecoder; import org.apache.spark.SparkConf;import org.apache.spark.api.java.function.*;import org.apache.spark.streaming.api.java.*;import org.apache.spark.streaming.kafka.KafkaUtils;import org.apache.spark.streaming.Durations; /** * Consumes messages from one or more topics in Kafka and does wordcount. * Usage: JavaDirectKafkaWordCount <brokers> <topics> * <brokers> is a list of one or more Kafka brokers * <topics> is a list of one or more kafka topics to consume from * * Example: * $ bin/run-example streaming.JavaDirectKafkaWordCount broker1-host:port,broker2-host:port topic1,topic2 */ public final class JavaDirectKafkaWordCount { private static final Pattern SPACE = Pattern.compile(" "); public static void main(String[] args) { if (args.length < 2) { System.err.println("Usage: JavaDirectKafkaWordCount <brokers> <topics>\n" + " <brokers> is a list of one or more Kafka brokers\n" + " <topics> is a list of one or more kafka topics to consume from\n\n"); System.exit(1); } StreamingExamples.setStreamingLogLevels(); String brokers = args[0]; String topics = args[1]; // Create context with a 2 seconds batch interval SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount"); JavaStreamingContext jssc; jssc = new (sparkConf, Durations.seconds(2)); HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(","))); HashMap<String, String> kafkaParams = new HashMap<String, String>(); kafkaParams.put("metadata.broker.list", brokers); // Create direct kafka stream with brokers and topics JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream( jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet ); // Get the lines, split them into words, count the words and print JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { @Override public String call(Tuple2<String, String> tuple2) { return tuple2._2(); } }); JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String x) { return Lists.newArrayList(SPACE.split(x)); } }); JavaPairDStream<String, Integer> wordCounts = words.mapToPair( new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } }).reduceByKey( new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); wordCounts.print(); // Start the computation jssc.start(); jssc.awaitTermination(); }}
时间: 2024-11-12 05:15:42