SparkStreaming 的编程模型

依赖管理

基本套路

Dstream输入源 ---input DStream

Dstream输入源--- Receiver

内置的input Dstream : Basic Source

内置的input Dstream :Advanced Sources

Dstream 输入源: multiple input DStream

Dstream 输入源: Custom Receiver

官方参考网站 http://spark.apache.org/docs/1.6.2/streaming-custom-receivers.html

scala 参考模版

class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
   // There is nothing much to do as the thread calling receive()
   // is designed to stop by itself if isStopped() returns false
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private def receive() {
    var socket: Socket = null
    var userInput: String = null
    try {
     // Connect to host:port
     socket = new Socket(host, port)

     // Until stopped or connection broken continue reading
     val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
     userInput = reader.readLine()
     while(!isStopped && userInput != null) {
       store(userInput)
       userInput = reader.readLine()
     }
     reader.close()
     socket.close()

     // Restart in an attempt to connect again when server is active again
     restart("Trying to connect again")
    } catch {
     case e: java.net.ConnectException =>
       // restart if could not connect to server
       restart("Error connecting to " + host + ":" + port, e)
     case t: Throwable =>
       // restart if there is any other error
       restart("Error receiving data", t)
    }
  }
}

java 参考模版

public class JavaCustomReceiver extends Receiver<String> {

  String host = null;
  int port = -1;

  public JavaCustomReceiver(String host_ , int port_) {
    super(StorageLevel.MEMORY_AND_DISK_2());
    host = host_;
    port = port_;
  }

  public void onStart() {
    // Start the thread that receives data over a connection
    new Thread()  {
      @Override public void run() {
        receive();
      }
    }.start();
  }

  public void onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself if isStopped() returns false
  }

  /** Create a socket connection and receive data until receiver is stopped */
  private void receive() {
    Socket socket = null;
    String userInput = null;

    try {
      // connect to the server
      socket = new Socket(host, port);

      BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));

      // Until stopped or connection broken continue reading
      while (!isStopped() && (userInput = reader.readLine()) != null) {
        System.out.println("Received data ‘" + userInput + "‘");
        store(userInput);
      }
      reader.close();
      socket.close();

      // Restart in an attempt to connect again when server is active again
      restart("Trying to connect again");
    } catch(ConnectException ce) {
      // restart if could not connect to server
      restart("Could not connect", ce);
    } catch(Throwable t) {
      // restart if there is any other error
      restart("Error receiving data", t);
    }
  }
}

无状态的转换操作

有状态的转换操作1-updateStateByKey

有状态的转换操作2-window

有状态的转换操作2-window普通规约与增量规约

理解增量规约

输出操作

Dstream输出

持久化操作

时间: 2024-10-17 07:42:50

SparkStreaming 的编程模型的相关文章

Spark机器学习:Spark 编程模型及快速入门

http://blog.csdn.net/pipisorry/article/details/52366356 Spark编程模型 SparkContext类和SparkConf类 我们可通过如下方式调用 SparkContext 的简单构造函数,以默认的参数值来创建相应的对象.val sc = new SparkContext("local[4]", "Test Spark App") 这段代码会创建一个4线程的 SparkContext 对象,并将其相应的任务命

Linux的I/O模式、事件驱动编程模型

大纲: (1)基础概念回顾 (2)Linux的I/O模式 (3)事件驱动编程模型 (4)select/poll/epoll的区别和Python示例 网络编程里常听到阻塞IO.非阻塞IO.同步IO.异步IO等概念,总听别人装13不如自己下来钻研一下.不过,搞清楚这些概念之前,还得先回顾一些基础的概念. 1.基础知识回顾 注意:咱们下面说的都是Linux环境下,跟Windows不一样哈~~~ 1.1 用户空间和内核空间 现在操作系统都采用虚拟寻址,处理器先产生一个虚拟地址,通过地址翻译成物理地址(内

Storm介绍及核心组件和编程模型

离线计算 离线计算:批量获取数据.批量传输数据.周期性批量计算数据.数据展示 代表技术:Sqoop批量导入数据.HDFS批量存储数据.MapReduce批量计算数据.Hive批量计算数据.azkaban/oozie任务调度 流式计算 流式计算:数据实时产生.数据实时传输.数据实时计算.实时展示 代表技术:Flume实时获取数据.Kafka/metaq实时数据存储.Storm/JStorm实时数据计算.Redis实时结果缓存.持久化存储(mysql). 一句话总结:将源源不断产生的数据实时收集并实

ARMV8 datasheet学习笔记4:AArch64系统级体系结构之编程模型(4)- 其它

1. 前言 2.可配置的指令使能/禁用控制和trap控制 指令使能/禁用 当指令被禁用,则这条指令就会变成未定义 指令Trap控制 控制某条或某些指令在运行时进入陷阱,进入陷阱的指令会产生trap异常,路由规则如下: (1)当前为EL1,则陷阱异常传递给EL1(HCR_EL2.TGE定义为1时,会路由到EL2); (2)当前为EL2,则陷阱异常传递给EL2; (3)当前为EL3,则陷阱异常传递给EL3; 3. 系统调用 SVC 默认情况下SVC产生supervisor call,同步异常目标级别

网络编程模型

课程索引 1. 编程模型 2. 编程模型 Socket的实质就是一个接口 , 利用该接口,用户在使用不同的网络协议时,操作函数得以统一. 而针对不同协以统一. 而针对不同协议的差异性操作,则交给了 socket去自行解决. 3. TCP编程模型 4. UDP编程模型

多线程编程模型

在学习muduo网络库前,应该先熟悉一下多线程网络服务编程模型.在6.6.2节介绍了11种方案.方案0到方案4用的是阻塞I/O.方案5到方案11用的都是阻塞I/O. 方案0: accept+read/write 方案0不是并发模型,只是一个循环处理.用代码表示的话,可以表示为: while(true) { int fd=accept(--); read(fd,--) or write(fd--); close(fd); } 一次只能处理一个连接,第一个连接处理完毕后,才可以进入下一次循环,否则阻

Java的多线程编程模型5--从AtomicInteger开始

Java的多线程编程模型5--从AtomicInteger开始 2011-06-23 20:50 11393人阅读 评论(9) 收藏 举报 java多线程编程jniinteger测试 AtomicInteger,一个提供原子操作的Integer的类.在Java语言中,++i和i++操作并不是线程安全的,在使用的时候,不可避免的会用到synchronized关键字.而AtomicInteger则通过一种线程安全的加减操作接口. 来看AtomicInteger提供的接口. //获取当前的值 publ

【收藏转】WCF后传系列(8):深度通道编程模型Part 1—设计篇

引言 从本质上说,WCF是一个通信服务框架,它允许我们使用不同的传输协议,使用不同的消息编码形式,跟不同的WS-*系列规范交互,而所有这些细节都是由通道堆栈来处理的.为了简化这些处理,在WCF中提供了两种模型,一是针对开发者的应用程序编程模型:二是用来通信的通道模型,这样对于开发者来说,只要了解应用程序编程模型就足够了,而不会涉及到通道模型,然而,对于通道模型进行必要的学习,可以让我们真正理解WCF中“通信”概念,了解WCF的 整个架构体系,从而构建出更加健壮的WCF服务或者对WCF框架进行扩展

【收藏转】WCF后传系列(9):深度通道编程模型Part 2—实例篇

引言 从本质上说,WCF是一个通信服务框架,它允许我们使用不同的传输协议,使用不同的消息编码形式,跟不同的WS-*系列规范交互,而所有这些细节都是由通道堆栈来处理的.在<WCF专题系列(8):深度通道编程模型Part 1—设计篇>中,对于WCF中的通道模型有了深入的认识,本文中,我将通过实例来说明在通道模型中,服务端是如何接收消息,客户端是如何发送消息的. 服务端通道 本文将不使用WCF的编程模型,而直接利用通道模型来进行通信,这样有助于我们更进一步加深对服务端处理消息的认识,在服务端侦听并接