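Introduction
Spring provides two parallel web stacks: Spring MVC, built on the Servlet API, and Spring WebFlux, built on a non-blocking reactive foundation.
Concurrency model
Spring MVC (and servlet applications in general) uses a large thread pool to absorb potential blocking while a request is being processed.
Spring WebFlux (and non-blocking applications in general) uses a small, fixed-size thread pool (the event loop group) to process requests.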
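The difference between the two models can be illustrated without Spring at all. The following is a minimal sketch, assuming a simulated I/O wait of 50 ms per request; `EventLoopSketch` and `serve` are hypothetical names, and a `ScheduledExecutorService` stands in for the event loop group. Because each request registers a callback instead of sleeping, two threads are enough to complete one hundred requests.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it is not part of Spring.
class EventLoopSketch {

    // Serves `requests` simulated requests on `loopThreads` threads and
    // returns the names of the threads that completed them.
    static Set<String> serve(int requests, int loopThreads) {
        ScheduledExecutorService loop = Executors.newScheduledThreadPool(loopThreads);
        Set<String> threadsUsed = ConcurrentHashMap.newKeySet();
        try {
            CompletableFuture<?>[] pending = new CompletableFuture<?>[requests];
            for (int i = 0; i < requests; i++) {
                CompletableFuture<Void> done = new CompletableFuture<>();
                // Instead of sleeping (which would block a thread), register a
                // callback that fires once the simulated 50 ms I/O wait is over.
                loop.schedule(() -> {
                    threadsUsed.add(Thread.currentThread().getName());
                    done.complete(null);
                }, 50, TimeUnit.MILLISECONDS);
                pending[i] = done;
            }
            // Wait until every simulated request has completed.
            CompletableFuture.allOf(pending).join();
        } finally {
            loop.shutdown();
        }
        return threadsUsed;
    }

    public static void main(String[] args) {
        Set<String> threads = serve(100, 2);
        System.out.println("100 requests handled by " + threads.size() + " thread(s)");
    }
}
```

A blocking variant would call `Thread.sleep(50)` inside each task and would need roughly one thread per concurrent request to reach the same throughput, which is why the servlet model relies on a large pool.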
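Adaptation
The entire web infrastructure of the Spring Framework is built around the Servlet API, and the two are tightly coupled.
A reactive web core therefore first has to replace the javax.servlet.Servlet.service method with an analogous interface and request-handling method, and change the related classes and interfaces.
The way the Servlet API exchanges client requests and server responses also has to be enhanced and customized, which is what the following interfaces do.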
ServerHttpRequest
    // Excerpt: org.springframework.http.server.reactive
    public interface ServerHttpRequest extends HttpRequest, ReactiveHttpInputMessage {
        // ...
    }

    // Excerpt: org.springframework.http
    public interface ReactiveHttpInputMessage extends HttpMessage {
        // The request body is exposed as a reactive stream of buffers.
        Flux<DataBuffer> getBody();
    }
ServerHttpResponse
    // Excerpt: org.springframework.http.server.reactive
    public interface ServerHttpResponse extends ReactiveHttpOutputMessage {
        // ...
    }

    // Excerpt: org.springframework.http
    public interface ReactiveHttpOutputMessage extends HttpMessage {
        // The response body is written from a Publisher; the returned Mono signals completion.
        Mono<Void> writeWith(Publisher<? extends DataBuffer> body);
    }
ServerWebExchange
    interface ServerWebExchange {
        // ...
        ServerHttpRequest getRequest();
        ServerHttpResponse getResponse();
        // ...
        Mono<WebSession> getSession();
        // ...
    }
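The WebMVC infrastructure is reused, with synchronous communication replaced by reactive types such as Flux, Mono and Publisher.
The same HandlerMapping and HandlerAdapter chain as in Spring Web MVC is kept, but imperative calls are replaced with Reactor-based reactive interaction.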
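Servers
Spring Boot's WebFlux starter uses Netty as the default server, which gives asynchronous, non-blocking behavior.
Tomcat and Jetty can be used with both Spring MVC and WebFlux. MVC relies on Servlet blocking I/O, whereas WebFlux relies on Servlet non-blocking I/O and uses the Servlet API behind an adapter.
With Undertow, WebFlux uses the Undertow API directly, without the Servlet API.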
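Spring Boot reactive server startup
Determining the web application type
SpringApplication.run(WebFluxApplication.class)
=> enters the run method
=> return new SpringApplication(primarySources).run(args);
=> enters the SpringApplication constructor
=> this.webApplicationType = WebApplicationType.deduceFromClasspath()
=> enters deduceFromClasspath, which determines the web application type
=> with WebFlux on the classpath, DispatcherHandler is present; as long as Spring MVC's DispatcherServlet and Jersey are absent, the result is REACTIVE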
    static WebApplicationType deduceFromClasspath() {
        // Reactive only if WebFlux is present and neither Spring MVC nor Jersey is.
        if (ClassUtils.isPresent(WEBFLUX_INDICATOR_CLASS, null)
                && !ClassUtils.isPresent(WEBMVC_INDICATOR_CLASS, null)
                && !ClassUtils.isPresent(JERSEY_INDICATOR_CLASS, null)) {
            return WebApplicationType.REACTIVE;
        }
        // Without the servlet classes this is not a web application at all.
        for (String className : SERVLET_INDICATOR_CLASSES) {
            if (!ClassUtils.isPresent(className, null)) {
                return WebApplicationType.NONE;
            }
        }
        return WebApplicationType.SERVLET;
    }
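The detection boils down to asking whether certain classes can be loaded. Here is a stdlib-only sketch of that idea, assuming that `Class.forName` is an acceptable stand-in for `ClassUtils.isPresent`. `WebTypeSketch`, `WebType` and `deduce` are hypothetical names, and the indicator class names are written as I recall them from Spring Boot 2.x, so treat them as assumptions rather than a quote of the source.

```java
import java.util.function.Predicate;

// Hypothetical stand-in for the detection logic; not Spring Boot's own class.
class WebTypeSketch {

    enum WebType { NONE, SERVLET, REACTIVE }

    // Indicator class names (assumed values, see the note above).
    static final String WEBFLUX = "org.springframework.web.reactive.DispatcherHandler";
    static final String WEBMVC = "org.springframework.web.servlet.DispatcherServlet";
    static final String JERSEY = "org.glassfish.jersey.servlet.ServletContainer";
    static final String[] SERVLET = {
        "javax.servlet.Servlet",
        "org.springframework.web.context.ConfigurableWebApplicationContext"
    };

    // True when the class can be loaded (without initializing it), false otherwise.
    static boolean isPresent(String className) {
        try {
            Class.forName(className, false, WebTypeSketch.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // Same decision order as deduceFromClasspath, with the presence check injected.
    static WebType deduce(Predicate<String> present) {
        if (present.test(WEBFLUX) && !present.test(WEBMVC) && !present.test(JERSEY)) {
            return WebType.REACTIVE;
        }
        for (String name : SERVLET) {
            if (!present.test(name)) {
                return WebType.NONE;
            }
        }
        return WebType.SERVLET;
    }

    public static void main(String[] args) {
        // The result depends on what is on the classpath of the running JVM.
        System.out.println(deduce(WebTypeSketch::isPresent));
    }
}
```

Passing the presence check as a `Predicate` makes the decision order easy to exercise: a classpath that contains both WebFlux and Spring MVC resolves to SERVLET, not REACTIVE.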
Creating the reactive application context
SpringApplication.run(WebFluxApplication.class)
=> enters the run method
=> return new SpringApplication(primarySources).run(args);
=> enters the run method
=> context = createApplicationContext()
=> this.applicationContextFactory.create(this.webApplicationType), which creates the context that matches the web application type
    static class Factory implements ApplicationContextFactory {

        @Override
        public ConfigurableApplicationContext create(WebApplicationType webApplicationType) {
            // Only the REACTIVE type is handled here; other types return null.
            return (webApplicationType != WebApplicationType.REACTIVE) ? null
                    : new AnnotationConfigReactiveWebServerApplicationContext();
        }
    }
Creating the web server
refresh()
=> onRefresh(), which calls the ReactiveWebServerApplicationContext implementation
=> createWebServer()
    public class ReactiveWebServerApplicationContext extends GenericReactiveWebApplicationContext
            implements ConfigurableWebServerApplicationContext {

        private void createWebServer() {
            WebServerManager serverManager = this.serverManager;
            if (serverManager == null) {
                String webServerFactoryBeanName = getWebServerFactoryBeanName();
                ReactiveWebServerFactory webServerFactory = getWebServerFactory(webServerFactoryBeanName);
                boolean lazyInit = getBeanFactory().getBeanDefinition(webServerFactoryBeanName).isLazyInit();
                // The manager creates the server; the two lifecycle beans start and stop it later.
                this.serverManager = new WebServerManager(this, webServerFactory, this::getHttpHandler, lazyInit);
                getBeanFactory().registerSingleton("webServerGracefulShutdown",
                        new WebServerGracefulShutdownLifecycle(this.serverManager));
                getBeanFactory().registerSingleton("webServerStartStop",
                        new WebServerStartStopLifecycle(this.serverManager));
            }
        }
    }

    class WebServerManager {

        WebServerManager(ReactiveWebServerApplicationContext applicationContext, ReactiveWebServerFactory factory,
                Supplier<HttpHandler> handlerSupplier, boolean lazyInit) {
            this.applicationContext = applicationContext;
            Assert.notNull(factory, "Factory must not be null");
            this.handler = new DelayedInitializationHttpHandler(handlerSupplier, lazyInit);
            // The server is created here but not started yet.
            this.webServer = factory.getWebServer(this.handler);
        }

        void start() {
            this.handler.initializeHandler();
            this.webServer.start();
            this.applicationContext
                    .publishEvent(new ReactiveWebServerInitializedEvent(this.webServer, this.applicationContext));
        }
    }
Starting the server
refresh()
=> finishRefresh()
=> getLifecycleProcessor().onRefresh();
=> startBeans(true), which is where WebServerStartStopLifecycle.start() is invoked
    class WebServerStartStopLifecycle implements SmartLifecycle {

        @Override
        public void start() {
            // Delegates to WebServerManager.start(), which starts the underlying server.
            this.weServerManager.start();
            this.running = true;
        }
    }
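Request processing flow
1. An incoming request is handled by the underlying server engine. The list of server engines is not limited to servers based on the Servlet API. Each server engine has its own reactive adapter, which maps the engine's internal representation of the HTTP request and response to ServerHttpRequest and ServerHttpResponse.
2. HttpHandler stage: the given ServerHttpRequest, ServerHttpResponse, user session and related information are combined into a ServerWebExchange instance.
3. WebFilterChain stage: the defined WebFilter instances are composed into a chain. The WebFilterChain then calls WebFilter#filter on each WebFilter in the chain to filter the incoming ServerWebExchange.
4. If all filter conditions are met, the WebFilterChain calls the WebHandler instance.
5. The HandlerMapping instances are searched and the first suitable one is invoked. It can be a RouterFunctionMapping or a RequestMappingHandlerMapping.
6. HandlerAdapter stage: it has the same role as before, but uses Reactive Streams types to build the reactive flow.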
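Steps 3 and 4 above can be sketched in plain Java. This is a miniature under stated assumptions, not Spring's implementation: `Exchange`, `Filter`, `Handler` and `chain` are hypothetical stand-ins for ServerWebExchange, WebFilter, WebHandler and WebFilterChain, and `CompletableFuture<Void>` plays the role of `Mono<Void>`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical miniature of the filter-chain stage; these are not Spring's types.
class FilterChainSketch {

    // Stand-in for ServerWebExchange: a path plus a trace of what happened.
    static final class Exchange {
        final String path;
        final List<String> log = new ArrayList<>();
        Exchange(String path) { this.path = path; }
    }

    interface Handler {
        CompletableFuture<Void> handle(Exchange exchange);
    }

    interface Filter {
        CompletableFuture<Void> filter(Exchange exchange, Handler next);
    }

    // Wraps the handler so that filters.get(0) runs first and the handler runs last.
    static Handler chain(List<Filter> filters, Handler handler) {
        Handler current = handler;
        for (int i = filters.size() - 1; i >= 0; i--) {
            Filter filter = filters.get(i);
            Handler next = current;
            current = exchange -> filter.filter(exchange, next);
        }
        return current;
    }

    static List<String> run(String path) {
        Filter logging = (ex, next) -> {
            ex.log.add("log");
            return next.handle(ex);
        };
        // Rejects requests outside /orders without calling the rest of the chain.
        Filter guard = (ex, next) -> {
            if (!ex.path.startsWith("/orders")) {
                ex.log.add("rejected");
                return CompletableFuture.completedFuture(null);
            }
            return next.handle(ex);
        };
        Handler handler = ex -> {
            ex.log.add("handled");
            return CompletableFuture.completedFuture(null);
        };
        Exchange exchange = new Exchange(path);
        chain(Arrays.asList(logging, guard), handler).handle(exchange).join();
        return exchange.log;
    }

    public static void main(String[] args) {
        System.out.println(run("/orders/1")); // prints [log, handled]
        System.out.println(run("/admin"));    // prints [log, rejected]
    }
}
```

The handler is reached only when every filter passes the exchange on, which mirrors the condition in step 4.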
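DispatcherHandler
Like Spring MVC, Spring WebFlux is designed around the front controller pattern. DispatcherHandler provides the shared algorithm for request processing, while the actual work is delegated to configurable components.
DispatcherHandler implements the WebHandler interface.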
    public class DispatcherHandler implements WebHandler, PreFlightRequestHandler, ApplicationContextAware {

        @Nullable
        private List<HandlerMapping> handlerMappings;

        @Nullable
        private List<HandlerAdapter> handlerAdapters;

        @Nullable
        private List<HandlerResultHandler> resultHandlers;

        @Override
        public Mono<Void> handle(ServerWebExchange exchange) {
            if (this.handlerMappings == null) {
                return createNotFoundError();
            }
            if (CorsUtils.isPreFlightRequest(exchange.getRequest())) {
                return handlePreFlight(exchange);
            }
            return Flux.fromIterable(this.handlerMappings)
                    // Ask each mapping in order and take the first handler found.
                    .concatMap(mapping -> mapping.getHandler(exchange))
                    .next()
                    .switchIfEmpty(createNotFoundError())
                    .flatMap(handler -> invokeHandler(exchange, handler))
                    .flatMap(result -> handleResult(exchange, result));
        }
    }
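The core of `handle` is a "first matching mapping wins, otherwise 404" pipeline. Below is a rough stdlib analogy, assuming that a lazy `Stream` of `Optional` values is a fair stand-in for `concatMap(...).next()`. `DispatchSketch`, `dispatch` and the handler names are hypothetical, and the real code is asynchronous whereas this sketch is not.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical analogy for the dispatch pipeline; not Spring's DispatcherHandler.
class DispatchSketch {

    // Each "mapping" returns a handler name for a path, or empty when it does not match.
    static String dispatch(List<Function<String, Optional<String>>> mappings, String path) {
        return mappings.stream()
                .map(mapping -> mapping.apply(path))   // ask each mapping in order, lazily
                .filter(Optional::isPresent)
                .map(Optional::get)
                .findFirst()                           // like next(): the first match wins
                .map(handler -> "200 via " + handler)  // like invokeHandler + handleResult
                .orElse("404 Not Found");              // like switchIfEmpty(createNotFoundError())
    }

    static List<Function<String, Optional<String>>> sampleMappings() {
        Function<String, Optional<String>> routerFunctions =
                path -> path.startsWith("/orders") ? Optional.of("routerFunction") : Optional.<String>empty();
        Function<String, Optional<String>> annotatedControllers =
                path -> path.equals("/hello") ? Optional.of("annotatedController") : Optional.<String>empty();
        return Arrays.asList(routerFunctions, annotatedControllers);
    }

    public static void main(String[] args) {
        System.out.println(dispatch(sampleMappings(), "/orders/1")); // prints 200 via routerFunction
        System.out.println(dispatch(sampleMappings(), "/missing"));  // prints 404 Not Found
    }
}
```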
Integrating with Spring Boot
Dependencies

    <parent>
        <artifactId>spring-boot-starter-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.7.3</version>
        <relativePath/>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>
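Entities

    package com.yrl.bean;

    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public class Order {
        private String id;
    }

    package com.yrl.bean;

    import com.fasterxml.jackson.annotation.JsonCreator;
    import com.fasterxml.jackson.annotation.JsonProperty;
    import lombok.Data;

    @Data
    public class PasswordDTO {

        private String raw;
        private String secured;

        // Explicit creator so Jackson binds the JSON fields to the constructor parameters.
        @JsonCreator
        public PasswordDTO(@JsonProperty("raw") String raw,
                           @JsonProperty("secured") String secured) {
            this.raw = raw;
            this.secured = secured;
        }
    }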
Handler

    package com.yrl.handler;

    import com.yrl.bean.Order;
    import org.springframework.stereotype.Component;
    import org.springframework.web.reactive.function.server.ServerRequest;
    import org.springframework.web.reactive.function.server.ServerResponse;
    import reactor.core.publisher.Mono;

    import java.net.URI;
    import java.util.ArrayList;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    @Component
    public class OrderHandler {

        // In-memory store; ConcurrentHashMap because handlers may run on several event-loop threads.
        private final Map<String, Order> orderMap = new ConcurrentHashMap<>();

        public Mono<ServerResponse> create(ServerRequest serverRequest) {
            return serverRequest.bodyToMono(Order.class)
                    .doOnNext(order -> orderMap.put(order.getId(), order))
                    .flatMap(order -> ServerResponse.created(URI.create("/orders/" + order.getId())).build());
        }

        public Mono<ServerResponse> get(ServerRequest request) {
            String id = request.pathVariable("id");
            // justOrEmpty: Mono.just(null) would throw a NullPointerException for an unknown id.
            return Mono.justOrEmpty(orderMap.get(id))
                    .flatMap(order -> ServerResponse.ok().bodyValue(order))
                    .switchIfEmpty(ServerResponse.notFound().build());
        }

        public Mono<ServerResponse> list(ServerRequest request) {
            // Copy the values so the response body is a stable snapshot.
            return ServerResponse.ok().bodyValue(new ArrayList<>(orderMap.values()));
        }
    }
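Route configuration
RouterFunctions.nest(RequestPredicate predicate, RouterFunction<T> routerFunction): takes two arguments. The first is the predicate to test; if it matches, the request is passed to the router function given as the second argument.
RouterFunctions.route(RequestPredicate predicate, HandlerFunction<T> handlerFunction): the first argument is the predicate to test; if it matches, the handler function given as the second argument is executed.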
    @Bean
    public RouterFunction<ServerResponse> routes(OrderHandler orderHandler) {
        return RouterFunctions.nest(
                RequestPredicates.path("/orders"),
                RouterFunctions.nest(
                        RequestPredicates.accept(MediaType.APPLICATION_JSON),
                        RouterFunctions.route(
                                RequestPredicates.GET("/{id}"),
                                orderHandler::get
                        )
                )
                .andRoute(
                        RequestPredicates.method(HttpMethod.GET),
                        orderHandler::list)
                .andNest(
                        RequestPredicates.contentType(MediaType.APPLICATION_JSON),
                        RouterFunctions.route(
                                RequestPredicates.POST("/"),
                                orderHandler::create
                        )
                )
        );
    }
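To make the difference between nest and route concrete, here is a deliberately simplified stdlib model of the route table above. `RouterSketch`, `Request`, `Router` and `resolve` are hypothetical names. The sketch assumes that nesting only strips a path prefix and ignores the Accept and Content-Type predicates, which the real router does check.

```java
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical miniature of nest/route; these are not Spring's types.
class RouterSketch {

    // Simplified request: an HTTP method plus the part of the path still to be matched.
    static final class Request {
        final String method;
        final String path;
        Request(String method, String path) {
            this.method = method;
            this.path = path;
        }
    }

    // A router yields the name of the chosen handler, or empty when nothing matches.
    interface Router {
        Optional<String> route(Request request);

        // Try this router first, then the other one (in the spirit of andRoute/andNest).
        default Router or(Router other) {
            return request -> {
                Optional<String> result = this.route(request);
                return result.isPresent() ? result : other.route(request);
            };
        }
    }

    // route: if the predicate matches, the handler is selected.
    static Router route(Predicate<Request> predicate, String handler) {
        return request -> predicate.test(request) ? Optional.of(handler) : Optional.<String>empty();
    }

    // nest: if the prefix matches, strip it and let the inner router decide.
    static Router nest(String prefix, Router inner) {
        return request -> request.path.startsWith(prefix)
                ? inner.route(new Request(request.method, request.path.substring(prefix.length())))
                : Optional.<String>empty();
    }

    static String resolve(String method, String path) {
        Router orders = nest("/orders",
                route(r -> r.method.equals("GET") && r.path.matches("/[^/]+"), "get")
                        .or(route(r -> r.method.equals("GET"), "list"))
                        .or(route(r -> r.method.equals("POST") && r.path.equals("/"), "create")));
        return orders.route(new Request(method, path)).orElse("no route");
    }

    public static void main(String[] args) {
        System.out.println(resolve("GET", "/orders/42"));    // prints get
        System.out.println(resolve("GET", "/orders"));       // prints list
        System.out.println(resolve("POST", "/orders/"));     // prints create
        System.out.println(resolve("DELETE", "/orders/42")); // prints no route
    }
}
```

The order of the alternatives matters: the more specific GET "/{id}" route is tried before the catch-all GET route, just as in the bean definition.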
Requests
Create
List
Query by id
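The three requests can be written down with the JDK's own HTTP client (Java 11 or later). This is a sketch under assumptions: the application is taken to listen on http://localhost:8080, `OrderRequests` is a hypothetical helper class, and the JSON body is built by simple string concatenation for illustration only. The requests are only constructed here, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper that builds the three requests against the routes above.
class OrderRequests {

    static final String BASE = "http://localhost:8080"; // assumed host and port

    // Create: POST /orders/ with a JSON body, matching the contentType predicate.
    static HttpRequest create(String id) {
        return HttpRequest.newBuilder(URI.create(BASE + "/orders/"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString("{\"id\":\"" + id + "\"}"))
                .build();
    }

    // List: GET /orders.
    static HttpRequest list() {
        return HttpRequest.newBuilder(URI.create(BASE + "/orders"))
                .GET()
                .build();
    }

    // Query by id: GET /orders/{id} with Accept: application/json.
    static HttpRequest byId(String id) {
        return HttpRequest.newBuilder(URI.create(BASE + "/orders/" + id))
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    public static void main(String[] args) {
        for (HttpRequest request : new HttpRequest[] {create("1"), list(), byId("1")}) {
            System.out.println(request.method() + " " + request.uri());
        }
    }
}
```

With the server running, each request could be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`.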
Standalone startup

    package com.yrl;

    import com.yrl.bean.PasswordDTO;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.http.HttpStatus;
    import org.springframework.http.server.reactive.HttpHandler;
    import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
    import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
    import org.springframework.web.reactive.function.server.RequestPredicates;
    import org.springframework.web.reactive.function.server.RouterFunction;
    import org.springframework.web.reactive.function.server.RouterFunctions;
    import org.springframework.web.reactive.function.server.ServerResponse;
    import reactor.netty.DisposableServer;
    import reactor.netty.http.server.HttpServer;

    public class StandaloneApplication {

        static final Logger LOGGER = LoggerFactory.getLogger(StandaloneApplication.class);

        public static void main(String[] args) {
            long start = System.currentTimeMillis();
            // BCryptPasswordEncoder comes from spring-security-crypto, which must be on the classpath.
            HttpHandler handler = RouterFunctions.toHttpHandler(routes(new BCryptPasswordEncoder(18)));
            // Adapt the HttpHandler to Reactor Netty and bind the server.
            ReactorHttpHandlerAdapter reactorAdapter = new ReactorHttpHandlerAdapter(handler);
            DisposableServer server = HttpServer.create()
                    .host("localhost")
                    .port(8080)
                    .handle(reactorAdapter)
                    .bindNow();
            LOGGER.info("started in " + (System.currentTimeMillis() - start) + " ms");
            // Keep the main thread alive until the server is disposed.
            server.onDispose().block();
        }

        private static RouterFunction<?> routes(BCryptPasswordEncoder passwordEncoder) {
            return RouterFunctions.route(RequestPredicates.POST("/password"),
                    request -> request
                            .bodyToMono(PasswordDTO.class)
                            .doOnNext(System.out::println)
                            .map(p -> passwordEncoder.matches(p.getRaw(), p.getSecured()))
                            .flatMap(isMatched -> isMatched
                                    ? ServerResponse.ok().build()
                                    : ServerResponse.status(HttpStatus.EXPECTATION_FAILED).build()));
        }
    }
Non-blocking requests with WebClient

    WebClient.create("http://localhost:8080")
            .get()
            .uri("/orders/{id}", 123)
            .retrieve()
            .bodyToMono(Order.class)
            // process the result
            .map(Order::getId)
            // subscribe() triggers the request and returns immediately
            .subscribe(item -> System.out.println("subscribe:" + item));