SpringBoot使用Websocket总结

SpringBoot使用Websocket总结

1.添加依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

2.写处理器

写一个类实现org.springframework.web.socket.WebSocketHandler

更推荐继承org.springframework.web.socket.handler.AbstractWebSocketHandler类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class TestHandler extends AbstractWebSocketHandler {

//处理textmessage
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
super.handleTextMessage(session, message);
}

//处理binarymessage
@Override
protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
super.handleBinaryMessage(session, message);
}

//处理pong
@Override
protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
super.handlePongMessage(session, message);
}

//报错时
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
super.handleTransportError(session, exception);
}

//连接关闭时
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
super.afterConnectionClosed(session, status);
}

//连接成功时
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
super.afterConnectionEstablished(session);
}

//是否支持报文拆包
@Override
public boolean supportsPartialMessages() {
return super.supportsPartialMessages();
}
}

3.配置路径和拦截器

​ 在一个Configuration类上,添加EnableWebSocket注解启动Websocket

​ 注入WebSocketConfigurerBean即可启动websocket服务端。

addInterceptors方法是添加拦截器,这个拦截器拦截在http协议转向websocket的那个请求上,如果底层使用的是servlet可以强转成ServletServerHttpRequest,在这里能拿到websocket连接时携带的paramhead

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@EnableWebSocket
@Configuration
public class WebsocketConfig {

private static Logger log = LoggerFactory.getLogger(WebsocketConfig.class);

@Bean
public WebSocketConfigurer wsc() {
return registry -> registry.addHandler(new TestHandler(), "/websocket/test")
.addInterceptors(new HandshakeInterceptor() {
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
ServletServerHttpRequest req = (ServletServerHttpRequest) request;
return true;
}

@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {

}
}).setAllowedOrigins("*");
}


}

4.技巧

  1. 拦截器返回false,则不会进行websocket协议的转换,如必须要登录才能连接的情况,可以在拦截器里拿到请求session_id来判断,也可以获取参数判断。

  2. 拦截器里的第三个参数,是一个map,放入此map的值可以从WebSocketHandler里面的WebSocketSession身上拿出来。

  3. supportsPartialMessages回调方法是决定是否接受半包,因为websocket协议比较底层,好像Tcp协议一样,如果发送大消息可能会拆成多个小报文。如果不希望处理不完整的报文,希望底层帮忙聚合成完整消息将此方法返回false,这样底层会等待完整报文到达聚合后才回调。

  4. WebSocketSession身上能设置最大报文大小,如果报文过大则会报错的,可以将限制调大。可以在afterConnectionEstablished方法里对WebSocketSession进行设置。tomcatsession默认大小就是 8*1024

    1
    2
    3
    4
    5
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
    session.setBinaryMessageSizeLimit(8 * 1024);
    session.setTextMessageSizeLimit(8 * 1024);
    }
    1. WebSocketSession是不支持并发发送消息的,要么自己做同步,要么包装WebSocketSessionConcurrentWebSocketSessionDecorator,关于ConcurrentWebSocketSessionDecorator的使用可以参考Spring并发发送Websocket消息

    2. setAllowedOrigins("*")为控制跨域的,测试阶段可以设成*,不然不允许跨域链接Websocket

    3. 百度搜在线websocket,有在线连接Websocket的网站,这样后端自己做测试比较方便。

5.原理

EnableWebSocket引入了DelegatingWebSocketConfiguration

1
2
3
4
5
6
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(DelegatingWebSocketConfiguration.class)
public @interface EnableWebSocket {
}

先看DelegatingWebSocketConfiguration这个类。首先类被构造后,会自动注入WebSocketConfigurer,这个就是我们配置的Bean

1
2
3
4
5
6
@Autowired(required = false)
public void setConfigurers(List<WebSocketConfigurer> configurers) {
if (!CollectionUtils.isEmpty(configurers)) {
this.configurers.addAll(configurers);
}
}

然后父类会注入一个HandlerMapping,会创建一个ServletWebSocketHandlerRegistry,调用所有的

configurer.registerWebSocketHandlers(registry);方法进行配置。

最后从registry.getHandlerMapping()获取HandlerMapping,这个HandlerMapping最终会注册到Springmvc里,接受到请求时会使用该HandlerMapping来处理请求。

1
2
3
4
5
6
7
8
9
10
11
@Bean
public HandlerMapping webSocketHandlerMapping() {
ServletWebSocketHandlerRegistry registry = initHandlerRegistry();
if (registry.requiresTaskScheduler()) {
TaskScheduler scheduler = defaultSockJsTaskScheduler();
Assert.notNull(scheduler, "Expected default TaskScheduler bean");
registry.setTaskScheduler(scheduler);
}
//使用我们的配置,构造出HandlerMapping
return registry.getHandlerMapping();
}
1
2
3
4
5
6
7
private ServletWebSocketHandlerRegistry initHandlerRegistry() {
if (this.handlerRegistry == null) {
this.handlerRegistry = new ServletWebSocketHandlerRegistry();
registerWebSocketHandlers(this.handlerRegistry);
}
return this.handlerRegistry;
}
1
2
3
4
5
6
@Override
protected void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
for (WebSocketConfigurer configurer : this.configurers) {
configurer.registerWebSocketHandlers(registry);
}
}

下面是获取HandlerMapping的过程

​ 这里会创建一个WebSocketHandlerMapping,并设置urlMapurlMap里面存放的是url 和对应的HttpRequestHandle

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public AbstractHandlerMapping getHandlerMapping() {
Map<String, Object> urlMap = new LinkedHashMap<>();
for (ServletWebSocketHandlerRegistration registration : this.registrations) {
MultiValueMap<HttpRequestHandler, String> mappings = registration.getMappings();
mappings.forEach((httpHandler, patterns) -> {
for (String pattern : patterns) {
urlMap.put(pattern, httpHandler);
}
});
}

WebSocketHandlerMapping hm = new WebSocketHandlerMapping();
hm.setUrlMap(urlMap);
hm.setOrder(this.order);
if (this.urlPathHelper != null) {
hm.setUrlPathHelper(this.urlPathHelper);
}
return hm;
}

这里是将我们的配置构建成一个WebSocketHttpRequestHandler来处理请求,其中包含了路径,拦截器,WebSocketHandler,HandshakeInterceptor,HandshakeHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
protected void addWebSocketHandlerMapping(MultiValueMap<HttpRequestHandler, String> mappings,
WebSocketHandler webSocketHandler, HandshakeHandler handshakeHandler,
HandshakeInterceptor[] interceptors, String path) {

WebSocketHttpRequestHandler httpHandler =
new WebSocketHttpRequestHandler(webSocketHandler, handshakeHandler);

if (!ObjectUtils.isEmpty(interceptors)) {
httpHandler.setHandshakeInterceptors(Arrays.asList(interceptors));
}
mappings.add(httpHandler, path);
}

HandshakeHandler这个类也是可以自定义的,他与协议转换有关,spring会自动根据底层实现来决定这个是什么,一般不要自己写。

HttpRequestHandle处理过程如下,先调用拦截器的before,再掉用HandshakeHandler做协议转换,再调用拦截器的after。这里调用拦截器前创建的Map attributes,最终会设置到WebsocketSession

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Override
public void handleRequest(HttpServletRequest servletRequest, HttpServletResponse servletResponse)
throws ServletException, IOException {

ServerHttpRequest request = new ServletServerHttpRequest(servletRequest);
ServerHttpResponse response = new ServletServerHttpResponse(servletResponse);

HandshakeInterceptorChain chain = new HandshakeInterceptorChain(this.interceptors, this.wsHandler);
HandshakeFailureException failure = null;

try {
if (logger.isDebugEnabled()) {
logger.debug(servletRequest.getMethod() + " " + servletRequest.getRequestURI());
}
Map<String, Object> attributes = new HashMap<>();
//调用拦截器
if (!chain.applyBeforeHandshake(request, response, attributes)) {
return;
}
//处理握手
this.handshakeHandler.doHandshake(request, response, this.wsHandler, attributes);
//再调用拦截器
chain.applyAfterHandshake(request, response, null);
}
catch (HandshakeFailureException ex) {
failure = ex;
}
catch (Throwable ex) {
failure = new HandshakeFailureException("Uncaught failure for request " + request.getURI(), ex);
}
finally {
if (failure != null) {
chain.applyAfterHandshake(request, response, failure);
response.close();
throw failure;
}
response.close();
}
}

下面是协议升级前做的操作,创建StandardWebSocketSession时将attr也就是那个拦截器里的map设置进去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Override
public void upgrade(ServerHttpRequest request, ServerHttpResponse response,
@Nullable String selectedProtocol, List<WebSocketExtension> selectedExtensions,
@Nullable Principal user, WebSocketHandler wsHandler, Map<String, Object> attrs)
throws HandshakeFailureException {

HttpHeaders headers = request.getHeaders();
InetSocketAddress localAddr = null;
try {
localAddr = request.getLocalAddress();
}
catch (Exception ex) {
// Ignore
}
InetSocketAddress remoteAddr = null;
try {
remoteAddr = request.getRemoteAddress();
}
catch (Exception ex) {
// Ignore
}
//创建此类时将attrs作为参数传进去
StandardWebSocketSession session = new StandardWebSocketSession(headers, attrs, localAddr, remoteAddr, user);
StandardWebSocketHandlerAdapter endpoint = new StandardWebSocketHandlerAdapter(wsHandler, session);

List<Extension> extensions = new ArrayList<>();
for (WebSocketExtension extension : selectedExtensions) {
extensions.add(new WebSocketToStandardExtensionAdapter(extension));
}

upgradeInternal(request, response, selectedProtocol, extensions, endpoint);
}

Springmvc处理过程

​ 在DispatcherServlet里面,websocket升级的请求是由HttpRequestHandlerAdapter处理的,这个类是在

org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport#httpRequestHandlerAdapter方法里注册进来的。

处理过程也很简单,直接强转调用。

1
2
3
4
5
6
7
8
@Override
@Nullable
public ModelAndView handle(HttpServletRequest request, HttpServletResponse response, Object handler)
throws Exception {

((HttpRequestHandler) handler).handleRequest(request, response);
return null;
}


SpringBoot使用Websocket总结
https://www.huangchaoyu.com/3087924092.html
作者
hcy
发布于
2020年6月15日
更新于
2024年8月17日
许可协议