spring-webflux采用了事件驱动异步非阻塞模式,借助EventLoop以少量线程应对高并发的访问,这样的改变使spring也变得像vert.x一样有趣~ 🥕
之前就一直很看好spring-webflux,最近这星期就花时间用了用,也看了自己感兴趣的源码(虽然还没看完),就先记录分享一下,省的到时候就忘了。
webflux and vert.x
用过vert.x的小伙伴都知道,vert.x是函数式开发模式,而spring-webflux不但支持函数式,还很贴心的提供了webmvc注解的方式来编写web服务,这让学习和迁移代码上都有很大的便利。
1 |
|
之前我封装过vert.x,通过注解的方式来声明路由并统一管理,跟webflux非常相似。有兴趣的小伙伴可以去玩耍一下 vertx-shine
1 |
|
两者大概的思路都是一致的,通过一个eventloop线程来不断循环接收event,并将event(一些耗时的业务逻辑和网络IO等)分配给相应的handler(worker线程)来处理。
在vert.x中两者称为EventLoop线程和Worker线程,而webflux对应的是NIO线程和elastic(single、parallel 看业务需要选择)线程。
这是一个简化的模型,很便于理解,而真正有趣的地方是如何来实现这个模型。
源码分析
整个实现的源码比较多,先整理出一部分我比较感兴趣的,是关于线程处理完event后,到底是哪个线程返回结果。其实就是上述代码中s.success(ResultBean.SUCCESS);
的玄机。
注: 如果spring-boot-starter-web和spring-boot-starter-webflux都依赖的话,会默认设置为SpringMVC。
不过在SpringMVC模式下使用异步类型(DeferredResult,Flux或SseEmitter),这将会是异步的,但读写仍然会被阻塞。
使用的依赖版本如下,Servlet容器使用的是undertow:1
2
3
4
5
6
7
8
9
10
11<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-undertow</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>
可以在启动的时候指定使用REACTIVE或SERVLET1
2
3SpringApplication app = new SpringApplication(MyApplication.class);
app.setWebApplicationType(WebApplicationType.REACTIVE);
app.run();
因为我的项目中有依赖会引入spring-boot-starter-web(为了兼容swagger),所以接下来的分析是基于SERVLET,等之后整理了再分析基于REACTIVE
从s.success()
方法进入,会经过很多类MonoCreate
->MonoSubscribeOn
->ScopePassingSpanSubscriber
->StrictSubscriber
->ReactiveTypeHandler
,最后到DeferredResult
的setResultInternal,这里很关键。
1 | public class DeferredResult<T> { |
我们继续找DeferredResultHandler
函数式接口的实现,发现是在WebAsyncManager
。向上去找源码会发现源头是我们熟悉的RequestMappingHandlerAdapter
,那该方法就是在请求进来是NIO线程来执行的。
1 | public final class WebAsyncManager { |
那我们继续顺着代码往下,假设resultHandlerToUse.handleResult(result);
成功回调了,就会到setConcurrentResultAndDispatch方法中,也是一些类的跳转,WebAsyncManager
->StandardServletAsyncWebRequest
->AsyncContextImpl
的dispatchAsyncRequest方法。
1 | public class AsyncContextImpl implements AsyncContext { |
AsyncContextImpl
中的方法涉及比较多,dispatchAsyncRequest和doDispatch方法类似都是包装了一个Runnable
,等待被启动,其中dispatchToPath方法最后会返回结果到请求方。而addAsyncTask和processAsyncTask维护了一个双端队列,这里不去深究,关注到asyncExecutor().execute(new TaskDispatchRunnable(task));
,其中asyncExecutor()方法决定了最后是什么线程来启动之前的Runnable
。跟下去会发现到了XnioIoThread
,那之前疑惑的答案就已经出来了。
取其精华
前面唠叨了一堆,无非就是源码的运行过程,整理了一下线程的流程,如下图:
不过这图只是画出了一种理想的状态,因为NIO线程和elastic线程两者的时序是无法保证的,所以并不能保证setResultHandler是在setResultInternal之前执行,就我们上面源码中标识了重要的方法,没有注意到的可以翻回去看看。
这里就很有意思了,spring的设计是不管这2个线程的时序,而是通过synchronized的互斥,来保证同一时刻内只有一个线程能获得DeferredResult
这个锁的对象。
若是先执行setResultHandler,会判断被volatile修饰的result,若没有被重新赋值,那就说明还未执行setResultInternal,那就设置handler等待setResultInternal运行来处理。相对的,如果先执行了setResultInternal,会将result赋值,有handler就让elastic执行回调,如果没有则让NIO线程自己执行回调。