Spark Streaming Official Documentation Study Notes -- Part 2

Accumulators and Broadcast Variables

Accumulators and broadcast variables cannot be recovered from a checkpoint in Spark Streaming.

If you want to use them in an application with checkpointing enabled, you need to create lazily instantiated singleton instances of these variables, so that they can be re-instantiated after the driver restarts on failure.

Here is an example:

    def getWordBlacklist(sparkContext):
        if ('wordBlacklist' not in globals()):
            globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
        return globals()['wordBlacklist']

    def getDroppedWordsCounter(sparkContext):
        if ('droppedWordsCounter' not in globals()):
            globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
        return globals()['droppedWordsCounter']

    def echo(time, rdd):
        # Get or register the blacklist Broadcast
        blacklist = getWordBlacklist(rdd.context)
        # Get or register the droppedWordsCounter Accumulator
        droppedWordsCounter = getDroppedWordsCounter(rdd.context)

        # Use blacklist to drop words and use droppedWordsCounter to count them
        def filterFunc(wordCount):
            if wordCount[0] in blacklist.value:
                droppedWordsCounter.add(wordCount[1])
                return False
            else:
                return True

        counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
        print(counts)

    wordCounts.foreachRDD(echo)

DataFrame and SQL Operations

By creating a lazily instantiated singleton instance of SparkSession, DataFrame and SQL operations inside a streaming program can be recovered from driver failures.

    # Lazily instantiated global instance of SparkSession
    def getSparkSessionInstance(sparkConf):
        if ('sparkSessionSingletonInstance' not in globals()):
            globals()['sparkSessionSingletonInstance'] = SparkSession \
                .builder \
                .config(conf=sparkConf) \
                .getOrCreate()
        return globals()['sparkSessionSingletonInstance']

    ...

    # DataFrame operations inside your streaming program
    words = ...  # DStream of strings

    def process(time, rdd):
        print("========= %s =========" % str(time))
        try:
            # Get the singleton instance of SparkSession
            spark = getSparkSessionInstance(rdd.context.getConf())
            # Convert RDD[String] to RDD[Row] to DataFrame
            rowRdd = rdd.map(lambda w: Row(word=w))
            wordsDataFrame = spark.createDataFrame(rowRdd)
            # Creates a temporary view using the DataFrame
            wordsDataFrame.createOrReplaceTempView("words")
            # Do word count on table using SQL and print it
            wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
            wordCountsDataFrame.show()
        except:
            pass

    words.foreachRDD(process)

MLlib Operations

Streaming machine learning algorithms (e.g. Streaming Linear Regression, Streaming KMeans, etc.) can simultaneously learn from the streaming data as well as apply the model on the streaming data.

For a much larger class of machine learning algorithms, you can learn a model offline (i.e. using historical data) and then apply the model online on streaming data.
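
As a minimal sketch, assuming trainingStream and testStream are DStreams of LabeledPoint that were set up elsewhere, a streaming model can be trained and applied at the same time:

    from pyspark.mllib.linalg import Vectors
    from pyspark.mllib.regression import StreamingLinearRegressionWithSGD

    # trainingStream and testStream are hypothetical DStreams of LabeledPoint
    model = StreamingLinearRegressionWithSGD()
    model.setInitialWeights(Vectors.dense([0.0, 0.0, 0.0]))

    # Continuously update the model on the training stream ...
    model.trainOn(trainingStream)
    # ... while applying the current model to the test stream
    predictions = model.predictOnValues(testStream.map(lambda lp: (lp.label, lp.features)))
    predictions.pprint()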

Caching / Persistence

DStreams also allow developers to persist the stream’s data in memory

using the persist() method on a DStream will automatically persist every RDD of that DStream in memory

For window-based operations like reduceByWindow and reduceByKeyAndWindow and state-based operations like updateStateByKey, this is implicitly true (the generated DStreams are automatically persisted in memory by default).

For input streams that receive data over the network (such as Kafka, Flume, sockets, etc.), the default persistence level is set to replicate the data to two nodes for fault-tolerance (data received over the network is kept in two copies by default).

You can mark an RDD to be persisted using the persist() or cache() methods on it.

levels are set by passing a StorageLevel object to persist()

The cache() method is a shorthand for using the default storage level, which is StorageLevel.MEMORY_ONLY

unlike RDDs, the default persistence level of DStreams keeps the data serialized in memory
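
A minimal sketch, assuming windowedWordCounts is a DStream produced by a window operation, of persisting it with an explicit storage level:

    from pyspark import StorageLevel

    # persist() takes an explicit StorageLevel; cache() is the shorthand for the
    # DStream default level (data kept serialized in memory)
    windowedWordCounts.persist(StorageLevel.MEMORY_AND_DISK)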

Checkpointing

Spark Streaming needs to checkpoint enough information to a fault-tolerant storage system so that it can recover from failures. There are two types of data that are checkpointed:

  1. Metadata checkpointing - Saving of the information defining the streaming computation to fault-tolerant storage like HDFS. This is used to recover from failure of the node running the driver of the streaming application. Metadata includes:

    Configuration - The configuration that was used to create the streaming application.

    DStream operations - The set of DStream operations that define the streaming application.

    Incomplete batches - Batches whose jobs are queued but have not completed yet.

  2. Data checkpointing - Saving of the generated RDDs to reliable storage. 

When to enable Checkpointing

  1. Usage of stateful transformations - If either updateStateByKey or reduceByKeyAndWindow (with inverse function) is used in the application, then the checkpoint directory must be provided to allow for periodic RDD checkpointing.
  2. Recovering from failures of the driver running the application - Metadata checkpoints are used to recover with progress information.

How to configure Checkpointing

Checkpointing can be enabled by setting a directory in a fault-tolerant, reliable file system (e.g., HDFS, S3, etc.) to which the checkpoint information will be saved

done by using streamingContext.checkpoint(checkpointDirectory)

if you want to make the application recover from driver failures, you should rewrite your streaming application to have the following behavior:

  1. When the program is being started for the first time, it will create a new StreamingContext, set up all the streams and then call start().
  2. When the program is being restarted after failure, it will re-create a StreamingContext from the checkpoint data in the checkpoint directory.

This behavior is made simple by using StreamingContext.getOrCreate

    # Function to create and setup a new StreamingContext
    def functionToCreateContext():
        sc = SparkContext(...)  # new context
        ssc = StreamingContext(...)
        lines = ssc.socketTextStream(...)  # create DStreams
        ...
        ssc.checkpoint(checkpointDirectory)  # set checkpoint directory
        return ssc

    # Get StreamingContext from checkpoint data or create a new one
    context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

    # Do additional setup on context that needs to be done,
    # irrespective of whether it is being started or restarted
    context. ...

    # Start the context
    context.start()
    context.awaitTermination()

If checkpointDirectory exists, the context will be re-created from the checkpoint data.

If the directory does not exist, the function functionToCreateContext will be called to create a new context.

You can also explicitly create a StreamingContext from the checkpoint data and start the computation by using

    StreamingContext.getOrCreate(checkpointDirectory, None)

In addition to using getOrCreate, one also needs to ensure that the driver process gets restarted automatically on failure. This is further discussed in the Deployment section.

At small batch sizes (say 1 second), checkpointing every batch may significantly reduce operation throughput.

The default interval is a multiple of the batch interval that is at least 10 seconds.

It can be set by using dstream.checkpoint(checkpointInterval)

Typically, a checkpoint interval of 5 - 10 sliding intervals of a DStream is a good setting to try.
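
A minimal sketch, assuming stateDstream comes from updateStateByKey and the batch interval is 2 seconds, of checkpointing every 5 batches:

    # Checkpoint the stateful DStream every 10 seconds (5 x 2-second batches)
    stateDstream.checkpoint(10)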

Deploying Applications

Requirements

  • Cluster with a cluster manager
  • Package the application JAR
    If you are using spark-submit to start the application, then you will not need to provide Spark and Spark Streaming in the JAR. However, if your application uses advanced sources (e.g. Kafka, Flume), then you will have to package the extra artifact they link to, along with their dependencies, in the JAR that is used to deploy the application.
  • Configuring sufficient memory for the executors
    Note that if you are doing 10 minute window operations, the system has to keep at least last 10 minutes of data in memory. So the memory requirements for the application depends on the operations used in it.
  • Configuring checkpointing
  • Configuring automatic restart of the application driver
    • Spark Standalone 
      the Standalone cluster manager can be instructed to supervise the driver, and relaunch it if the driver fails either due to non-zero exit code, or due to failure of the node running the driver.
    • YARN automatically restarting an application
    • Mesos  Marathon has been used to achieve this with Mesos
  • Configuring write ahead logs
     If enabled, all the data received from a receiver gets written into a write ahead log in the configured checkpoint directory.
  • Setting the max receiving rate (see the configuration sketch after this list)
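
A minimal configuration sketch for the last two items (these are the standard spark.streaming.* properties; the rate of 10000 records per second per receiver is only an illustrative value):

    from pyspark import SparkConf

    conf = SparkConf() \
        .set("spark.streaming.receiver.writeAheadLog.enable", "true") \
        .set("spark.streaming.receiver.maxRate", "10000") \
        .set("spark.streaming.backpressure.enabled", "true")  # optionally let Spark adapt the rate automatically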

Upgrading Application Code

There are two mechanisms to upgrade the application code:

  1. Run the upgraded application in parallel with the existing one. Once the new one (receiving the same data as the old one) has been warmed up and is ready for prime time, the old one can be brought down. This requires that the data source is able to send data to two destinations (the earlier and the upgraded application).
  2. Gracefully shut down the existing application, i.e. ensure that data which has been received is completely processed before shutdown. Then the upgraded application can be started, and it will continue processing from the same point where the earlier application left off. This requires a data source that can buffer data while the old application is down and the new one is not yet up (see the sketch after this list).
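
A minimal sketch of the graceful-shutdown call (in the PySpark API the keyword argument is spelled stopGraceFully; ssc is the running StreamingContext):

    # Finish processing all data that has already been received, then stop
    # both the StreamingContext and the underlying SparkContext.
    ssc.stop(stopSparkContext=True, stopGraceFully=True)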


Monitoring Applications

http://localhost:4040/streaming/


Performance Tuning

Goals and approaches:

  1. Reducing the processing time of each batch of data by efficiently using cluster resources.
  2. Setting the right batch size such that the batches of data can be processed as fast as they are received (that is, data processing keeps up with the data ingestion).

Level of Parallelism in Data Receiving
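
Receiving can be parallelized by creating multiple input DStreams (each with its own receiver) and unioning them; a sketch assuming a Kafka receiver-based source with hypothetical connection parameters:

    from pyspark.streaming.kafka import KafkaUtils

    # Create several input streams reading the same topic (one receiver each),
    # then union them into a single DStream for further processing
    numStreams = 5
    kafkaStreams = [KafkaUtils.createStream(ssc, "zk-host:2181", "consumer-group", {"topic": 1})
                    for _ in range(numStreams)]
    unifiedStream = ssc.union(*kafkaStreams)
    unifiedStream.pprint()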

Level of Parallelism in Data Processing
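
Processing parallelism can be raised by passing an explicit number of partitions to shuffle-based operations such as reduceByKey (or by changing spark.default.parallelism); a sketch assuming pairs is a DStream of (word, 1) tuples:

    # Use 16 tasks for the shuffle instead of the default parallelism
    wordCounts = pairs.reduceByKey(lambda a, b: a + b, 16)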
