Spark Streaming 实现读取Kafka 生产数据

在kafka 目录下执行生产消息命令:

  ./kafka-console-producer  --broker-list nodexx:9092 --topic  201609

在spark bin 目录下执行

./run-example streaming.JavaDirectKafkaWordCount nodexx:9092, nodexx:9092 201609 
import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
import java.util.regex.Pattern;

import scala.Tuple2;

import com.google.common.collect.Lists;
import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Durations;

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 * Usage: JavaDirectKafkaWordCount <brokers> <topics>
 *   <brokers> is a list of one or more Kafka brokers
 *   <topics> is a list of one or more kafka topics to consume from
 *
 * Example:
 *    $ bin/run-example streaming.JavaDirectKafkaWordCount broker1-host:port,broker2-host:port topic1,topic2
 */

public final class JavaDirectKafkaWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) {
    if (args.length < 2) {
      System.err.println("Usage: JavaDirectKafkaWordCount <brokers> <topics>\n" +
          "  <brokers> is a list of one or more Kafka brokers\n" +
          "  <topics> is a list of one or more kafka topics to consume from\n\n");
      System.exit(1);
    }

    StreamingExamples.setStreamingLogLevels();

    String brokers = args[0];
    String topics = args[1];

    // Create context with a 2 seconds batch interval
    SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
      JavaStreamingContext jssc;
     jssc = new (sparkConf, Durations.seconds(2));

    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokers);

    // Create direct kafka stream with brokers and topics
    JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
        jssc,
        String.class,
        String.class,
        StringDecoder.class,
        StringDecoder.class,
        kafkaParams,
        topicsSet
    );

    // Get the lines, split them into words, count the words and print
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<String, Integer>(s, 1);
        }
      }).reduceByKey(
        new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      });
    wordCounts.print();

    // Start the computation
    jssc.start();
    jssc.awaitTermination();
  }
}

  

时间: 2024-08-01 23:21:18

Spark Streaming 实现读取Kafka 生产数据的相关文章

Flink与Spark Streaming在与kafka结合的区别!

本文主要是想聊聊flink与kafka结合.当然,单纯的介绍flink与kafka的结合呢,比较单调,也没有可对比性,所以的准备顺便帮大家简单回顾一下Spark Streaming与kafka的结合. 看懂本文的前提是首先要熟悉kafka,然后了解spark Streaming的运行原理及与kafka结合的两种形式,然后了解flink实时流的原理及与kafka结合的方式. kafka kafka作为一个消息队列,在企业中主要用于缓存数据,当然,也有人用kafka做存储系统,比如存最近七天的数据.

Exactly-once Spark Streaming from Apache Kafka

这篇文章我已经看过两遍了.收获颇多,抽个时间翻译下,先贴个原文链接吧.也给自己留个任务 http://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/

整合Kafka到Spark Streaming——代码示例和挑战

作者Michael G. Noll是瑞士的一位工程师和研究员,效力于Verisign,是Verisign实验室的大规模数据分析基础设施(基础Hadoop)的技术主管.本文,Michael详细的演示了如何将Kafka整合到Spark Streaming中. 期间, Michael还提到了将Kafka整合到 Spark Streaming中的一些现状,非常值得阅读,虽然有一些信息在Spark 1.2版本中已发生了一些变化,比如HA策略: 通过Spark Contributor.Spark布道者陈超我

spark streaming从指定offset处消费Kafka数据

spark streaming从指定offset处消费Kafka数据 2017-06-13 15:19 770人阅读 评论(2) 收藏 举报 分类: spark(5) 原文地址:http://blog.csdn.net/high2011/article/details/53706446 首先很感谢原文作者,看到这篇文章我少走了很多弯路,转载此文章是为了保留一份供复习用,请大家支持原作者,移步到上面的连接去看,谢谢 一.情景:当Spark streaming程序意外退出时,数据仍然再往Kafka中

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十九)待整理

redis按照正则批量删除key redis客户端--jedis 在Spark结构化流readStream.writeStream 输入输出,及过程ETL Spark Structured Streaming入门编程指南 Structured Streaming 实现思路与实现概述 Spark结构式流编程指南 spark streaming重复消费kafka记录,需要删除checkpoint保存目录. Kafka 如何读取offset topic内容 (__consumer_offsets) 原

关于IDEA开发环境下的Kafka+Spark Streaming的classpath配置方式

一.前言 在使用Spark Streaming中的Kafka Direct API进行Kafka消费的过程中,通过spark-submit的方式提交jar包,会出现如下错误信息,提示无法找到KafkaUtils. Exceptionin thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils$ at com.zhkmxx.scala.app.KafkaStream

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十七)待整理

redis按照正则批量删除key redis客户端--jedis 在Spark结构化流readStream.writeStream 输入输出,及过程ETL Spark Structured Streaming入门编程指南 Structured Streaming 实现思路与实现概述 Spark结构式流编程指南 spark streaming重复消费kafka记录,需要删除checkpoint保存目录. 原文地址:https://www.cnblogs.com/yy3b2007com/p/9315

4. Spark Streaming解析

4.1 初始化StreamingContext import org.apache.spark._ import org.apache.spark.streaming._ val conf = new SparkConf().setAppName(appName).setMaster(master) val ssc = new StreamingContext(conf, Seconds(1)) // 可以通过 ssc.sparkContext 来访问 SparkContext // 或者通过已

Spark Streaming实践和优化

发表于:<程序员>杂志2016年2月刊.链接:http://geek.csdn.net/news/detail/54500 作者:徐鑫,董西成 在流式计算领域,Spark Streaming和Storm时下应用最广泛的两个计算引擎.其中,Spark Streaming是Spark生态系统中的重要组成部分,在实现上复用Spark计算引擎.如图1所示,Spark Streaming支持的数据源有很多,如Kafka.Flume.TCP等.Spark Streaming的内部数据表示形式为DStrea