序
本文主要研究下 reactor 异步线程的变量传递
threadlocal 的问题
在传统的请求 / 应答同步模式中, 使用 threadlocal 来传递上下文变量是非常方便的, 可以省得在每个方法参数添加公用的变量, 比如当前登录用户但是业务方法可能使用了 async 或者在其他线程池中异步执行, 这个时候 threadlocal 的作用就失效了
这个时候的解决办法就是采取 propagation 模式, 即在同步线程与异步线程衔接处传播这个变量
TaskDecorator
比如 spring 就提供了 TaskDecorator, 通过实现这个接口, 可以自己控制传播那些变量例如:
- class MdcTaskDecorator implements TaskDecorator {
- @Override
- public Runnable decorate(Runnable runnable) {
- // Right now: web thread context !
- // (Grab the current thread MDC data)
- Map<String, String> contextMap = MDC.getCopyOfContextMap();
- return () -> {
- try {
- // Right now: @Async thread context !
- // (Restore the Web thread context's MDC data)
- MDC.setContextMap(contextMap);
- runnable.run();
- } finally {
- MDC.clear();
- }
- };
- }
- }
这里注意在 finally 里头 clear
配置这个 taskDecorator
- @EnableAsync
- @Configuration
- public class AsyncConfig implements AsyncConfigurer {
- @Override
- public Executor getAsyncExecutor() {
- ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
- executor.setTaskDecorator(new MdcTaskDecorator());
- executor.initialize();
- return executor;
- }
- }
完整实例详见 Spring 4.3: Using a TaskDecorator to copy MDC data to @Async threads
Reactor Context
spring5 引入 webflux, 其底层是基于 reactor, 那么 reactor 如何进行上下文变量的传播呢? 官方提供了 Context 对象来替代 threadlocal
其特性如下:
类似 map 的 kv 操作, 比如 put(Object key, Object value),putAll(Context), hasKey(Object key)
immutable, 即同一个 key, 后面 put 不会覆盖
提供 getOrDefault,getOrEmpty 方法
Context 与作用链上的每个 Subscriber 绑定
通过 subscriberContext(Context) 来访问
Context 的作用是自底向上
实例
设置及读取
- @Test
- public void testSubscriberContext(){
- String key = "message";
- Mono<String> r = Mono.just("Hello")
- .flatMap( s -> Mono.subscriberContext()
- .map( ctx -> s + " " + ctx.get(key)))
- .subscriberContext(ctx -> ctx.put(key, "World"));
- StepVerifier.create(r)
- .expectNext("Hello World")
- .verifyComplete();
- }
这里从最底部的 subscriberContext 设置 message 值为 World, 然后 flatMap 里头通过 subscriberContext 来访问
自底向上
- @Test
- public void testContextSequence(){
- String key = "message";
- Mono<String> r = Mono.just("Hello")
- //NOTE 这个 subscriberContext 设置的太高了
- .subscriberContext(ctx -> ctx.put(key, "World"))
- .flatMap( s -> Mono.subscriberContext()
- .map( ctx -> s + "" + ctx.getOrDefault(key,"Stranger")));
- StepVerifier.create(r)
- .expectNext("Hello Stranger")
- .verifyComplete();
- }
由于这个例子的 subscriberContext 设置的太高了, 不能作用在 flatMap 里头的 Mono.subscriberContext()
不可变
- @Test
- public void testContextImmutable(){
- String key = "message";
- Mono<String> r = Mono.subscriberContext()
- .map( ctx -> ctx.put(key, "Hello"))
- // 这里返回了一个新的, 因此上面的设置失效了
- .flatMap( ctx -> Mono.subscriberContext())
- .map( ctx -> ctx.getOrDefault(key,"Default"));
- StepVerifier.create(r)
- .expectNext("Default")
- .verifyComplete();
- }
subscriberContext 永远返回一个新的
多个连续的 subscriberContext
- @Test
- public void testReadOrder(){
- String key = "message";
- Mono<String> r = Mono.just("Hello")
- .flatMap( s -> Mono.subscriberContext()
- .map( ctx -> s + " " + ctx.get(key)))
- .subscriberContext(ctx -> ctx.put(key, "Reactor"))
- .subscriberContext(ctx -> ctx.put(key, "World"));
- StepVerifier.create(r)
- .expectNext("Hello Reactor")
- .verifyComplete();
- }
operator 只会读取离它最近的一个 context
flatMap 间的 subscriberContext
- @Test
- public void testContextBetweenFlatMap(){
- String key = "message";
- Mono<String> r = Mono.just("Hello")
- .flatMap( s -> Mono.subscriberContext()
- .map( ctx -> s + " " + ctx.get(key)))
- .subscriberContext(ctx -> ctx.put(key, "Reactor"))
- .flatMap( s -> Mono.subscriberContext()
- .map( ctx -> s + " " + ctx.get(key)))
- .subscriberContext(ctx -> ctx.put(key, "World"));
- StepVerifier.create(r)
- .expectNext("Hello Reactor World")
- .verifyComplete();
- }
flatMap 读取离它最近的 context
flatMap 中的 subscriberContext
- @Test
- public void testContextInFlatMap(){
- String key = "message";
- Mono<String> r =
- Mono.just("Hello")
- .flatMap( s -> Mono.subscriberContext()
- .map( ctx -> s + " " + ctx.get(key))
- )
- .flatMap( s -> Mono.subscriberContext()
- .map( ctx -> s + " " + ctx.get(key))
- .subscriberContext(ctx -> ctx.put(key, "Reactor"))
- )
- .subscriberContext(ctx -> ctx.put(key, "World"));
- StepVerifier.create(r)
- .expectNext("Hello World Reactor")
- .verifyComplete();
- }
这里第一个 flatMap 无法读取第二个 flatMap 内部的 context
小结
reactor 通过提供 Context 来实现了类似同步线程 threadlocal 的功能, 非常强大, 值得好好琢磨
- doc
- TaskDecorator
- Spring 4.3: Using a TaskDecorator to copy MDC data to @Async threads
- HOW TO PASS CONTEXT IN STANDARD WAY - WITHOUT THREADLOCAL
- Spring Security Context Propagation with @Async
如何在 async 线程中访问 RequestContextHolder
- Context Aware Java Executor and Spring's @Async
- 8.8.1. The Context API
来源: https://juejin.im/post/5aabe7e6f265da239706a013