0%

响应式编程-WebFlux

介绍

Spring提供了两个并行的技术栈。

  • Servlet StackSpring MVC是建立在Servlet API之上,使用同步阻塞IO架构,实现一个请求一个线程处理的模型。

  • Reactive StackSpring WebFlux是一个从头构建的非阻塞web框架,利用了多核,下一代处理器的优势,可以处理大量的并发连接。

并发模型

在Spring MVC(一般的servlet程序),使用一个大的线程池应对请求处理期间潜在的阻塞。

在Spring WebFlux(一般的非阻塞程序),使用较小的固定线程池(事件循环组)来处理请求。

适配

Spring 框架的整个基础设施都是围绕Servlet API 构建的,它们之间紧密耦合。

响应式Web内核首先需要使用模拟接口和对请求进行处理的方法替换javax.servlet.Servlet.service 方法,更改相关的类和接口。

增强和定制 Servlet API 对客户端请求和服务器响应的交互方式

  • ServerHttpRequest

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    package org.springframework.http.server.reactive;
    /**
    * 请求的封装。
    * 获取请求报文体的类型是Flux,表示具备响应式能力。
    * DataBuffer是针对字节缓冲区的抽象,便于对特定服务器实现数据交换。
    * 除了请求报文体,还有消息头、请求路径、cookie、查询参数等信息,可以在该接口或子接口中提
    供。
    */
    public interface ServerHttpRequest extends HttpRequest, ReactiveHttpInputMessage {
    }

    package org.springframework.http;
    public interface ReactiveHttpInputMessage extends HttpMessage {
    /**
    * Return the body of the message as a {@link Publisher}.
    * @return the body content publisher
    */
    Flux<DataBuffer> getBody();
    }
  • ServerHttpResponse

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    package org.springframework.http.server.reactive;
    /**
    * 响应的封装。
    * writeWith方法接收的参数是Publisher,提供了响应式,并与特定响应式库解耦。
    * 返回值是Mono<Void>,表示向网络发送数据是一个异步的过程。
    * 即,只有当订阅Mono时才会执行发送数据的过程。
    * 接收服务器可以根据传输协议的流控支持背压。
    */
    public interface ServerHttpResponse extends ReactiveHttpOutputMessage {
    }

    package org.springframework.http;
    public interface ReactiveHttpOutputMessage extends HttpMessage {
    /**
    * Use the given {@link Publisher} to write the body of the message to the
    * underlying HTTP layer.
    * @param body the body content publisher
    * @return a {@link Mono} that indicates completion or error
    */
    Mono<Void> writeWith(Publisher<? extends DataBuffer> body);
    }
  • ServerWebExchange

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    /**
    * HTTP请求-响应的容器。
    * 这是高层接口,除了HTTP交互,还可以保存框架相关信息。
    * 如请求的已恢复的WebSession信息等。
    */
    interface ServerWebExchange {
    // ...
    ServerHttpRequest getRequest();
    ServerHttpResponse getResponse();
    // ...
    Mono<WebSession> getSession();
    // ...
    }

重用 WebMVC 的基础设施,用 Flux 、 Mono 和 Publisher 等响应式类型替换同步通信。

保留与 Spring Web MVC 相同的 HandlerMapping 和 HandlerAdapter 链,使用基于 Reactor 的响应式交互替换实时命令:

  • HandlerMapping

    1
    2
    3
    4
    5
    6
    interface HandlerMapping {
    /**
    * HandlerExecutionChain getHandler(HttpServletRequest request)
    */
    Mono<Object> getHandler(ServerWebExchange exchange);
    }

    响应式 HandlerMapping 中,两个方法整体上类似,不同之处在于响应式返回Mono 类型支持响应式。

  • HandlerAdapter

    1
    2
    3
    4
    5
    6
    7
    interface HandlerAdapter {
    boolean supports(Object handler);
    /**
    * ModelAndView handle(HttpServletRequest request, HttpServletResponse response, Object handler);
    */
    Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler);
    }

    响应式HandlerAdapter 接口中,由于 ServerWebExchange 类同时组合了请求和响应,因此handle 方法的响应式版本更简洁,该方法返回 HandlerResult 的 Mono 而不是 ModelAndView 。

服务器

Spring Boot的WebFlux-Starter模块默认使用Netty作为服务器,可以达到异步、非阻塞的效果。

Tomcat、Jetty可以与Spring MVC和WebFlux一起使用,但MVC依赖于Servlet阻塞IO;WebFlux依赖于Servlet的非阻塞IO,并结合适配器使用Servlet Api。

对于Undertow,WebFlux可以直接使用Undertow API,无需Servlet API。

SpringBoot响应式服务器启动

寻找web服务器类型

SpringApplication.run(WebFluxApplication.class)

​ = > 进入run方法

​ = > return new SpringApplication(primarySources).run(args);

​ => 进入SpringApplication构造

​ => this.webApplicationType = WebApplicationType.deduceFromClasspath()

​ => 进入deduceFromClasspath方法,寻找web服务器类型

​ => 引入webFlux,存在DispatcherHandler,获取到响应式类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static WebApplicationType deduceFromClasspath() {
//存在org.springframework.web.reactive.DispatcherHandler && 不存在org.springframework.web.servlet.DispatcherServlet
//&& 不存在org.glassfish.jersey.servlet.ServletContainer
if (ClassUtils.isPresent(WEBFLUX_INDICATOR_CLASS, null) && !ClassUtils.isPresent(WEBMVC_INDICATOR_CLASS, null)
&& !ClassUtils.isPresent(JERSEY_INDICATOR_CLASS, null)) {
//返回响应式类型
return WebApplicationType.REACTIVE;
}
//"javax.servlet.Servlet","org.springframework.web.context.ConfigurableWebApplicationContext"
for (String className : SERVLET_INDICATOR_CLASSES) {
//Servlet和ConfigurableWebApplicationContext都不存在,返回未知类型
if (!ClassUtils.isPresent(className, null)) {
return WebApplicationType.NONE;
}
}
//返回servlet类型
return WebApplicationType.SERVLET;
}

创建响应式服务上下文

SpringApplication.run(WebFluxApplication.class)

​ = > 进入run方法

​ = > return new SpringApplication(primarySources).run(args);

​ => 进入run方法

​ => context = reateApplicationContext()

​ => this.applicationContextFactory.create(this.webApplicationType),根据web服务类型创建服务

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* {@link ApplicationContextFactory} registered in {@code spring.factories} to support
* {@link AnnotationConfigReactiveWebServerApplicationContext}.
*/
static class Factory implements ApplicationContextFactory {

@Override
public ConfigurableApplicationContext create(WebApplicationType webApplicationType) {
return (webApplicationType != WebApplicationType.REACTIVE) ? null
: new AnnotationConfigReactiveWebServerApplicationContext();//创建注解配置的响应式web服务上下文
}

}

创建web服务器

refresh()

=> onRefresh(),调用ReactiveWebServerApplicationContext实现

​ => createWebServer()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class ReactiveWebServerApplicationContext extends GenericReactiveWebApplicationContext
implements ConfigurableWebServerApplicationContext {

private void createWebServer() {
WebServerManager serverManager = this.serverManager;
if (serverManager == null) {
//获取工厂bean的名称
String webServerFactoryBeanName = getWebServerFactoryBeanName();
//从IOC容器获取工厂对象,默认NettyReactiveWebServerFactory
ReactiveWebServerFactory webServerFactory = getWebServerFactory(webServerFactoryBeanName);
//获取容器是否懒加载初始化标识
boolean lazyInit = getBeanFactory().getBeanDefinition(webServerFactoryBeanName).isLazyInit();
//创建服务器管理器,用于管理 web服务器和HttpHandler
this.serverManager = new WebServerManager(this, webServerFactory, /*从IOC获取所有HttpHandler*/this::getHttpHandler, lazyInit);
getBeanFactory().registerSingleton("webServerGracefulShutdown",
new WebServerGracefulShutdownLifecycle(this.serverManager));
//spring容器创建完毕,在refresh的finishRefresh()中会调用WebServerStartStopLifecycle的start(),执行this.weServerManager.start(),启动服务器
getBeanFactory().registerSingleton("webServerStartStop",
new WebServerStartStopLifecycle(this.serverManager));
}
}

class WebServerManager {
WebServerManager(ReactiveWebServerApplicationContext applicationContext, ReactiveWebServerFactory factory,
Supplier<HttpHandler> handlerSupplier, boolean lazyInit) {
this.applicationContext = applicationContext;
Assert.notNull(factory, "Factory must not be null");
this.handler = new DelayedInitializationHttpHandler(handlerSupplier, lazyInit);
//根据工厂创建web服务器,NettyReactiveWebServerFactory.getWebServer
this.webServer = factory.getWebServer(this.handler);
}
void start() {
this.handler.initializeHandler();
this.webServer.start();
this.applicationContext
.publishEvent(new ReactiveWebServerInitializedEvent(this.webServer, this.applicationContext));
}
}

启动服务器

refresh()

​ => finishRefresh()

​ => getLifecycleProcessor().onRefresh();

​ => startBeans(true),WebServerStartStopLifecycle在此处调用

1
2
3
4
5
6
7
8
class WebServerStartStopLifecycle implements SmartLifecycle {
@Override
public void start() {
//启动web服务器
this.weServerManager.start();
this.running = true;
}
}

流程图

  • 传入请求,由底层服务器引擎处理。服务器引擎列表不限于基于ServletAPI 的服务器。每个服务器引擎都有自己的响应式适配器,将 HTTP 请求和 HTTP 响应的内部表示映射到ServerHttpRequest 和 ServerHttpResponse 。
  • HttpHandler 阶段,该阶段将给定的 ServerHttpRequest 、 ServerHttpResponse 、用户Session 和相关信息组合到 ServerWebExchage 实例中。
  • WebFilterChain 阶段,它将定义的 WebFilter 组合到链中。然后, WebFilterChain 会负责执行此链中每个 WebFilter 实例的 WebFilter#filter 方法,以过滤传入的ServerWebExchange 。
  • 如果满足所有过滤条件, WebFilterChain 将调用 WebHandler 实例。
  • 查找 HandlerMapping 实例并调用第一个合适的实例。可以是RouterFunctionMapping、也可以是RequestMappingHandlerMapping 。
  • handerAdapter阶段,与以前功能相同,使用响应式流来构建响应式流。

DispatcherHandler

与Spring MVC类似,Spring WebFlux也是围绕前端控制器模式设计。DispatcherHandler为请求提供共享性算法实际工作委托给组件处理。

DispatcherHandler是WebHandler的子类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class DispatcherHandler implements WebHandler, PreFlightRequestHandler, ApplicationContextAware {
//处理器映射器
@Nullable
private List<HandlerMapping> handlerMappings;
//处理器适配器
@Nullable
private List<HandlerAdapter> handlerAdapters;
//结果处理器
@Nullable
private List<HandlerResultHandler> resultHandlers;

@Override
public Mono<Void> handle(ServerWebExchange exchange) {
if (this.handlerMappings == null) {
return createNotFoundError();
}
if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
return handlePreFlight(exchange);
}
return Flux.fromIterable(this.handlerMappings)
.concatMap(mapping -> mapping.getHandler(exchange))//根据处理器映射器寻找handler
.next()//获取找到的第一个hander
.switchIfEmpty(createNotFoundError())//为空则返回"No matching handler"错误
.flatMap(handler -> invokeHandler(exchange, handler))//寻找支持该handler的handlerAdapter调用
.flatMap(result -> handleResult(exchange, result));//处理结果
}
}

集成Springboot

依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<parent>
<artifactId>spring-boot-starter-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>2.7.3</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>

实体

1
2
3
4
5
6
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {
private String id;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Data
public class PasswordDTO {
/**
* 原密码
*/
private String raw;
/**
* 加密密码
*/
private String secured;

public PasswordDTO(@JsonProperty("raw") String raw, @JsonProperty("secured") String secured) {
this.raw = raw;
this.secured = secured;
}

}

Handler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.yrl.handler;

import com.yrl.bean.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.core.publisher.Mono;

import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;

@Component
public class OrderHandler {
private Map<String, Order> orderMap = new HashMap<>();

/**
* 创建订单
*
* @param serverRequest
* @return
*/
public Mono<ServerResponse> create(ServerRequest serverRequest) {
return serverRequest.bodyToMono(Order.class)
.doOnNext(order -> {
orderMap.put(order.getId(), order);
})
.flatMap(order -> ServerResponse.created(URI.create("/order/" + order.getId())).build());

}

/**
* 获取订单信息
*
* @param request
* @return
*/
public Mono<ServerResponse> get(ServerRequest request) {
String id = request.pathVariable("id");
Order order1 = orderMap.get(id);
return Mono.just(order1)
.flatMap(order -> ServerResponse.ok().syncBody(order)).switchIfEmpty(ServerResponse.notFound().build());
}

/**
* 获取订单列表
*
* @param request
* @return
*/
public Mono<ServerResponse> list(ServerRequest request) {
return Mono.just(orderMap.values().stream().collect(Collectors.toList()))
.flatMap(order -> ServerResponse.ok().syncBody(order))
.switchIfEmpty(ServerResponse.notFound().build());

}

}

路由配置

  • RouterFunctions.nest(RequestPredicate predicate, RouterFunction<T> routerFunction):包含两个参数,参数一:测试条件是否通过、参数二:参数一通过,执行路由函数
  • RouterFunctions.route(RequestPredicate predicate, HandlerFunction<T> handlerFunction):参数一:测试条件是否通过、参数二:参数一通过,执行处理器函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Bean
public RouterFunction<ServerResponse> routes(OrderHandler orderHandler) {
return RouterFunctions.nest(
// 判断请求路径是否匹配指定的前缀
path("/orders"),
//如果匹配成功,则路由到这个函数
RouterFunctions.nest(
//判断请求报文头字段accept是否匹配APPLICATION_JSON
RequestPredicates.accept(MediaType.APPLICATION_JSON),
//如果匹配则路由到下面的路由函数 ,将/orders/{id} 路由到handler的get
RouterFunctions.route(
RequestPredicates.GET("/{id}"),
orderHandler::get
)
)
// 如果get请求 /orders ,则路由到orderhandler.list
.andRoute(
RequestPredicates.method(HttpMethod.GET),
orderHandler::list)
// 如果contentType匹配,并路径匹配orders,则路由到这个函数
.andNest(
RequestPredicates.contentType(MediaType.APPLICATION_JSON),
//如果是POST请求/orders,则路由到handler的create方法
RouterFunctions.route(
RequestPredicates.POST("/"),
orderHandler::create
)
)
);
}

请求

创建

列表

指定id查询

单机启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.yrl;

import com.yrl.bean.PasswordDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
import org.springframework.web.reactive.function.server.RequestPredicates;
import org.springframework.web.reactive.function.server.RouterFunction;
import org.springframework.web.reactive.function.server.RouterFunctions;
import org.springframework.web.reactive.function.server.ServerResponse;
import reactor.netty.DisposableServer;
import reactor.netty.http.server.HttpServer;

public class StandaloneApplication {
static Logger LOGGER = LoggerFactory.getLogger(StandaloneApplication.class);

public static void main(String[] args) {
long start = System.currentTimeMillis();
// 调用Routes方法,然后将RouteFunction转化为HttpHandler
HttpHandler handler = RouterFunctions.toHttpHandler(routes(
// BCryt算法进行18次散列, 这里需要耗时几秒
new BCryptPasswordEncoder(18)
));
// 内置HttpHandler适配器
ReactorHttpHandlerAdapter reactorAdapter = new ReactorHttpHandlerAdapter(handler);
// 创建HTTPServer实例,它是ReatorNettyAPI一部分
DisposableServer server = HttpServer.create()
.host("localhost")
.port(8080) // 配置端口
.handle(reactorAdapter) // 指定handler
.bindNow();// 调用bindNow 启动服务
LOGGER.info("started in" + (System.currentTimeMillis() - start) + "ms");
// 为了使应用程序保持活动状态,阻塞Thread,并监听服务器处理事件
server.onDispose().block();
}

private static RouterFunction<?> routes(BCryptPasswordEncoder passwordEncoder) {
return
//匹配请求POST 并且路径是password
RouterFunctions.route(RequestPredicates.POST("password"),
reqeust -> reqeust
.bodyToMono(PasswordDTO.class)
.doOnNext(System.out::println)
// BCryptPasswordEncoder检查已加密的原始密码,加密密码
.map(p -> passwordEncoder.matches(p.getRaw(), p.getSecured()))
// 如果密码匹配成功过 则OK
// 否则EXPECTATION_FAILED
.flatMap(isMatched -> isMatched ? ServerResponse.ok().build() :
ServerResponse.status(HttpStatus.EXPECTATION_FAILED).build()));
}
}

WebClient非阻塞请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
WebClient
//创建客户端,指定
.create("http://localhost:8080")
//指定请求方法为get
.get()
//指定uri,传送变量
.uri("/orders/{id}", 123)
//指定结果处理方式
.retrieve()
//将响应结果反序列化
.bodyToMono(Order.class)
//处理结果
.map(Order::getId)
//订阅输出,只有通过 subscribe 方法, WebClient 才会建立连接并开始发送数据到远程服务器
.subscribe(item -> System.out.println("subscribe:" + item));