响应式编程在前端开发以及 Android 开发中有颇多运用, 然而它的非阻塞异步编程模型以及对消息流的处理模式也在后端得到越来越多的应用除了 Netflix 的 OSS 中大量使用了响应式编程之外, 最近阿里也提出 Dubbo 3.0 版本将全面拥抱响应式编程
我之前针对某些项目需求也给出了响应式编程的方案, 较好地解决了并行编程与异步编程的问题不过在深入了解响应式编程之后, 我也给出了自己的一些实践总结
响应式编程并非银弹
响应式编程并非银弹事实上在软件领域, Brooks 提出的没有银弹一说或许将永远生效当我们在选择使用响应式编程时, 一定要明确它的适用场景, 主要包括:
处理由用户或其他系统发起的事件, 如鼠标点击键盘按键或者物联网设备等无时无刻都在发射信号的情况
处理磁盘或网络等高延迟的 IO 数据, 且保证这些 IO 操作是异步的
业务的处理流程是流式的, 且需要高响应的非阻塞操作
除此之外, 我们当然也可以利用一些响应式编程框架如 Rx, 简化并发编程与数据流操作的实现诸如 RxJava 就提供非常完整的工厂方法, 可以将非响应式编程的 IterableArray 以及与响应式编程有一定相关性的 FutureCallable 转换为 Observable 或 Flowable
理解 Source 的本质
Akka Stream 将流数据源定义为 Source,RxJava 则定义为 Observable 或 Flowable 这些响应式编程框架都为 Source 提供了丰富的 operator 其中除了组合流的操作之外, 最基本的操作即为: filtermapflatMapreduce 等
粗略看来, 这些操作皆为函数式的编程接口, 从 FP 的角度看, 我们甚至可以将 Source 视为一个 monad 而站在 Java 编程的角度看, 我们则很容易将 Source 视为等同于集合的数据结构更何况, 响应式编程实则脱胎于 Observer 模式与 Iterator 模式, 其中 Iterator 模式就是针对集合的操作, 只不过 Observable 或 Flowable 是 push 模型, 而 Iterator 模式则为 pull 模型罢了
然而这就是本质的区别, 即 Source 是一个不断发射事件 (dataerrorcomplete) 的源头, 具有时间序列的特点, 而 Iterable 则是一个静态的数据结构, 在对它进行操作时, 该数据结构中存储的数据就已经存在了
合理设计 Source 的粒度
在演示 Observable 或 Flowable 的 API 时, 我们往往喜欢采用 Fluent Interface 的方式连续地调用它的 operator, 形成一个整体的流处理过程这并非总是合理的当一个 Source 被多个 operator 串联起来的时候, 会使得这个 Source 更加难以被重用
例如, 在加载网页时, 默认发起对后端服务的调用并返回需要的用户信息, 若建模为流 A, 其转换如下所示:
uri---->user---->|-->
同时, 有一个鼠标点击事件也会通过随机生成 URL 发起对后端服务的调用并返回需要的用户信息, 倘若建模为流 B, 其转换如下所示:
click---->uri---->user---->|-->
显然, 这两个流在从 uri 到 user 的流处理上出现了重复如果我们创建的流 A 与流 B 并不包含 uri 到 user 的转换, 就可以通过 merge 等合并操作将 A 与 B 合并, 然后再共同重用从 uri 到 user 的转换我们也无需担心创建细粒度流的成本, 因为这些流的创建是 lazy 的, 流虽然创建了, 对流的操作却不会立即执行
分离操作的逻辑
无论是哪个响应式框架, 都为流 (Source) 提供了丰富的 operator 这些 operator 多数都支持 lambda 表达式在处理简单的业务逻辑时, 这样的实现是没有问题的; 然而一旦逻辑变得非常复杂, lambda 表达式的表达能力就不够了从编程实践看, lambda 表达式本身就应该保持微小的粒度这时, 就应该将这些逻辑单独分离出来, 放到单独的类与方法中
例如, 我们根据 device 的配置信息去调用远程服务获取设备信息, 然后提取信息获得业务需要的指标, 对指标进行转换, 最后将转换的数据写入到数据库中结合函数的转换本质, 我们可以将这些操作拆分为多个连续的操作:
deviceConfig-->deviceInfo-->List < extractedInfo > -->transformedInfo-->write
倘若这些转换的逻辑非常复杂, 就可以将这些逻辑分别封装到 DeviceFetcherDeviceExtractorDeviceTransformer 与 DeviceWriter 这四个类中, 于是代码可以写为:
- Flowable.fromIterable(deviceConfigs)
- .parallel()
- .runOn(Schedulers.computation())
- .map(DeviceFetcher::fetch)
- .flatMap(DeviceExtractor::extract)
- .map(DeviceTransformer::transform)
- .sequential()
- .blockingSubscribe(
- info -> DeviceWriter.write(info),
- err -> log(err),
- () -> log("done.")
- );
这一实践提倡将流的操作与每个操作的业务分离开, 既能够保证流操作的简单与纯粹, 又能保证操作业务的重用与可扩展
API 的设计
如果我们要设计符合响应式编程设计的 API, 则应该尽可能保证每个方法都是非阻塞的要做到这一点, 就应该保证每个方法返回的类型是 Source 或 Publisher 例如针对要返回多个数据的流, 可以返回 Observable<T > 或者 Flowable<T>; 如果确定只返回一个数据, 则可以返回 Single<T>; 倘若不确定, 则返回 Maybe<T > 倘若该 API 方法仅仅是一个命令, 无需返回结果, 又需要保证方法是非阻塞的, 则可以考虑返回 Completable<T>
从某种意义上说, 返回 Future<T>
CompletableFuture < T >
或者
CompletableStage < T >
也可以认为是响应式的这三个类型由于是 JDK 自身提供的, 因此更纯粹唯一不便的是这些接口没有提供类似 Observable 那样丰富的 operator, 但好在 Observable 与 Flowable 都提供了 fromFuture()方法对其进行转换, 因而这样的设计也是可取的
Akka Stream 的流拓扑图
Akka Stream 对流处理的抽象被建模为图这一设计思想使得流的处理变得更加直观, 流的处理变成了搭积木游戏可惜 Java 的 DSL 能力实在太弱, 如果对比 Scala 与 Java, 你会发现 GraphDSL 对 Graph 的构造在表现上简直是天壤之别
例如这是官方文档中 Java 版本对 Graph 的构造:
- RunnableGraph.fromGraph(GraphDSL.create(builder -> {
- final Outlet<Integer> A = builder.add(Source.single(0)).out();
- final UniformFanOutShape<Integer, Integer> B = builder.add(Broadcast.create(2));
- final UniformFanInShape<Integer, Integer> C = builder.add(Merge.create(2));
- final FlowShape<Integer, Integer> D = builder.add(Flow.of(Integer.class).map(i -> i + 1));
- final UniformFanOutShape<Integer, Integer> E = builder.add(Balance.create(2));
- final UniformFanInShape<Integer, Integer> F = builder.add(Merge.create(2));
- final Inlet<Integer> G = builder.add(Sink.<Integer>foreach(System.out::println)).in();
- builder.from(F).toFanIn(C); //feedback loop
- builder.from(A).viaFanOut(B).viaFanIn(C).toFanIn(F);
- builder.from(B).via(D).viaFanOut(E).toFanIn(F);
- builder.from(E).toInlet(G);
- return ClosedShape.getInstance();
- })).run(mat);
如下是官方文档中 Scala 版本对同一个 Graph 的构造:
- RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
- val A: Outlet[Int] = builder.add(Source.single(0)).out
- val B: UniformFanOutShape[Int, Int] = builder.add(Broadcast[Int](2))
- val C: UniformFanInShape[Int, Int] = builder.add(Merge[Int](2))
- val D: FlowShape[Int, Int] = builder.add(Flow[Int].map(_ + 1))
- val E: UniformFanOutShape[Int, Int] = builder.add(Balance[Int](2))
- val F: UniformFanInShape[Int, Int] = builder.add(Merge[Int](2))
- val G: Inlet[Any]
- C <~ F
- A ~> B ~> C ~> F
- B ~> D ~> E ~> F
- E ~> G
- ClosedShape
- })
我们也看到, 倘若在 GraphDSL 中我们能够将构成 Graph 的材料对象事先创建好, 而将 build 工作统一放在一起, 可以在一定程度改进代码的表现力
我们可以将 Akka Stream 的 Graph(完整的 Graph, 称为 ClosedShape, 是可以运行的, 又称之为 RunnableShape)看做是流处理的模具, 至于那些由 Inlet 与 Outlet 端口组成的基础 Shape, 则是设计这些模具的基础材料
模具是静态的, 基础材料与组合材料是可重用的单元, 然后再组合可以重用的业务单元(以函数类或者接口形式进行封装), 这个模具就具有了业务处理能力如果这个拓扑图过于复杂, 我们还可以利用基础 Shape 组合形成一个个更粗粒度 Partial Shap 这些 Partial Shape 不是封闭的, 可以理解为更粗粒度的 SourceSink 和 Flow, 它使得模具的组装变得更加简单 **
材料业务单元模具之间的关系可以形象地用下图来表示:
一旦流处理的模具打造完毕, 打开数据流的水龙头, 让数据源源不断地流入 Graph 中, 流处理就可以自动运行只要 Source 没有发出 complete 或 error 信号, 它就将一直运行下去 Akka Stream 之所以将 Graph 的运行器称之为 materializer, 大约也是源于这样的隐喻吧
使用 Akka Stream 进行响应式流处理, 我建议参考这样的思维
来源: https://juejin.im/entry/5a7fafdb6fb9a0635c0472ec