Flink 原理(六)——异步I/O(asynchronous I/O)

1、前言

  本文是基于Flink官网上Asynchronous  I/O的介绍结合自己的理解写成的,若有不正确的欢迎大伙留言交流,谢谢

2、Asynchronous  I/O简介

  将Flink用于流计算时,若涉及到和外部系统进行交互,如利用Flink从数据库中读取数据,这种需要获取I/O的场景时,我们需要考虑交互所带来的时延问题。

  为分析如何减少时延,我们先来分析一下,Flink以同步的形式方法外部系统(以MapFunction中和数据库交互为例)的过程,若图1虚线左侧所示,请求a发送到database后,MapFunction等待回复后才进行下发送下一个请求b,期间,I/O处于空闲状态,请求b又开始重复此过程,这样在两个来回的时间内(发送请求-收到结果为一个来回),只处理两个请求。如图1虚线右侧所示,同样是在两个来回的时间内,以异步的形式进行交互,请求a发出去后,在等待回复时,请求b,c,d依次发出,这样既可以处理4个请求了。

图1 同/异访问数据库方式对比图(Ref[1])

  在某些场景下,为了提高系统的吞吐能力,可以仅通过增大MapFunction的并发度以达目的,但是随之而来是资源的大量消耗。

  【重要事项

  1)为了实现以异步I/O访问数据库或K/V存储,数据库等需要有能支持异步请求的client;若是没有,可以通过创建多个同步的client并使用线程池处理同步call的方式实现类似并发的client,但是这方式没有异步I/O的性能好。

  2)AsyncFunction不是以多线程方式调用的,一个AsyncFunction实例按顺序为每个独立消息发送请求;

  3)目前(Flink 1.9),使用AsyncWaitOperator时要打断operator chain(默认也是不使用),原因见FLINK-13063

3、结果的顺序

  由于请求响应的快慢可能不一样,AsyncFunction的“并发”请求可能导致结果的乱序 。如图1中虚线右侧所示,若请求b发出之后,其结果在请求a的之前返回,这样异步I/O算子前后的消息顺序就不一致了。为了控制结果的返回顺序,Flink提供了两种模式:

  1)Unordered:当异步的请求完成时,其结果立马返回,不考虑结果顺序即乱序模式。当以processing time作为时间属性时,该模式可以获得最小的延时和最小的开销,使用方式:AsyncDataStream.unorderedWait(...);

  2)Ordered:该模式下,消息在异步I/O算子前后的顺序一致,先请求的先返回,即有序模式。为实现有序模式,算子将请求返回的结果放入缓存,直到该请求之前的结果全部返回或超时。该模式通常情况下回引入额外的时延以及在checkpoint过程中会带来开销,这是因为,和无序模式相比,消息和请求返回的结果都会在checkpoint的状态中维持更长时间。使用方式:AsyncDataStream.orderedWait(...);

  在此,我们需要针对流任务和event time相结合的情况进行补充说明。为什么?是因为watermark和消息的整体相对位置是不会变的,什么意思了?发生在某个watermark之后的消息,只能在watermark被发出之后发出,其请求结果也是。换句话说,两个watermark之间的消息整体与watermark的有序的。当然这个区间内消息之间是否有序这得根据使用的模式来分析。

  1)对Ordered模式,因为消息本身是有序的,所以watermark和消息之间也是有序的,和processing time相比,其不需要引入额外的开销;

  2)对Unordered模式,其模式是先响应先返回,但在与event time结合的情况里,消息或结果都需在特定watermark发出之后才能发出,此时,就会引入延时和开销,其开销的大小取决于watermark的频率,其原因参加下文原理部分。

4、原理

  4.1 Terms

  为更加详细的说明异步I/O的实现过程,先说明几个term,其中也会涉及其基本用法,若分析原理只看其含义即可。

  1)AsyncFunction:异步I/O的触发接口

    AsyncFunction在AsyncWaitOperator中作为一个用户函数,类似FlatMap,有open()/processElement(StreamRecord< in > record)/processWatermark(Watermark mark)方法。
 对于用户自己实现的AsyncFunction,必须重写asyncInvoke(IN input, AsyncCollector collector)来提供调用异步操作的代码。

  2)AsyncWaitOperator:调用AsyncFunction的流算子,是个抽象的概念,具体算子是unorderedWait(...)或orderedWait(...)

  3)AsyncCollector:

    AsyncCollector由AsyncWaitOperator创建,并传递给AsyncFunction,在这里它应该被添加到用户的回调函数中。它充当从用户代码中获取结果或错误的角色,并通知AsyncCollectorBuffer发出结果。

  4)AsyncCollectorBuffer:AsyncCollectorBuffer保存所有的AsyncCollector,并将结果发送给下一个节点。

  上述概念是工作示意图可参见Ref[2]

  4.2 架构图

  在流式计算中,涉及异步I/O的整体过程图如下:

图2 异步I/O架构图(Ref[2])

  1)消息达到AsyncWaitOperator后正常处理过程如下:

  AsyncWaitOperator调用AsyncFunction,并创建AsyncCollector传递给AsyncFunction。AsyncCollector等待获取到返回结果(异常)之后将入到AsyncCollectorBuffer保存时,会将一条mark消息放入AsyncCollectorBuffer中,然后一个signal信息将会发送到Emitter 线程,若此时是将消息发送出去的signal,则会将消息发送出去并通知task thread加消息到collector buffer中。至于怎么发要依据代码中设置的模式是有序还是无序,若是有序则发head,删head。该过程的更详细过程如下图:

图3 异步I/O正常处理消息图(Ref[2])

  2)checkpoint过程

  AsyncWaitOperator先是对AsyncCollectorBuffer中所有的输入流数据进行扫描,完成后就删除state中老的数据,然后将AsyncCollectorBuffer中数据存入到state中,而不是在处理时对单个输入流一个接一个的存入state,具体过程图见图2或图4。

  3)故障恢复

  在恢复AsyncWaitOperator的状态时,AsyncWaitOperator将scan状态中的所有元素,获取AsyncCollectors,调用AsyncFunction.asyncInvoke()并将它们插入AsyncCollectorBuffer中,具体的如下:

图4 故障恢复和checkpoint流程图(Ref[2])

总结:

  关于具体使用的方法见后期的博客,建议大伙看看原文,一千个读者就有一千个哈姆雷特!

Ref

[1]https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/asyncio.html

[2]https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673

[3]https://blog.icocoro.me/2019/05/26/1905-apache-flinkv2-asyncio/

原文地址:https://www.cnblogs.com/love-yh/p/11681435.html

时间: 2024-11-03 10:46:17

Flink 原理(六)——异步I/O(asynchronous I/O)的相关文章

异步编程(Asynchronous Programming)

异步编程与我们所看过的其他并行编程形式有一些不同,讨论的其他主题可以有大量的线程并行运行,可以完全利用系统中可用的处理器:而在异步编程中,需要避免阻塞线程,我们在这一章的第一节"线程.内存.锁定和阻塞"中已经对阻塞线程的概念有所了解了.阻塞的线程是不能工作的线程,因为它需要等待其他任务的完成:线程等待的通常任务是操作系统执行的输入输出,但有时也可能是等待锁,因此会进行临界区.线程是相对昂贵的资源,每个线程分配 1 MB 的堆(stack),以及操作系统内核为管理大量线程而产生的其他相关

How Javascript works (Javascript工作原理) (六) WebAssembly 对比 JavaScript 及其使用场景

个人总结: 1.webassembly简介:WebAssembly是一种用于开发网络应用的高效,底层的字节码.允许在网络应用中使用除JavaScript的语言以外的语言(比如C,C++,Rust及其他)来编写应用程序,然后编译成(提早)WebAssembly. 这是 JavaScript 工作原理的第六章. 现在,我们将会剖析 WebAssembly 的工作原理,而最重要的是它和 JavaScript 在性能方面的比对:加载时间,执行速度,垃圾回收,内存使用,平台 API 访问,调试,多线程以及

C#的多线程——使用async和await来完成异步编程(Asynchronous Programming with async and await)

https://msdn.microsoft.com/zh-cn/library/mt674882.aspx 侵删 更新于:2015年6月20日 欲获得最新的Visual Studio 2017 RC文档,参考Visual Studio 2017 RC Documentation. 使用异步编程,你可以避免性能瓶颈和提升总体相应效率.然而,传统的异步方法代码的编写方式比较复杂,导致它很难编写,调试和维护. Visual Studio 2012引入了一个简单的异步编程的方法,依赖.NET Fram

理论铺垫:阻塞IO、非阻塞IO、IO多路复用/事件驱动IO(单线程高并发原理)、异步IO

完全来自:http://www.cnblogs.com/alex3714/articles/5876749.html 同步IO和异步IO,阻塞IO和非阻塞IO分别是什么,到底有什么区别?不同的人在不同的上下文下给出的答案是不同的.所以先限定一下本文的上下文. 本文讨论的背景是Linux环境下的network IO. 一 概念说明 在进行解释之前,首先要说明几个概念:- 用户空间和内核空间- 进程切换- 进程的阻塞- 文件描述符- 缓存 I/O 用户空间与内核空间 现在操作系统都是采用虚拟存储器,

转战物联网&#183;基础篇05-通俗理解MQTT协议的实现原理和异步方式

??网络上搜索MQTT协议,会出现太多的解释,这里就不做官方标准释义的复制了.这一节我们从实战理解角度,通俗的将MQTT协议的作用及实现原理说一下,旨在可以快速理解MQTT协议.所以可能会出现很多看似不标准的解释,但是更容易理解MQTT的内涵,对MQTT十分精通者请忽略此文. ??在物联网项目中,经常出现的要求是"有限环境".什么意思呢,通俗说就是网络可能不太稳定,带宽也可能很小,网速也比较低,硬件MCU性能也很低,要求在这种情况下也能可靠联网传输信息.看到这里大家就会想到我前面提到的

mysql主从原理(异步复制)

主从复制作用:数据分布(异机或异地)---负载均衡----备份----高可用和故障转移----升级测试. 主从原理:   线程介绍: 主服务器的一个工作线程: DUMP线程,作用:接收到从库发来的请求后,负责给slave服务器发送二进制日志 从服务器的两个工作线程: I/O线程: 作用:负责读取主服务器的二进制日志,并将其保存到自己的中继日志文件中. SQL线程: 作用:来复制执行中继日志. 注意从库的IO线程和SQL线程是分开的,互不影响. 第1步: slave发送请求:(从库IO线程负责)

Flink原理(二)——资源

前言 本文主要是想简要说明Flink在集群部署.任务提交.任务运行过程中资源情况,若表述有误欢迎大伙留言分享,非常感谢! 一.集群部署阶段 集群部署这里指的是Flink standalone模式,因为在Yarn模式(包括session.single job模式)是可以仅通过Flink client提交任务到Yarn上,所以是否手动部署Flink集群对任务的执行是没有影响的.下图[1]是简单的Flink的集群构成情况,包括一个master(JobManager).两个worker(TaskMana

async异步注解和aspect切面注解等注解的原理

在我们使用spring框架的过程中,在很多时候我们会使用@async注解来异步执行某一些方法,提高系统的执行效率.今天我们来探讨下spring是如何完成这个功能的. 1.spring 在扫描bean的时候会扫描方法上是否包含@async的注解,如果包含的,spring会为这个bean动态的生成一个子类,我们称之为代理类(?). 2.代理类是继承我们所写的bean的,然后把代理类注入进来,在执行此方法时会到代理类中,代理类判断此方法需要异步执行,就不会调用父类 (我们原本写的bean)的对应方法.

定时器运行原理 &amp;&amp; javascript事件循环模型

定时器是我们经常使用的一个异步函数,它的用处十分广泛,比如图片轮播.各种小的动画.延时操作等等:定时器函数只有两个setTimeout.setInterval,这两个工作原理相同,唯一的区别是:setTimeout只执行一次,setInterval循环执行:通过以下实例看看对定时器原理掌握程度: 定时器3个实例 首先声明这三个实例输出皆不同,先思考输出结果,以及为何不同 实例一: console.log('test1') for(var i=0;i<10;i++){ setTimeout(()=