在流式处理的过程中, 在中间步骤的处理中, 如果涉及到一些费事的操作或者是外部系统的数据交互, 那么就会给整个流造成一定的延迟. 在 flink 的 1.2 版本中引入了 Asynchronous I/O, 能够支持异步的操作, 以提高 flink 系统与外部数据系统交互的性能及吞吐量.
在使用 Flink 的异步 IO 时, 主要有两个 API可以使用, 一个是AsyncDataStream.unorderedWait( ), 另一个AsyncDataStream.orderedWait( ).在异步处理过程中,原本数据的顺序可能会发生变化, 使用unorderWait的方法, 不会考虑顺序的问题, 一旦处理完成就会直接返回结果, 这种方法具有较低的延迟和负载. 那么orderWait的方法就是想对应的, 严格按照原本流中的数据顺序做返回, 会对系统造成一定的延迟. 实际中应该根据具体的业务情况做选择.unorderedWait或orderedWait有两个关于async operation的参数,一个是timeout参数用于设置async的超时时间,一个是capacity参数用于指定同一时刻最大允许多少个(并发
)async request在执行;
在使用异步IO时,需要自己去继承AsyncFunction,AsyncFunction接口继承了Function,它定义了asyncInvoke方法以及一个default的timeout方法;asyncInvoke方法执行异步逻辑,然后通过ResultFuture.complete将结果或异常设置到ResultFuture,如果异常则通过ResultFuture.completeExceptionally(Throwable)来传递 ResultFuture;RichAsyncFunction继承了AbstractRichFunction,同时声明实现AsyncFunction接口,它不没有实现asyncInvoke,交由子类实现;它覆盖了setRuntimeContext方法,这里使用RichAsyncFunctionRuntimeContext或者RichAsyncFunctionIterationRuntimeContext进行包装.
下面是一个验证 Async I/O 的demo, 具体代码见仓库 -> code link
1 | public class AsyncIOExample { |
以上代码的输出结果为:
1 | 读取数据:D 当前时间:1569574233046 |