Java Spark Streaming: receiving TCP/Kafka data

This article demonstrates:

1. How to use Spark Streaming to receive TCP data and filter it;

2. How to use Spark Streaming to receive Kafka data and do a word count.

The details are as follows:

First, using Maven, declare the POM dependencies:

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.10</artifactId>
            <version>1.6.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.6.0</version>
            <scope>provided</scope>
        </dependency>
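The run commands below submit realtime-streaming-1.0-SNAPSHOT-jar-with-dependencies.jar, i.e. a fat jar. The original article does not show the build section; a minimal sketch of a maven-assembly-plugin configuration that would produce such a jar (the plugin version is an assumption):

        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>2.6</version>
                    <configuration>
                        <descriptorRefs>
                            <!-- produces *-jar-with-dependencies.jar used by spark-submit below -->
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>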

1. Receive TCP data, filter it, and print the lines containing "error"

package com.xiaoju.dqa.realtime_streaming;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.Durations;

//nc -lk 9999
public class SparkStreamingTCP {
    public static void main(String[] args) {
        // Use local[2]: the socket receiver occupies one thread, so at least two are needed locally
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("streaming word count");
        // 1-second batch interval
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        // Connect to the host where `nc -lk 9999` is running
        JavaDStream<String> lines = jssc.socketTextStream("10.93.21.21", 9999);
        // Keep only the lines that contain "error"
        JavaDStream<String> errorLines = lines.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) throws Exception {
                return s.contains("error");
            }
        });
        errorLines.print();
        jssc.start();
        jssc.awaitTermination();
    }
}

How to run

$ spark-submit realtime-streaming-1.0-SNAPSHOT-jar-with-dependencies.jar
# In another terminal window
$ nc -lk 9999
# then type some lines of input
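Note that print() only displays the first ten records of each batch. If every matching line is needed, a minimal sketch using foreachRDD on errorLines (registered before jssc.start(); it additionally needs the org.apache.spark.api.java.JavaRDD and org.apache.spark.api.java.function.VoidFunction imports, and collect() assumes the batches are small):

        errorLines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            @Override
            public void call(JavaRDD<String> rdd) throws Exception {
                // collect() pulls the whole micro-batch to the driver; fine only for small batches
                for (String line : rdd.collect()) {
                    System.out.println("error line: " + line);
                }
            }
        });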

2. Receive Kafka data and count words (WordCount)

package com.xiaoju.dqa.realtime_streaming;

import java.util.*;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.Durations;

import scala.Tuple2;

// bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
public class SparkStreamingKafka {
    public static void main(String[] args) throws InterruptedException {
        // yarn-client mode; change the master to match your environment
        SparkConf conf = new SparkConf().setMaster("yarn-client").setAppName("streaming word count");
        //String topic = "offline_log_metrics";
        String topic = "test";
        int part = 1;  // number of receiver-based input streams to create
        JavaSparkContext sc = new JavaSparkContext(conf);
        sc.setLogLevel("WARN");
        // 10-second batch interval
        JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(10));
        // Consume each topic with one receiver thread
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        String[] topics = topic.split(";");
        for (int i = 0; i < topics.length; i++) {
            topicMap.put(topics[i], 1);
        }
        // Create `part` receiver-based streams (ZooKeeper quorum, consumer group, topic map)
        List<JavaPairReceiverInputDStream<String, String>> list = new ArrayList<JavaPairReceiverInputDStream<String, String>>();
        for (int i = 0; i < part; i++) {
            list.add(KafkaUtils.createStream(jssc,
                    "10.93.21.21:2181",
                    "bigdata_qa",
                    topicMap));
        }
        // Union all receiver streams into a single DStream
        JavaPairDStream<String, String> wordCountLines = list.get(0);
        for (int i = 1; i < list.size(); i++) {
            wordCountLines = wordCountLines.union(list.get(i));
        }
        JavaPairDStream<String, Integer> counts = wordCountLines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
            @Override
            public Iterable<String> call(Tuple2<String, String> stringStringTuple2) {
                // The record value (_2) is the Kafka message body; split it into words.
                // Return an empty list (not null) for null/empty messages so downstream stages don't fail.
                if (stringStringTuple2._2 == null || "".equals(stringStringTuple2._2)) {
                    System.out.println("_2 is null or empty");
                    return Collections.<String>emptyList();
                }
                return Arrays.asList(stringStringTuple2._2.split(" "));
            }
        }).mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                // Map each word to (word, 1); empty words keep a count of 0
                if (s == null || "".equals(s)) {
                    return new Tuple2<String, Integer>(s, 0);
                }
                return new Tuple2<String, Integer>(s, 1);
            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer x, Integer y) throws Exception {
                return x + y;
            }
        });
        counts.print();

        jssc.start();
        try {
            jssc.awaitTermination();
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            jssc.close();
        }
    }
}

How to run

$ spark-submit --queue=root.XXX realtime-streaming-1.0-SNAPSHOT-jar-with-dependencies.jar
# In another terminal window, start a Kafka console producer
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
# then type some input
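The example above uses the receiver-based KafkaUtils.createStream, which tracks offsets through ZooKeeper. Spark 1.6 also provides a receiver-less "direct" API; a minimal sketch of swapping it in (the broker address localhost:9092 is an assumption, and kafka.serializer.StringDecoder plus org.apache.spark.streaming.api.java.JavaPairInputDStream would additionally need to be imported):

        // Assumes the same jssc as in SparkStreamingKafka above
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");
        Set<String> topicSet = new HashSet<String>(Arrays.asList("test"));

        // Reads offsets directly from the Kafka brokers; no receivers involved
        JavaPairInputDStream<String, String> directLines = KafkaUtils.createDirectStream(
                jssc,
                String.class, String.class,
                StringDecoder.class, StringDecoder.class,
                kafkaParams,
                topicSet);
        // directLines could then replace wordCountLines in the word-count pipeline above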