0%

服务端推送

一、短轮询

客户端使用定时器不停向服务端发送请求

短轮询实现简单,但频繁请求,消耗带宽,耗费服务器资源

二、长轮询

当没有数据时,服务端不返回请求,挂起请求,等待数据处理完毕才返回。

长轮询使得HTTP请求变成一个长连接,长时间没有响应可能触发超时等错误,需要额外策略处理,实现比短轮询复杂

DeferredResult

spring基于 Servlet 3.0提供了DeferredResult ,可以实现对于请求的异步处理,释放容器连接,支持更高并发。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@GetMapping("/deferredResultUser")
public DeferredResult<ResponseEntity<List<User>>> deferredResultListUser() {
//创建延迟返回结果对象
    DeferredResult<ResponseEntity<List<User>>> deferredResult =
        new DeferredResult<>(/*超时时间,单位毫秒*/20000L,/*超时返回的结果数据*/ new ResponseEntity<>(HttpStatus.NOT_MODIFIED));

    deferredResult.onTimeout(() -> {
        log.info("调用超时");
    });

    deferredResult.onCompletion(() -> {
        log.info("调用完成");
    });

//异步处理逻辑
    new Thread(() -> {
        try {
            TimeUnit.SECONDS.sleep(10);
//当调用setResult时,响应客户端请求
            deferredResult.setResult(new ResponseEntity<>(userService.listUser(), HttpStatus.OK));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }).start();

//挂起请求释放tomcat容器线程,直到调用setResult或者超时
    return deferredResult;
}

三、SSE

基于HTTP无法做到服务器主动向客户端推送数据,SSE使用了一种变通的方式,不返回一次性的数据包,而是一个数据流,数据不断的像客户端发送,客户端不会关闭连接。

SSE方式的推送是单向的,由服务端不断的流向客户端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@RequestMapping(value = "/sse")
@ResponseBody
public SseEmitter sse() {
SseEmitter emitter = new SseEmitter();
new Runnable() {
@Override
public void run() {
try {
emitter.send("123");
Thread.sleep(1500);
emitter.send("456");
emitter.complete();
} catch (Exception e) {
e.printStackTrace();
}
}
};
return emitter;
}

四、Websoket

WebSocket 相对于SSE来说,它可以双向通讯。

其借助Http(s)完成握手,再升级为websoket协议进行通讯操作

STOMP

WebSocket是个规范,在实际的实现中有HTML5规范中的WebSocket API、WebSocket的子协议STOMP。

STOMP(Simple Text Oriented Messaging Protocol)

  • 是一个简单文本定向消息协议
  • 分为生产者(发送消息)、消息代理、消费者(订阅消费)

依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

客户端

1
2
3
4
5
6
7
8
9
10
//订阅消息
//通过stompClient.subscribe订阅目标(destination)发送的消息(广播接收信息)
stompClient.subscribe('/mass/getResponse',function(response){
var message=JSON.parse(response.body);
....
});


//发送消息
stompClient.send("/massRequest",{},JSON.stringify(postValue));

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Configuration
/*开启使用Stomp协议来传输基于消息broker的消息
这时控制器支持使用@MessageMapping,就像使用@RequestMapping一样*/
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
/*注册STOMP协议的节点(endpoint),并映射指定的url,
* 添加一个访问端点“/endpointMark”,客户端打开双通道时需要的url,
* 允许所有的域名跨域访问,指定使用SockJS协议。*/
registry.addEndpoint("/endpointMark")
.setAllowedOrigins("*")
.withSockJS();
}

@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
/*配置一个消息代理(内存中的消息队列)*/
registry.enableSimpleBroker(
"/mass","/queue");
registry.setUserDestinationPrefix("/queue");
}
}



/*消息群发,接受发送至自massRequest的请求*/
@MessageMapping("/massRequest")
@SendTo("/mass/getResponse")
//SendTo 发送至 Broker 下的指定订阅路径mass ,
// Broker再根据getResponse发送消息到订阅了/mass/getResponse的用户处
public ChatRoomResponse mass(ChatRoomRequest chatRoomRequest){
System.out.println("name = " + chatRoomRequest.getName());
System.out.println("chatValue = " + chatRoomRequest.getChatValue());
ChatRoomResponse response=new ChatRoomResponse();
response.setName(chatRoomRequest.getName());
response.setChatValue(chatRoomRequest.getChatValue());
return response;
}