介绍 Spring提供了两个并行的技术栈。
并发模型 在Spring MVC(一般的servlet程序),使用一个大的线程池应对请求处理期间潜在的阻塞。
在Spring WebFlux(一般的非阻塞程序),使用较小的固定线程池(事件循环组)来处理请求。
适配 Spring 框架的整个基础设施都是围绕Servlet API 构建的,它们之间紧密耦合。
响应式Web内核首先需要使用模拟接口和对请求进行处理的方法替换javax.servlet.Servlet.service 方法,更改相关的类和接口。
增强和定制 Servlet API 对客户端请求和服务器响应的交互方式
ServerHttpRequest
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 package  org.springframework.http.server.reactive;public  interface  ServerHttpRequest  extends  HttpRequest , ReactiveHttpInputMessage   } package  org.springframework.http;public  interface  ReactiveHttpInputMessage  extends  HttpMessage           Flux<DataBuffer> getBody ()  ; } 
ServerHttpResponse
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 package  org.springframework.http.server.reactive;public  interface  ServerHttpResponse  extends  ReactiveHttpOutputMessage  } package  org.springframework.http;public  interface  ReactiveHttpOutputMessage  extends  HttpMessage     	 	Mono<Void> writeWith (Publisher<? extends DataBuffer> body)  ; } 
ServerWebExchange
1 2 3 4 5 6 7 8 9 10 11 12 13 interface  ServerWebExchange           ServerHttpRequest getRequest ()  ;     ServerHttpResponse getResponse ()  ;          Mono<WebSession> getSession ()  ; } 
 
重用 WebMVC 的基础设施,用 Flux 、 Mono 和 Publisher 等响应式类型替换同步通信。
保留与 Spring Web MVC 相同的 HandlerMapping 和 HandlerAdapter 链,使用基于 Reactor 的响应式交互替换实时命令:
服务器 Spring Boot的WebFlux-Starter模块默认使用Netty作为服务器,可以达到异步、非阻塞的效果。
Tomcat、Jetty可以与Spring MVC和WebFlux一起使用,但MVC依赖于Servlet阻塞IO;WebFlux依赖于Servlet的非阻塞IO,并结合适配器使用Servlet Api。
对于Undertow,WebFlux可以直接使用Undertow API,无需Servlet API。
SpringBoot响应式服务器启动 寻找web服务器类型 
SpringApplication.run(WebFluxApplication.class)
    = > 进入run方法 
        = >  return new SpringApplication(primarySources).run(args); 
                => 进入SpringApplication构造
                    => this.webApplicationType = WebApplicationType.deduceFromClasspath()
                        => 进入deduceFromClasspath方法,寻找web服务器类型
                            => 引入webFlux,存在DispatcherHandler,获取到响应式类型
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 static  WebApplicationType deduceFromClasspath ()                if  (ClassUtils.isPresent(WEBFLUX_INDICATOR_CLASS, null ) && !ClassUtils.isPresent(WEBMVC_INDICATOR_CLASS, null )         && !ClassUtils.isPresent(JERSEY_INDICATOR_CLASS, null )) {                  return  WebApplicationType.REACTIVE;     }          for  (String className : SERVLET_INDICATOR_CLASSES) {                  if  (!ClassUtils.isPresent(className, null )) {             return  WebApplicationType.NONE;         }     }          return  WebApplicationType.SERVLET; } 
创建响应式服务上下文 
SpringApplication.run(WebFluxApplication.class)
    = > 进入run方法 
        = >  return new SpringApplication(primarySources).run(args); 
            => 进入run方法
                => context = reateApplicationContext()
                    => this.applicationContextFactory.create(this.webApplicationType),根据web服务类型创建服务
 
1 2 3 4 5 6 7 8 9 10 11 12 13 static  class  Factory  implements  ApplicationContextFactory     @Override     public  ConfigurableApplicationContext create (WebApplicationType webApplicationType)         return  (webApplicationType != WebApplicationType.REACTIVE) ? null              : new  AnnotationConfigReactiveWebServerApplicationContext();    } } 
创建web服务器 
refresh()
=> onRefresh(),调用ReactiveWebServerApplicationContext实现
    => createWebServer()
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public  class  ReactiveWebServerApplicationContext  extends  GenericReactiveWebApplicationContext 		implements  ConfigurableWebServerApplicationContext   {     	private  void  createWebServer ()          WebServerManager serverManager = this .serverManager;        if  (serverManager == null ) {                      String webServerFactoryBeanName = getWebServerFactoryBeanName();                      ReactiveWebServerFactory webServerFactory = getWebServerFactory(webServerFactoryBeanName);                      boolean  lazyInit = getBeanFactory().getBeanDefinition(webServerFactoryBeanName).isLazyInit();                      this .serverManager = new  WebServerManager(this , webServerFactory, this ::getHttpHandler, lazyInit);           getBeanFactory().registerSingleton("webServerGracefulShutdown" ,                 new  WebServerGracefulShutdownLifecycle(this .serverManager));                      getBeanFactory().registerSingleton("webServerStartStop" ,                 new  WebServerStartStopLifecycle(this .serverManager));        } } class  WebServerManager      WebServerManager(ReactiveWebServerApplicationContext applicationContext, ReactiveWebServerFactory factory, 			Supplier<HttpHandler> handlerSupplier, boolean  lazyInit) { 		this .applicationContext = applicationContext; 		Assert.notNull(factory, "Factory must not be null" ); 		this .handler = new  DelayedInitializationHttpHandler(handlerSupplier, lazyInit);          		this .webServer = factory.getWebServer(this .handler); 	}     void  start ()   		this .handler.initializeHandler(); 		this .webServer.start(); 		this .applicationContext 				.publishEvent(new  ReactiveWebServerInitializedEvent(this .webServer, this .applicationContext)); 	} } 
启动服务器 
refresh()
    => finishRefresh()
        => getLifecycleProcessor().onRefresh();
            => startBeans(true),WebServerStartStopLifecycle在此处调用
 
1 2 3 4 5 6 7 8 class  WebServerStartStopLifecycle  implements  SmartLifecycle  		@Override  	public  void  start ()            		this .weServerManager.start(); 		this .running = true ; 	} } 
流程图 
传入请求,由底层服务器引擎处理。服务器引擎列表不限于基于ServletAPI 的服务器。每个服务器引擎都有自己的响应式适配器,将 HTTP 请求和 HTTP 响应的内部表示映射到ServerHttpRequest 和 ServerHttpResponse 。 
HttpHandler 阶段,该阶段将给定的 ServerHttpRequest 、 ServerHttpResponse 、用户Session 和相关信息组合到 ServerWebExchage 实例中。 
WebFilterChain 阶段,它将定义的 WebFilter 组合到链中。然后, WebFilterChain 会负责执行此链中每个 WebFilter 实例的 WebFilter#filter 方法,以过滤传入的ServerWebExchange 。 
如果满足所有过滤条件, WebFilterChain 将调用 WebHandler 实例。 
查找 HandlerMapping 实例并调用第一个合适的实例。可以是RouterFunctionMapping、也可以是RequestMappingHandlerMapping 。 
handerAdapter阶段,与以前功能相同,使用响应式流来构建响应式流。 
 
DispatcherHandler 与Spring MVC类似,Spring WebFlux也是围绕前端控制器模式设计。DispatcherHandler为请求提供共享性算法实际工作委托给组件处理。
DispatcherHandler是WebHandler的子类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 public  class  DispatcherHandler  implements  WebHandler , PreFlightRequestHandler , ApplicationContextAware           @Nullable  	private  List<HandlerMapping> handlerMappings; 	 	@Nullable  	private  List<HandlerAdapter> handlerAdapters; 	 	@Nullable  	private  List<HandlerResultHandler> resultHandlers;          @Override  	public  Mono<Void> handle (ServerWebExchange exchange)   		if  (this .handlerMappings == null ) { 			return  createNotFoundError(); 		} 		if  (CorsUtils.isPreFlightRequest(exchange.getRequest())) { 			return  handlePreFlight(exchange); 		} 		return  Flux.fromIterable(this .handlerMappings) 				.concatMap(mapping -> mapping.getHandler(exchange)) 				.next() 				.switchIfEmpty(createNotFoundError()) 				.flatMap(handler -> invokeHandler(exchange, handler)) 				.flatMap(result -> handleResult(exchange, result)); 	} } 
集成Springboot 依赖 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 <parent>     <artifactId>spring-boot-starter-parent</artifactId>     <groupId>org.springframework.boot</groupId>     <version>2.7.3</version>     <relativePath/> </parent> <dependencies>     <dependency>         <groupId>org.springframework.boot</groupId>         <artifactId>spring-boot-starter-webflux</artifactId>     </dependency>     <dependency>         <groupId>org.projectlombok</groupId>         <artifactId>lombok</artifactId>     </dependency> </dependencies> 
实体 1 2 3 4 5 6 @Data @AllArgsConstructor @NoArgsConstructor public  class  Order      private  String id; } 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Data public  class  PasswordDTO           private  String raw;          private  String secured;     public  PasswordDTO (@JsonProperty("raw" )  String raw, @JsonProperty ("secured" )  String secured)          this .raw = raw;         this .secured = secured;     } } 
Handler 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 package  com.yrl.handler;import  com.yrl.bean.Order;import  org.springframework.stereotype.Component;import  org.springframework.web.reactive.function.server.ServerRequest;import  org.springframework.web.reactive.function.server.ServerResponse;import  reactor.core.publisher.Mono;import  java.net.URI;import  java.util.HashMap;import  java.util.Map;import  java.util.stream.Collectors;@Component public  class  OrderHandler      private  Map<String, Order> orderMap = new  HashMap<>();          public  Mono<ServerResponse> create (ServerRequest serverRequest)           return  serverRequest.bodyToMono(Order.class )                  .doOnNext (order  ->  {                    orderMap.put(order.getId(), order);                 })                 .flatMap(order -> ServerResponse.created(URI.create("/order/"  + order.getId())).build());     }          public  Mono<ServerResponse> get (ServerRequest request)           String id = request.pathVariable("id" );         Order order1 = orderMap.get(id);         return  Mono.just(order1)                 .flatMap(order -> ServerResponse.ok().syncBody(order)).switchIfEmpty(ServerResponse.notFound().build());     }          public  Mono<ServerResponse> list (ServerRequest request)           return  Mono.just(orderMap.values().stream().collect(Collectors.toList()))                 .flatMap(order -> ServerResponse.ok().syncBody(order))                 .switchIfEmpty(ServerResponse.notFound().build());     } } 
路由配置 
RouterFunctions.nest(RequestPredicate predicate, RouterFunction<T> routerFunction):包含两个参数,参数一:测试条件是否通过、参数二:参数一通过,执行路由函数RouterFunctions.route(RequestPredicate predicate, HandlerFunction<T> handlerFunction):参数一:测试条件是否通过、参数二:参数一通过,执行处理器函数 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Bean public  RouterFunction<ServerResponse> routes (OrderHandler orderHandler)      return  RouterFunctions.nest(                          path("/orders" ),                          RouterFunctions.nest(                                                          RequestPredicates.accept(MediaType.APPLICATION_JSON),                                                          RouterFunctions.route(                                     RequestPredicates.GET("/{id}" ),                                     orderHandler::get                             )                     )                                          .andRoute(                             RequestPredicates.method(HttpMethod.GET),                             orderHandler::list)                                          .andNest(                             RequestPredicates.contentType(MediaType.APPLICATION_JSON),                                                          RouterFunctions.route(                                     RequestPredicates.POST("/" ),                                     orderHandler::create                             )                     )     ); } 
请求 创建 列表 指定id查询 单机启动 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 package  com.yrl;import  com.yrl.bean.PasswordDTO;import  org.slf4j.Logger;import  org.slf4j.LoggerFactory;import  org.springframework.http.HttpStatus;import  org.springframework.http.server.reactive.HttpHandler;import  org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;import  org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;import  org.springframework.web.reactive.function.server.RequestPredicates;import  org.springframework.web.reactive.function.server.RouterFunction;import  org.springframework.web.reactive.function.server.RouterFunctions;import  org.springframework.web.reactive.function.server.ServerResponse;import  reactor.netty.DisposableServer;import  reactor.netty.http.server.HttpServer;public  class  StandaloneApplication      static  Logger LOGGER = LoggerFactory.getLogger(StandaloneApplication.class ) ;     public  static  void  main (String[] args)           long  start = System.currentTimeMillis();                  HttpHandler handler = RouterFunctions.toHttpHandler(routes(                                  new  BCryptPasswordEncoder(18 )         ));                  ReactorHttpHandlerAdapter reactorAdapter = new  ReactorHttpHandlerAdapter(handler);                  DisposableServer server = HttpServer.create()                 .host("localhost" )                 .port(8080 )                  .handle(reactorAdapter)                   .bindNow();         LOGGER.info("started in"  + (System.currentTimeMillis() - start) + "ms" );                  server.onDispose().block();     }     private  static  RouterFunction<?> routes(BCryptPasswordEncoder passwordEncoder) {         return                                   RouterFunctions.route(RequestPredicates.POST("password" ),                         reqeust -> reqeust                                 .bodyToMono(PasswordDTO.class )                                  .doOnNext(System.out::println)                                                                  .map(p -> passwordEncoder.matches(p.getRaw(), p.getSecured()))                                                                                                   .flatMap(isMatched -> isMatched ? ServerResponse.ok().build() :                                         ServerResponse.status(HttpStatus.EXPECTATION_FAILED).build()));     } } 
WebClient非阻塞请求 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 WebClient     	         .create("http://localhost:8080" )     	         .get() 		     	         .uri("/orders/{id}" , 123 ) 	             .retrieve()			     	         .bodyToMono(Order.class )	      	//处理结果         .map(Order::getId)	     	         .subscribe(item -> System.out.println("subscribe:"  + item));