reactive


从CompletableFuture到Reactor编程

通过 CompletableFuture 和 Lambda 表达式,可以快速实现轻量业务异步封装与编排,与 Callback 相比可以避免方法多层嵌套问题,但面对相对复杂业务逻辑时仍存在以下局限:

  • 难以简单优雅实现多异步任务编排;

  • 难以处理实时流式场景;

  • 难以支持高级异常处理;

  • 不支持任务延迟执行。

CompletableFuture f1 = CompletableFuture.supplyAsync(() ->
    "hello"
);
// f2依赖f1的结果做转换
CompletableFuture f2 = f1.thenApplyAsync(t ->
    t + " world"
);
System.out.println("异步结果:" + f2.get());

相比 Future,基于 Reactive 模型丰富的操作符组合(filter/map/flatMap/zip/onErrorResume 等高阶函数)代码清晰易读,搭配 Lamda 可以轻松实现复杂业务场景任务编排。

Reactor模式在IO读写数据时还是在同一个线程中实现的,即使使用多个Reactor机制的情况下,那些共享一个Reactor的Channel如果出现一个长时间的数据读写,会影响这个Reactor中其他Channel的相应时间,比如在大文件传输时,IO操作就会影响其他Client的相应时间,因而对这种操作,使用传统的Thread-Per-Connection或许是一个更好的选择,或则此时使用Proactor模式。

在 Reactor 中,执行模式以及执行过程取决于所使用的 Scheduler
默认情况下, Reactor提供三种核心调度程序接口实现:

1. SingleScheduler:能为一个专用工作单元安排所有可能的任务,具有时间性,因此可以延迟安排定期事件。此调度程序可以使用Scheduler.single()进行引用。

2, ParallelScheduler: 适用于固定大小的工作单元池(默认情况下,收CPU内核数量限制)适合CPU密集任务,Scheduler.parallel()进行引用。
3. ElasticScheduler:可以动态创建工作单元池并缓存线程池,由于其所创建的线程池没有最大数量限制,因此此调度程序非常适合用于IO密集操作的调度,Scheduler.elastic()进行引用。

可以使用 Schedulers.fromExecutorService(ExecutorService) 基于现有的 ExecutorService 创建 Scheduler

 Scheduler.parallel() 创建一个基于单线程 ExecutorService 的固定大小的任务线程池。 因为可能会有一个或两个线程导致问题,它总是至少创建 4 个线程。然后 publishOn 方法便共享了这些任务线程, 当 publishOn 请求元素的时候,会从任一个正在发出元素的线程那里获取元素。这样, 就是进行了任务共享(一种资源共享方式)。

Scheduler.elastic() 也能创建线程,它能够很方便地创建专门的线程(以便跑一些可能会阻塞资源的任务, 比如一个同步服务),请见 如何包装一个同步阻塞的调用?。

Reactive编程核心是背压,即nothing happens until you subscribe,在使用Reactor构建反应式流时,数据会从发布者流向订阅者,同时也会从订阅者向上传播到发布者

https://projectreactor.io/docs/core/release/reference/

http://htmlpreview.github.io/?https://github.com/get-set/reactor-core/blob/master-zh/src/docs/index.html

相关