spark streaming 接收 kafka 数据java代码WordCount示例

1. 首先启动zookeeper

2. 启动kafka

3. 核心代码

  • 生产者生产消息的java代码,生成要统计的单词
package streaming;

import java.util.Properties; 

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig; 

public class MyProducer {   

        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("metadata.broker.list","localhost:9092");
            props.setProperty("serializer.class","kafka.serializer.StringEncoder");
            props.put("request.required.acks","1");
            ProducerConfig config = new ProducerConfig(props);
            //创建生产这对象
            Producer<String, String> producer = new Producer<String, String>(config);
            //生成消息
            KeyedMessage<String, String> data1 = new KeyedMessage<String, String>("top1","test kafka");
            KeyedMessage<String, String> data2 = new KeyedMessage<String, String>("top2","hello world");
            try {
                int i =1;
                while(i < 100){
                    //发送消息
                    producer.send(data1);
                    producer.send(data2);
                    i++;
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            producer.close();
        }
}
  • 在SparkStreaming中接收指定话题的数据,对单词进行统计
package streaming;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.spark.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

import com.google.common.collect.Lists;
public class KafkaStreamingWordCount {

    public static void main(String[] args) {
        //设置匹配模式,以空格分隔
        final Pattern SPACE = Pattern.compile(" ");
        //接收数据的地址和端口
        String zkQuorum = "localhost:2181";
        //话题所在的组
        String group = "1";
        //话题名称以“,”分隔
        String topics = "top1,top2";
        //每个话题的分片数
        int numThreads = 2;
        SparkConf sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
//        jssc.checkpoint("checkpoint"); //设置检查点
        //存放话题跟分片的映射关系
        Map<String, Integer> topicmap = new HashMap<>();
        String[] topicsArr = topics.split(",");
        int n = topicsArr.length;
        for(int i=0;i<n;i++){
            topicmap.put(topicsArr[i], numThreads);
        }
        //从Kafka中获取数据转换成RDD
        JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(jssc, zkQuorum, group, topicmap);
        //从话题中过滤所需数据
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {

            @Override
            public Iterable<String> call(Tuple2<String, String> arg0)
                    throws Exception {
                return Lists.newArrayList(SPACE.split(arg0._2));
            }
        });
        //对其中的单词进行统计
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
              new PairFunction<String, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(String s) {
                  return new Tuple2<String, Integer>(s, 1);
                }
              }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer i1, Integer i2) {
                  return i1 + i2;
                }
              });
        //打印结果
        wordCounts.print();
        jssc.start();
        jssc.awaitTermination();

    }

}
时间: 2024-10-12 13:13:42

spark streaming 接收 kafka 数据java代码WordCount示例的相关文章

Kafka:ZK+Kafka+Spark Streaming集群环境搭建(二十二)Spark Streaming接收流数据及使用窗口函数

官网文档:<http://spark.apache.org/docs/latest/streaming-programming-guide.html#a-quick-example> Spark Streaming官网的例子reduceByKeyAndWindow 简单的介绍了spark streaming接收socket流的数据,并把接收到的数据进行windows窗口函数对数据进行批量处理. import java.util.Arrays; import org.apache.spark.S

Spark Streaming 读取 Kafka 数据的两种方式

在Spark1.3之前,默认的Spark接收Kafka数据的方式是基于Receiver的,在这之后的版本里,推出了Direct Approach,现在整理一下两种方式的异同. 1. Receiver-based Approach val kafkaStream = KafkaUtils.createDstream(ssc, [zk], [consumer group id], [per-topic,partitions] ) 2. Direct Approach (No Receivers) v

spark streaming 接收kafka消息之二 -- 运行在driver端的receiver

先从源码来深入理解一下 DirectKafkaInputDStream 的将 kafka 作为输入流时,如何确保 exactly-once 语义. val stream: InputDStream[(String, String, Long)] = KafkaUtils.createDirectStream [String, String, StringDecoder, StringDecoder, (String, String, Long)]( ssc, kafkaParams, fromO

spark streaming 接收kafka消息之一 -- 两种接收方式

源码分析的spark版本是1.6. 首先,先看一下 org.apache.spark.streaming.dstream.InputDStream 的 类说明: This is the abstract base class for all input streams. This class provides methods start() and stop() which is called by Spark Streaming system to start and stop receivi

Spark Streaming、Kafka结合Spark JDBC External DataSouces处理案例

场景:使用Spark Streaming接收Kafka发送过来的数据与关系型数据库中的表进行相关的查询操作: Kafka发送过来的数据格式为:id.name.cityId,分隔符为tab 1 zhangsan 1 2 lisi 1 3 wangwu 2 4 zhaoliu 3 MySQL的表city结构为:id int, name varchar 1 bj 2 sz 3 sh 本案例的结果为:select s.id, s.name, s.cityId, c.name from student s

Spark Streaming和Kafka整合保证数据零丢失

当我们正确地部署好Spark Streaming,我们就可以使用Spark Streaming提供的零数据丢失机制.为了体验这个关键的特性,你需要满足以下几个先决条件: 1.输入的数据来自可靠的数据源和可靠的接收器: 2.应用程序的metadata被application的driver持久化了(checkpointed ); 3.启用了WAL特性(Write ahead log). 下面我将简单地介绍这些先决条件. 可靠的数据源和可靠的接收器 对于一些输入数据源(比如Kafka),Spark S

Spark Streaming使用Kafka保证数据零丢失

来自: https://community.qingcloud.com/topic/344/spark-streaming使用kafka保证数据零丢失 spark streaming从1.2开始提供了数据的零丢失,想享受这个特性,需要满足如下条件: 数据输入需要可靠的sources和可靠的receivers 应用metadata必须通过应用driver checkpoint WAL(write ahead log) 可靠的sources和receivers spark streaming可以通过

【转】Spark Streaming和Kafka整合开发指南

基于Receivers的方法 这个方法使用了Receivers来接收数据.Receivers的实现使用到Kafka高层次的消费者API.对于所有的Receivers,接收到的数据将会保存在Spark executors中,然后由Spark Streaming启动的Job来处理这些数据. 然而,在默认的配置下,这种方法在失败的情况下会丢失数据,为了保证零数据丢失,你可以在Spark Streaming中使用WAL日志,这是在Spark 1.2.0才引入的功能,这使得我们可以将接收到的数据保存到WA

Spark 系列(十六)—— Spark Streaming 整合 Kafka

一.版本说明 Spark 针对 Kafka 的不同版本,提供了两套整合方案:spark-streaming-kafka-0-8 和 spark-streaming-kafka-0-10,其主要区别如下: spark-streaming-kafka-0-8 spark-streaming-kafka-0-10 Kafka 版本 0.8.2.1 or higher 0.10.0 or higher AP 状态 Deprecated从 Spark 2.3.0 版本开始,Kafka 0.8 支持已被弃用