spark JavaDirectKafkaWordCount 例子分析

spark  JavaDirectKafkaWordCount 例子分析:

1、

KafkaUtils.createDirectStream(        jssc,        String.class,        String.class,        StringDecoder.class,        StringDecoder.class,        kafkaParams,        topicsSet    );后面参数意思: 源码是这样

 @param ssc StreamingContext object * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration"> *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers" *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in *    host1:port1,host2:port2 form. * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive) *    starting point of the stream * @param messageHandler Function for translating each message and metadata into the desired type * @tparam K type of Kafka message key * @tparam V type of Kafka message value * @tparam KD type of Kafka message key decoder * @tparam VD type of Kafka message value decoder * @tparam R type returned by messageHandler * @return DStream of R */def createDirectStream[  K: ClassTag,  V: ClassTag,  KD <: Decoder[K]: ClassTag,  VD <: Decoder[V]: ClassTag,  R: ClassTag] (    ssc: StreamingContext,    kafkaParams: Map[String, String],    fromOffsets: Map[TopicAndPartition, Long],    messageHandler: MessageAndMetadata[K, V] => R): InputDStream[R] = {  val cleanedHandler = ssc.sc.clean(messageHandler)  new DirectKafkaInputDStream[K, V, KD, VD, R](    ssc, kafkaParams, fromOffsets, cleanedHandler)}

2、数据在输入到输出经历几个阶段:先map返回JavaDStream<String>类型

                 然后flatMap 返回JavaDStream<String>类型

                 在 然后mapToPair返回JavaPairDStream<String, Integer>

                 最后reduceByKey 获得两数之和  


完整例子请看尾部完整代码


import java.util.HashMap;import java.util.HashSet;import java.util.Arrays;import java.util.regex.Pattern;

import scala.Tuple2;

import com.google.common.collect.Lists;import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;import org.apache.spark.api.java.function.*;import org.apache.spark.streaming.api.java.*;import org.apache.spark.streaming.kafka.KafkaUtils;import org.apache.spark.streaming.Durations;

/** * Consumes messages from one or more topics in Kafka and does wordcount. * Usage: JavaDirectKafkaWordCount <brokers> <topics> *   <brokers> is a list of one or more Kafka brokers *   <topics> is a list of one or more kafka topics to consume from * * Example: *    $ bin/run-example streaming.JavaDirectKafkaWordCount broker1-host:port,broker2-host:port topic1,topic2 */

public final class JavaDirectKafkaWordCount {  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) {    if (args.length < 2) {      System.err.println("Usage: JavaDirectKafkaWordCount <brokers> <topics>\n" +          "  <brokers> is a list of one or more Kafka brokers\n" +          "  <topics> is a list of one or more kafka topics to consume from\n\n");      System.exit(1);    }

    StreamingExamples.setStreamingLogLevels();

    String brokers = args[0];    String topics = args[1];

    // Create context with a 2 seconds batch interval    SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");    JavaStreamingContext jssc;     jssc = new (sparkConf, Durations.seconds(2));

    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));    HashMap<String, String> kafkaParams = new HashMap<String, String>();    kafkaParams.put("metadata.broker.list", brokers);

    // Create direct kafka stream with brokers and topics    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(        jssc,        String.class,        String.class,        StringDecoder.class,        StringDecoder.class,        kafkaParams,        topicsSet    );

    // Get the lines, split them into words, count the words and print    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {      @Override      public String call(Tuple2<String, String> tuple2) {        return tuple2._2();      }    });    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {      @Override      public Iterable<String> call(String x) {        return Lists.newArrayList(SPACE.split(x));      }    });    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(      new PairFunction<String, String, Integer>() {        @Override        public Tuple2<String, Integer> call(String s) {          return new Tuple2<String, Integer>(s, 1);        }      }).reduceByKey(        new Function2<Integer, Integer, Integer>() {        @Override        public Integer call(Integer i1, Integer i2) {          return i1 + i2;        }      });    wordCounts.print();

    // Start the computation    jssc.start();    jssc.awaitTermination();  }}
				
时间: 2024-11-12 05:15:42

spark JavaDirectKafkaWordCount 例子分析的相关文章

spark源码分析之Executor启动与任务提交篇

任务提交流程 概述 在阐明了Spark的Master的启动流程与Worker启动流程.接下继续执行的就是Worker上的Executor进程了,本文继续分析整个Executor的启动与任务提交流程 Spark-submit 提交一个任务到集群通过的是Spark-submit 通过启动脚本的方式启动它的主类,这里以WordCount为例子 spark-submit --class cn.itcast.spark.WordCount bin/spark-clas -> org.apache.spar

Accuracy(准确率), Precision(精确率), 和F1-Measure, 结合Spark源码分析

例子 某大学一个系,总共100人,其中男90人,女10人,现在根据每个人的特征,预测性别 Accuracy(准确率) Accuracy=预测正确的数量需要预测的总数 计算 由于我知道男生远多于女生,所以我完全无视特征,直接预测所有人都是男生 我预测所的人都是男生,而实际有90个男生,所以 预测正确的数量 = 90 需要预测的总数 = 100 Accuracy = 90 / 100 = 90% 问题 在男女比例严重不均匀的情况下,我只要预测全是男生,就能获得极高的Accuracy. 所以在正负样本

Spark源码系列(八)Spark Streaming实例分析

这一章要讲Spark Streaming,讲之前首先回顾下它的用法,具体用法请参照<Spark Streaming编程指南>. Example代码分析 val ssc = new StreamingContext(sparkConf, Seconds(1)); // 获得一个DStream负责连接 监听端口:地址 val lines = ssc.socketTextStream(serverIP, serverPort); // 对每一行数据执行Split操作 val words = line

Spark SQL 源代码分析之 In-Memory Columnar Storage 之 in-memory query

/** Spark SQL源代码分析系列文章*/ 前面讲到了Spark SQL In-Memory Columnar Storage的存储结构是基于列存储的. 那么基于以上存储结构,我们查询cache在jvm内的数据又是怎样查询的,本文将揭示查询In-Memory Data的方式. 一.引子 本例使用hive console里查询cache后的src表. select value from src 当我们将src表cache到了内存后,再次查询src,能够通过analyzed运行计划来观察内部调

Spark源码分析之八:Task运行(二)

在<Spark源码分析之七:Task运行(一)>一文中,我们详细叙述了Task运行的整体流程,最终Task被传输到Executor上,启动一个对应的TaskRunner线程,并且在线程池中被调度执行.继而,我们对TaskRunner的run()方法进行了详细的分析,总结出了其内Task执行的三个主要步骤: Step1:Task及其运行时需要的辅助对象构造,主要包括: 1.当前线程设置上下文类加载器: 2.获取序列化器ser: 3.更新任务状态TaskState: 4.计算垃圾回收时间: 5.反

Spark源码分析之四:Stage提交

各位看官,上一篇<Spark源码分析之Stage划分>详细讲述了Spark中Stage的划分,下面,我们进入第三个阶段--Stage提交. Stage提交阶段的主要目的就一个,就是将每个Stage生成一组Task,即TaskSet,其处理流程如下图所示: 与Stage划分阶段一样,我们还是从handleJobSubmitted()方法入手,在Stage划分阶段,包括最好的ResultStage和前面的若干ShuffleMapStage均已生成,那么顺理成章的下一步便是Stage的提交.在han

Spark源码分析之六:Task调度(二)

话说在<Spark源码分析之五:Task调度(一)>一文中,我们对Task调度分析到了DriverEndpoint的makeOffers()方法.这个方法针对接收到的ReviveOffers事件进行处理.代码如下: [java] view plain copy // Make fake resource offers on all executors // 在所有的executors上提供假的资源(抽象的资源,也就是资源的对象信息,我是这么理解的) private def makeOffers

Spark源码分析之二:Job的调度模型与运行反馈

在<Spark源码分析之Job提交运行总流程概述>一文中,我们提到了,Job提交与运行的第一阶段Stage划分与提交,可以分为三个阶段: 1.Job的调度模型与运行反馈: 2.Stage划分: 3.Stage提交:对应TaskSet的生成. 今天,我们就结合源码来分析下第一个小阶段:Job的调度模型与运行反馈. 首先由DAGScheduler负责将Job提交到事件队列eventProcessLoop中,等待调度执行.入口方法为DAGScheduler的runJon()方法.代码如下: [jav

Spark源码分析之七:Task运行(一)

在Task调度相关的两篇文章<Spark源码分析之五:Task调度(一)>与<Spark源码分析之六:Task调度(二)>中,我们大致了解了Task调度相关的主要逻辑,并且在Task调度逻辑的最后,CoarseGrainedSchedulerBackend的内部类DriverEndpoint中的makeOffers()方法的最后,我们通过调用TaskSchedulerImpl的resourceOffers()方法,得到了TaskDescription序列的序列Seq[Seq[Tas