springcloud:webflux源码浅析

spring-webflux采用了事件驱动异步非阻塞模式,借助EventLoop以少量线程应对高并发的访问,这样的改变使spring也变得像vert.x一样有趣~ 🥕

之前就一直很看好spring-webflux,最近这星期就花时间用了用,也看了自己感兴趣的源码(虽然还没看完),就先记录分享一下,省的到时候就忘了。

webflux and vert.x

用过vert.x的小伙伴都知道,vert.x是函数式开发模式,而spring-webflux不但支持函数式,还很贴心的提供了webmvc注解的方式来编写web服务,这让学习和迁移代码上都有很大的便利。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@RestController
public class AdController {

@RequestMapping(value = "/")
public Mono<Object> test() {
return Mono.create(s -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
s.success(ResultBean.SUCCESS);
}).subscribeOn(Schedulers.elastic());
}
}

之前我封装过vert.x,通过注解的方式来声明路由并统一管理,跟webflux非常相似。有兴趣的小伙伴可以去玩耍一下 vertx-shine

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@RouteHandler
@RouteMapping(value = "/video")
public class VideoVerticle {

private Vertx vertx = VerticleLauncher.getStandardVertx();

@RouteMapping(method = RequestMethod.GET, value = "/test")
public Handler<RoutingContext> test() {
return routingContext -> vertx.executeBlocking(future -> {
System.out.println("executeBlocking: "+Thread.currentThread().getName());
System.out.println("type : " + routingContext.request().getParam("type"));
//需要调用complete FutureImpl -> setHandler 需要
future.complete(1);
}, false, h -> routingContext.response().setStatusCode(200).end("It is amazing !"));
}
}

两者大概的思路都是一致的,通过一个eventloop线程来不断循环接收event,并将event(一些耗时的业务逻辑和网络IO等)分配给相应的handler(worker线程)来处理。

event-loop

vert.x中两者称为EventLoop线程Worker线程,而webflux对应的是NIO线程elastic(single、parallel 看业务需要选择)线程

这是一个简化的模型,很便于理解,而真正有趣的地方是如何来实现这个模型。

源码分析

整个实现的源码比较多,先整理出一部分我比较感兴趣的,是关于线程处理完event后,到底是哪个线程返回结果。其实就是上述代码中s.success(ResultBean.SUCCESS);的玄机。

注: 如果spring-boot-starter-webspring-boot-starter-webflux都依赖的话,会默认设置为SpringMVC。
不过在SpringMVC模式下使用异步类型(DeferredResult,Flux或SseEmitter),这将会是异步的,但读写仍然会被阻塞。

使用的依赖版本如下,Servlet容器使用的是undertow:

1
2
3
4
5
6
7
8
9
10
11
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-undertow</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>

可以在启动的时候指定使用REACTIVESERVLET

1
2
3
SpringApplication app = new SpringApplication(MyApplication.class;
app.setWebApplicationType(WebApplicationType.REACTIVE);
app.run();

因为我的项目中有依赖会引入spring-boot-starter-web(为了兼容swagger),所以接下来的分析是基于SERVLET,等之后整理了再分析基于REACTIVE

s.success()方法进入,会经过很多类MonoCreate->MonoSubscribeOn->ScopePassingSpanSubscriber->StrictSubscriber->ReactiveTypeHandler,最后到DeferredResultsetResultInternal,这里很关键。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class DeferredResult<T> {

/**
* 重要
*/
private boolean setResultInternal(Object result) {
// 查验是否过期
if (isSetOrExpired()) {
return false;
}
DeferredResultHandler resultHandlerToUse;
//这里很关键
synchronized (this) {
// 抢占到了锁,再次查验是否过期
if (isSetOrExpired()) {
return false;
}
// 将传进来的结果赋值给result,注意result是volatile修饰的
this.result = result;
resultHandlerToUse = this.resultHandler;
if (resultHandlerToUse == null) {
// 是否设置了handler 没有设置返回true
return true;
}
// 清除handler
this.resultHandler = null;
}
// 如果设置了handler,进行回调
resultHandlerToUse.handleResult(result);
return true;
}
}

我们继续找DeferredResultHandler函数式接口的实现,发现是在WebAsyncManager。向上去找源码会发现源头是我们熟悉的RequestMappingHandlerAdapter,那该方法就是在请求进来是NIO线程来执行的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public final class WebAsyncManager {

public void startDeferredResultProcessing(
final DeferredResult<?> deferredResult, Object... processingContext) throws Exception {
......

try {
interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult);
// 这个setResultHandler方法,非常重要
deferredResult.setResultHandler(result -> {
result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result);
setConcurrentResultAndDispatch(result);
});
}
catch (Throwable ex) {
setConcurrentResultAndDispatch(ex);
}
}
}

public class DeferredResult<T> {

/**
* 重要
*/
public final void setResultHandler(DeferredResultHandler resultHandler) {
Assert.notNull(resultHandler, "DeferredResultHandler is required");
// 查验是否过期
if (this.expired) {
return;
}
Object resultToHandle;
synchronized (this) {
// 抢占到了锁,再次查验是否过期
if (this.expired) {
return;
}
resultToHandle = this.result;
if (resultToHandle == RESULT_NONE) {
//如果结果还没有被设置,将传进来的DeferredResultHandler赋值给resultHandler
this.resultHandler = resultHandler;
return;
}
}
// 走到这说明result已经被设置了,那就直接执行回调
try {
resultHandler.handleResult(resultToHandle);
}
catch (Throwable ex) {
logger.debug("Failed to handle existing result", ex);
}
}
}

那我们继续顺着代码往下,假设resultHandlerToUse.handleResult(result);成功回调了,就会到setConcurrentResultAndDispatch方法中,也是一些类的跳转,WebAsyncManager->StandardServletAsyncWebRequest->AsyncContextImpldispatchAsyncRequest方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class AsyncContextImpl implements AsyncContext {

private void dispatchAsyncRequest(final ServletDispatcher servletDispatcher, final ServletPathMatch pathInfo, final HttpServerExchange exchange) {
doDispatch(new Runnable() {
@Override
public void run() {
Connectors.executeRootHandler(new HttpHandler() {
@Override
public void handleRequest(final HttpServerExchange exchange) throws Exception {
servletDispatcher.dispatchToPath(exchange, pathInfo, DispatcherType.ASYNC);
}
}, exchange);
}
});
}

private synchronized void doDispatch(final Runnable runnable) {
if (dispatched) {
throw UndertowServletMessages.MESSAGES.asyncRequestAlreadyDispatched();
}
dispatched = true;
final HttpServletRequestImpl request = servletRequestContext.getOriginalRequest();
addAsyncTask(new Runnable() {
@Override
public void run() {
request.asyncRequestDispatched();
runnable.run();
}
});
if (timeoutKey != null) {
timeoutKey.remove();
}
}

public synchronized void addAsyncTask(final Runnable runnable) {
asyncTaskQueue.add(runnable);
if (!processingAsyncTask) {
processAsyncTask();
}
}

private synchronized void processAsyncTask() {
if (!initialRequestDone) {
return;
}
updateTimeout();
final Runnable task = asyncTaskQueue.poll();
if (task != null) {
processingAsyncTask = true;
asyncExecutor().execute(new TaskDispatchRunnable(task));
} else {
processingAsyncTask = false;
}
}
}

AsyncContextImpl中的方法涉及比较多,dispatchAsyncRequestdoDispatch方法类似都是包装了一个Runnable,等待被启动,其中dispatchToPath方法最后会返回结果到请求方。而addAsyncTaskprocessAsyncTask维护了一个双端队列,这里不去深究,关注到asyncExecutor().execute(new TaskDispatchRunnable(task));,其中asyncExecutor()方法决定了最后是什么线程来启动之前的Runnable。跟下去会发现到了XnioIoThread,那之前疑惑的答案就已经出来了。

取其精华

前面唠叨了一堆,无非就是源码的运行过程,整理了一下线程的流程,如下图:

thread-process

不过这图只是画出了一种理想的状态,因为NIO线程和elastic线程两者的时序是无法保证的,所以并不能保证setResultHandler是在setResultInternal之前执行,就我们上面源码中标识了重要的方法,没有注意到的可以翻回去看看。

这里就很有意思了,spring的设计是不管这2个线程的时序,而是通过synchronized的互斥,来保证同一时刻内只有一个线程能获得DeferredResult这个锁的对象。

若是先执行setResultHandler,会判断被volatile修饰的result,若没有被重新赋值,那就说明还未执行setResultInternal,那就设置handler等待setResultInternal运行来处理。相对的,如果先执行了setResultInternal,会将result赋值,有handler就让elastic执行回调,如果没有则让NIO线程自己执行回调。


Github 不要吝啬你的star ^.^
更多精彩 戳我

Follow me on GitHub