Flink异步IO-实现原理(外文)

原文转至:https://docs.google.com/document/d/1Lr9UYXEz6s6R_3PWg3bZQLF3upGaNEkc0rQCFSzaYDI/edit#

        Asynchronous I/O Design and Implementation

Motivation

I/O access, for the most case, is a time-consuming process, making the TPS for single operator much lower than in-memory computing, particularly for streaming job, when low latency is a big concern for users. Starting multiple threads may be an option to handle this problem, but the drawbacks are obvious: The programming model for end users may become more complicated as they have to implement thread model in the operator. Furthermore, they have to pay attention to coordinate with checkpointing.

Scenario

For the ML streaming job, data flow has to get data from HBase, a data collection with billions of records, and then compute against it. The bottleneck for this job is the operator accessing HBase. Even though HBase has `been highly optimized, achieving very high QPS for the total cluster, but TPS for each subtask can not be very high due to slow I/O operation.

AsyncFunction

AsyncFunction works as a user function in AsyncWaitOperator, which looks like StreamFlatMap operator, having open()/processElement(StreamRecord<IN> record)/processWatermark(Watermark mark).

For user’s concrete AsyncFunction, the asyncInvoke(IN input, AsyncCollector<OUT> collector) has to be overriden to supply codes to start an async operation.

Interface

public interface AsyncFunction<IN, OUT> extends Function, Serializable {

/**

* Trigger async operation for each stream input.

* The AsyncCollector should be registered into async client.

*

* @param input Stream Input

* @param collector AsyncCollector

*/

void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception;

}

Async Resource

Async resource refers to clients or connections used to carry out async operation. Using getting results from HBase as an example, async resource could be a hbase connection pool.

User can place async resource as a member variable inside AsyncFunction. If it is not serializable, using keyword transient is an option.

Interaction with AsyncWaitOperator

For each input stream record of AsyncWaitOperator, they will be processed by AsyncFunction.asyncInvoke(IN input, AsyncCollector<OUT> cb). Then AsyncCollector will be appended into AsyncCollectorBuffer. We will cover AsyncCollector and AsyncCollectorBuffer later.

AsyncCollector

AsyncCollector is created by AsyncWaitOperator, and passed into AsyncFunction, where it should be added into user’s callback. It acts as a role to get results or errors from user codes and notify the AsyncCollectorBuffer to emit results.

The functions specific for the user is the collect, and they should be called when async operation is done or errors are thrown out.

public class AsyncCollector<OUT> {
  private List<OUT> result;
  private Throwable error;
  private AsyncCollectorBuffer<OUT> buffer;

  /**
   * Set result
   * @param result A list of results.
   */
  public void collect(List<OUT> result) {
    this.result = result;
    buffer.mark(this);
  }

  /**
   * Set error
   * @param error A Throwable object.
   */
  public void collect(Throwable error) {
    this.error = error;
    buffer.mark(this);
  }

  /**
   * Get result. Throw RuntimeException while encountering an error.
   * @return A List of result.
   * @throws RuntimeException RuntimeException wrapping errors from user codes.
   */
  public List<OUT> getResult() throws RuntimeException { ... }
}

How is it used

Before calling AsyncFunction.asyncInvoke(IN input, AsyncCollector<OUT> collector), AsyncWaitOperator will try to get an instance of AsyncCollector from AsyncCollectorBuffer. Then it will be taken into user’s callback function. If the buffer is full, it will wait until some of ongoing callbacks has finished.

Once async operation has done, the AsyncCollector.collect() will take results or errors and AsyncCollectorBuffer will be notified.

AsyncCollector is implemented by FLINK.

AsyncCollectorBuffer

AsyncCollectorBuffer keeps all AsyncCollectors, and emit results to the next nodes.

When AsyncCollector.collect() is called, a mark will be placed in AsyncCollectorBuffer, indicating finished AsyncCollectors. A working thread, named Emitter, will also be signalled once a AsyncCollector gets results, and then try to emit results depending on the ordered or unordered setting.

For simplicity, we will refer task to AsycnCollector in the AsyncCollectorBuffer in the following text.

Ordered and Unordered

Based on the user configuration, the order of output elements will or will not be guaranteed. If not guaranteed, the finished AsyncCollectors coming later will be emitted earlier.

Emitter Thread

The Emitter Thread will wait for finished AsyncCollectors. When it is signalled, it will process tasks in the buffer as follow:

  • Ordered Mode

If the first task in the buffer is finished, then Emitter will collect its results, and then proceed to the second task. If the first task is not finished yet, just wait for it again.

  • Unordered Mode

Check all finished tasks in the buffer, and collect results from those tasks which are prior to the oldest Watermark in the buffer.

The Emitter Thread and Task Thread will access exclusively by acquiring/releasing the checkpoint lock from StreamTask.

Signal Task Thread when all tasks have finished to notify it that all data has been processed, and it is OK to close the operator.

Signal Task Thread after removing some tasks from the buffer.

Propagate Exceptions to Task Thread.

Task Thread

Access AsyncCollectorBuffer exclusively against the Emitter Thread.

Get and Add a new AsyncCollector to the buffer, wait while buffer is full.

Watermark

All watermarks will also be kept in AsyncCollectorBuffer. A watermark will be emitted if and only if after all AsyncCollectors coming before current watermark have been emitted.

Interface

public interface AsyncCollectorBuffer<IN, OUT> {
  /**
   * Add an AsyncCollector into the buffer.
   *
   * @param collector AsyncCollector
   * @throws Exception InterruptedException or exceptions from AsyncCollector.
   */
  void add(AsyncCollector<OUT> collector) throws Exception;

  /**
   * Notify the Emitter Thread that a AsyncCollector has completed.
   *
   * @param collector Completed AsyncCollector
   * @throws Exception InterruptedException.
   */
  void mark(AsyncCollector<OUT> collector) throws Exception;

  /**
   * Caller will wait here if buffer is not empty, meaning that not all tasks have returned yet.
   *
   * @throws Exception InterruptedException or Exceptions from AsyncCollector.
   */
  void waitEmpty() throws Exception;
}

State, Failover and Checkpoint

A new operator, named AsyncWaitOperator<IN, OUT>, is added to FLINK streaming. This operator will buffer all AsyncCollectors, sending the processed data to the following operators.

State and Checkpoint

All input StreamRecords will be kept in state. Instead of storing each input stream records into state one by one while processing, AsyncWaitOperator will put all input stream records in AsyncCollectorBuffer into state while snapshotting operator state, which will be cleared before persisting those records.

When all barriers have arrived at the operator, checkpoint can be carried out immediately.

Failover

While restoring the operator’s state, the operator will scan all elements in the state, get AsyncCollectors, call AsyncFunction.asyncInvoke() and insert them back into AsyncCollectorBuffer.

API

No modification to current DataStream class in FLINK. AsyncDataStream will handle adding AsyncWaitOperator.

AsyncDataStream

AsyncDataStream provides two methods to add AsyncWaitOperator with AsyncFunction into FLINK streaming job.

public class AsyncDataStream {
  /**
   * Add an AsyncWaitOperator. The order of output stream records may be reordered.
   *
   * @param func AsyncWaitFunction
   * @return A new DataStream.
   */
  public static DataStream<OUT> unorderedWait(DataStream<IN>, AsyncWaitFunction<IN, OUT> func);

  /**
   * Add an AsyncWaitOperator. The order of output stream records is guaranteed to be the same as input ones.
   *
   * @param func AsyncWaitFunction
   * @return A new DataStream.
   */
  public static DataStream<OUT> orderedWait(DataStream<IN>, AsyncWaitFunction<IN, OUT> func);
}

Error Handling

Exceptions can be propagated into framework code, causing task failover.

Notes

Async Resource Sharing

For the case to share async resources(like connection to hbase, netty connections) among different slots(task workers) in the same TaskManager(a.k.a the same JVM), we can make the connection static so that all threads in the same process can share the same instance.

Of course, please pay attention to thread safety while using those resources.

Example

For callback

public class HBaseAsyncFunction implements AsyncFunction<String, String> {
  // initialize it while reading object
  transient Connection connection;

  @Override
  public void asyncInvoke(String val, AsyncCollector<String> c) {
    Get get = new Get(Bytes.toBytes(val));
    Table ht = connection.getTable(TableName.valueOf(Bytes.toBytes("test")));
    // UserCallback is from user’s async client.
    ((AsyncableHTableInterface) ht).asyncGet(get, new UserCallback(c));
  }
}

// create data stream
public void createHBaseAsyncTestStream(StreamExecutionEnvironment env) {
  DataStream<String> source = getDataStream(env);
  DataStream<String> stream = AsyncDataStream.unorderedWait(source, new HBaseAsyncFunction());
  stream.print();
}

For ListenableFuture

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ListenableFuture;

public class HBaseAsyncFunction implements AsyncFunction<String, String> {
  // initialize it while reading object
  transient Connection connection;

  @Override
  public void asyncInvoke(String val, AsyncCollector<String> c) {
    Get get = new Get(Bytes.toBytes(val));
    Table ht = connection.getTable(TableName.valueOf(Bytes.toBytes("test")));

    ListenableFuture<Result> future = ht.asyncGet(get);
    Futures.addCallback(future,
      new FutureCallback<Result>() {
        @Override public void onSuccess(Result result) {
          List ret = new ArrayList<String>();
          ret.add(result.get(...));
          c.collect(ret);
        }

        @Override public void onFailure(Throwable t) {
          c.collect(t);
        }
      },
      MoreExecutors.newDirectExecutorService()
    );
  }
}

 

原文地址:https://www.cnblogs.com/dklig/p/11586953.html

时间: 2024-10-08 02:06:12

Flink异步IO-实现原理(外文)的相关文章

javaNIO原理(含代码)及与 同步阻塞IO 、伪异步IO比较

一.同步阻塞IO BIO就是阻塞式的IO,网络通信中对于多客户端的连入,服务器端总是与客户端数量一致的线程去处理每个客户端任务,即,客户端与线程数1:1,并且进行读写操作室阻塞的,当有你成千上完的客户端进行连接,就导致服务器不断的建立新的线程,最后导致低通资源不足,后面的客户端不能连接服务器,并且连接入的客户端并不是总是在于服务器进行交互,很可能就只是占用着资源而已. 二.伪异步IO 伪异步IO对同步IO进行了优化,后端通过一个线程池和任务队列去处理所有客户端的请求,当用完后在归还给线程池,线程

Windows内核原理-同步IO与异步IO

目录 Windows内核原理-同步IO与异步IO 背景 目的 I/O 同步I/O 异步I/O I/O完成通知 总结 参考文档 Windows内核原理-同步IO与异步IO 背景 在前段时间检查异常连接导致的内存泄漏排查的过程中,主要涉及到了windows异步I/O相关的知识,看了许多包括重叠I/O.完成端口.IRP.设备驱动程序等Windows下I/O相关的知识,虽然学习到了很多东西,但是仍然需要自顶而下的将所有知识进行梳理. 目的 本片文章主要讲解同步I/O与异步I/O相关知识,希望通过编写本篇

tornado和异步IO的基本简单流程原理

tornado #建立连接 from tornado.httpclient import AsyncHTTPClient #发送请求 from tornado.httpclient import HTTPRequest #事件循环 from tornado import ioloop #接收的次数 REV_COUNTER = 0 #请求的次数 REQ_COUNTER = 0 def handle_response(reponse): """ 处理返回值内容(需要维护计数器,来

异步IO原理及相应函数

何为异步IO? (1)几乎可以认为:异步IO就是操作系统用软件实现的一套中断响应系统.(2)异步IO的工作方法是:我们当前进程注册一个异步IO事件(使用signal注册一个信号 SIGIO的处理函数),然后当前进程可以正常处理自己的事情,当异步事件发生后当前进 程会收到一个SIGIO信号从而执行绑定的处理函数去处理这个异步事件.其实所有的信号 都是软件实现的一种中断机制,所以异步IO其实就是利用了信号这种软件中断机制来工作 的,工作流程如下:1):设置设备文件fd具有接收IO的功能2):设置异步

理论铺垫:阻塞IO、非阻塞IO、IO多路复用/事件驱动IO(单线程高并发原理)、异步IO

完全来自:http://www.cnblogs.com/alex3714/articles/5876749.html 同步IO和异步IO,阻塞IO和非阻塞IO分别是什么,到底有什么区别?不同的人在不同的上下文下给出的答案是不同的.所以先限定一下本文的上下文. 本文讨论的背景是Linux环境下的network IO. 一 概念说明 在进行解释之前,首先要说明几个概念:- 用户空间和内核空间- 进程切换- 进程的阻塞- 文件描述符- 缓存 I/O 用户空间与内核空间 现在操作系统都是采用虚拟存储器,

Node.js异步IO

为什么要异步I/O? 从用户体验角度讲,异步IO可以消除UI阻塞,快速响应资源 JavaScript是单线程的,它与UI渲染共用一个线程.所以在JavaScript执行的时候,UI渲染将处于停顿的状态,用户体验较差.而异步请求可以在下载资源的时候,JavaScript和UI渲染都同时执行,消除UI阻塞,降低响应资源需要的时间开销. 假如一个资源来自两个不同位置的数据的返回,第一个资源需要M毫秒的耗时,第二个资源需要N毫秒的耗时.当采用同步的方式,总耗时为(M+N)毫秒,代码大致如下: //耗时为

arm驱动linux异步通知与异步IO【转】

转自:http://blog.csdn.net/chinazhangzhong123/article/details/51638793 <[ arm驱动] linux异步通知与 异步IO>涉及内核驱动函数二个,内核结构体一个,分析了内核驱动函数二个:可参考的相关应用程序模板或内核驱动模板二个,可参考的相关应用程序模板或内核驱动三个 描述:设备文件IO访问:阻塞与非阻塞io访问,poll函数提供较好的解决设备访问的机制,但是如果有了异步通知整套机制就更加完整了 一.阻塞 I/O,非阻塞IO,异步

【译】深入理解python3.4中Asyncio库与Node.js的异步IO机制

转载自http://xidui.github.io/2015/10/29/%E6%B7%B1%E5%85%A5%E7%90%86%E8%A7%A3python3-4-Asyncio%E5%BA%93%E4%B8%8ENode-js%E7%9A%84%E5%BC%82%E6%AD%A5IO%E6%9C%BA%E5%88%B6/ 译者:xidui原文: http://sahandsaba.com/understanding-asyncio-node-js-python-3-4.html 译者前言 如

C++ 异步 IO(三) 异步IO

The oldest solution that people still use for this problem is select(). The select() call takes three sets of fds (implemented as bit arrays): one for reading, one for writing, and one for "exceptions". It waits until a socket from one of the se