spark streaming测试之二使用网络数据源

测试思路:

首先,创建网络数据源数据发送器(程序一);

其次,创建spark接收数据程序(程序二);

接着,将程序一打包,放在服务器上执行。这里有三个参数分别是:所要发送的数据文件,通过哪个端口号发送,每隔多少毫秒发送一次数据;

最后,运行spark程序,这里每隔5秒处理一次数据。有两个参数:监听的端口号,每隔多少毫秒接收一次数据。

观察效果。

程序一:

sparkStreaming

import java.io.PrintWriter
import java.net.ServerSocket

import scala.io.Source

object SalaSimulation {
  (length: ) = {
    java.util.Random
    rdm = Random

    rdm.nextInt(length)
  }

  (args: Array[]){
    (args.length != ){
      System..println()
      System.()
    }

    filename = args()
    lines = Source.(filename).getLines.toList
    filerow = lines.length

    listener = ServerSocket(args().toInt)
    (){
      socket = listener.accept()
      Thread(){
        = {
          (+socket.getInetAddress)
          out = PrintWriter(socket.getOutputStream())
          (){
            Thread.(args().toLong)
            content = lines((filerow))
            (content)
            out.write(content +)
            out.flush()
          }
          socket.close()
        }
      }.start()
    }
  }
}

程序二:

sparkStreaming

import org.apache.log4j.{LoggerLevel}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{SecondsStreamingContext}
import org.apache.spark.{SparkContextSparkConf}
import org.apache.spark.streaming.StreamingContext._

object NetworkWordCount {
  def main(args: Array[]){
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

    conf = SparkConf().setAppName().setMaster()
    sc = SparkContext(conf)
    ssc = StreamingContext(sc())

    lines = ssc.socketTextStream(args()args().toIntStorageLevel.)
    words = lines.flatMap(_.split())
    wordCounts = words.map(x=>(x)).reduceByKey(_+_)

    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
时间: 2024-10-13 10:56:48

spark streaming测试之二使用网络数据源的相关文章

Spark 定制版~Spark Streaming(二)

本讲内容: a. 解密Spark Streaming运行机制 b. 解密Spark Streaming架构 注:本讲内容基于Spark 1.6.1版本(在2016年5月来说是Spark最新版本)讲解. 上节回顾: 上节课谈到技术界的寻龙点穴,Spark就是大数据的龙脉,而Spark Streaming就是Spark的穴位.假如要构建一个强大的Spark应用程序 ,Spark Streaming 是一个值得借鉴的参考,Spark Streaming涉及多个job交叉配合,几乎可以包括spark的所

spark streaming测试之三有状态的接收数据

测试思路: 首先,使用上篇文章的程序一发送网络数据: 其次,运行spark程序,观察效果. 说明: 1. 这里使用到了更新函数: 2. 使用检查点来保证状态. sparkStreaming import org.apache.log4j.{LoggerLevel} import org.apache.spark.streaming.{SecondsStreamingContext} import org.apache.spark.{SparkContextSparkConf} import or

spark streaming测试之一使用本地数据源

直接上代码吧 说下测试思路: 该代码监控的/tmp/sparkStream/目录: 首先,创建该目录mkdir -p /tmp/sparkStream: 然后,运行spark程序: 最后,向监控目录/tmp/sparkStream/添加数据文件: 观察spark程序运行效果. sparkStreaming import org.apache.log4j.{LoggerLevel} import org.apache.spark.SparkConf import org.apache.spark.

通过案例对 spark streaming 透彻理解三板斧之二:spark streaming运行机制

本期内容: 1. Spark Streaming架构 2. Spark Streaming运行机制 Spark大数据分析框架的核心部件: spark Core.spark  Streaming流计算.GraphX图计算.MLlib机器学习.Spark SQL.Tachyon文件系统.SparkR计算引擎等主要部件. Spark Streaming 其实是构建在spark core之上的一个应用程序,要构建一个强大的Spark应用程序 ,spark  Streaming是一个值得借鉴的参考,spa

.Spark Streaming(上)--实时流计算Spark Streaming原理介

Spark入门实战系列--7.Spark Streaming(上)--实时流计算Spark Streaming原理介绍 http://www.cnblogs.com/shishanyuan/p/4747735.html 1.Spark Streaming简介 1.1 概述 Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的.具备容错机制的实时流数据的处理.支持从多种数据源获取数据,包括Kafk.Flume.Twitter.ZeroMQ.Kinesis 以及TCP

7.Spark Streaming(上)--Spark Streaming原理介绍

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.Spark Streaming简介 1.1 概述 Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的.具备容错机制的实时流数据的处理.支持从多种数据源获取数据,包括Kafk.Flume.Twitter.ZeroMQ.Kinesis 以及TCP sockets,从数据源获取数据之后,可以使用诸如map.reduce.join和window等高级函数进行复杂算法的处

整合Kafka到Spark Streaming——代码示例和挑战

作者Michael G. Noll是瑞士的一位工程师和研究员,效力于Verisign,是Verisign实验室的大规模数据分析基础设施(基础Hadoop)的技术主管.本文,Michael详细的演示了如何将Kafka整合到Spark Streaming中. 期间, Michael还提到了将Kafka整合到 Spark Streaming中的一些现状,非常值得阅读,虽然有一些信息在Spark 1.2版本中已发生了一些变化,比如HA策略: 通过Spark Contributor.Spark布道者陈超我

Spark入门实战系列--7.Spark Streaming(上)--实时流计算Spark Streaming介绍

[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送–Spark入门实战系列>获取 1 Spark Streaming简介 1.1 概述 Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的.具备容错机制的实时流数据的处理.支持从多种数据源获取数据,包括Kafk.Flume.Twitter.ZeroMQ.Kinesis 以及TCP sockets,从数据源获取数据之后,可以使用诸如map.reduce.join和window等高级函数进行复杂算法的处理

Spark Streaming发行版笔记15:no receivers彻底思考

数据接入Spark Streaming的二种方式:Receiver和no receivers方式 建议企业级采用no receivers方式开发Spark Streaming应用程序,好处: 1.更优秀的自由度控制 2.语义一致性 no receivers更符合数据读取和数据操作,Spark 计算框架底层有数据来源,如果只有direct直接操作数据来源则更天然.操作数据来源封装其一定是rdd级别的. 所以Spark 推出了自定义的rdd即Kafkardd,只是数据来源不同. 进入源码区: 注释基