SpringBoot使用Websocket总结
1.添加依赖
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency>
|
2.写处理器
写一个类实现org.springframework.web.socket.WebSocketHandler
,
更推荐继承org.springframework.web.socket.handler.AbstractWebSocketHandler
类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| public class TestHandler extends AbstractWebSocketHandler {
@Override protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception { super.handleTextMessage(session, message); }
@Override protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception { super.handleBinaryMessage(session, message); }
@Override protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception { super.handlePongMessage(session, message); }
@Override public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception { super.handleTransportError(session, exception); }
@Override public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception { super.afterConnectionClosed(session, status); }
@Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { super.afterConnectionEstablished(session); }
@Override public boolean supportsPartialMessages() { return super.supportsPartialMessages(); } }
|
3.配置路径和拦截器
在一个Configuration
类上,添加EnableWebSocket
注解启动Websocket
。
注入WebSocketConfigurer
的Bean
即可启动websocket
服务端。
addInterceptors
方法是添加拦截器,这个拦截器拦截在http
协议转向websocket
的那个请求上,如果底层使用的是servlet
可以强转成ServletServerHttpRequest
,在这里能拿到websocket
连接时携带的param
和head
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @EnableWebSocket @Configuration public class WebsocketConfig {
private static Logger log = LoggerFactory.getLogger(WebsocketConfig.class);
@Bean public WebSocketConfigurer wsc() { return registry -> registry.addHandler(new TestHandler(), "/websocket/test") .addInterceptors(new HandshakeInterceptor() { @Override public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception { ServletServerHttpRequest req = (ServletServerHttpRequest) request; return true; }
@Override public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
} }).setAllowedOrigins("*"); }
}
|
4.技巧
拦截器返回false
,则不会进行websocket
协议的转换,如必须要登录才能连接的情况,可以在拦截器里拿到请求session_id
来判断,也可以获取参数判断。
拦截器里的第三个参数,是一个map
,放入此map
的值可以从WebSocketHandler
里面的WebSocketSession
身上拿出来。
supportsPartialMessages
回调方法是决定是否接受半包,因为websocket
协议比较底层,好像Tcp
协议一样,如果发送大消息可能会拆成多个小报文。如果不希望处理不完整的报文,希望底层帮忙聚合成完整消息将此方法返回false
,这样底层会等待完整报文到达聚合后才回调。
WebSocketSession
身上能设置最大报文大小,如果报文过大则会报错的,可以将限制调大。可以在afterConnectionEstablished
方法里对WebSocketSession
进行设置。tomcat
的session
默认大小就是 8*1024
。
1 2 3 4 5
| @Override public void afterConnectionEstablished(WebSocketSession session) throws Exception { session.setBinaryMessageSizeLimit(8 * 1024); session.setTextMessageSizeLimit(8 * 1024); }
|
WebSocketSession
是不支持并发发送消息的,要么自己做同步,要么包装WebSocketSession
为ConcurrentWebSocketSessionDecorator
,关于ConcurrentWebSocketSessionDecorator
的使用可以参考Spring并发发送Websocket消息。
setAllowedOrigins("*")
为控制跨域的,测试阶段可以设成*
,不然不允许跨域链接Websocket
。
百度搜在线websocket
,有在线连接Websocket
的网站,这样后端自己做测试比较方便。
5.原理
EnableWebSocket
引入了DelegatingWebSocketConfiguration
。
1 2 3 4 5 6
| @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) @Documented @Import(DelegatingWebSocketConfiguration.class) public @interface EnableWebSocket { }
|
先看DelegatingWebSocketConfiguration
这个类。首先类被构造后,会自动注入WebSocketConfigurer
,这个就是我们配置的Bean
。
1 2 3 4 5 6
| @Autowired(required = false) public void setConfigurers(List<WebSocketConfigurer> configurers) { if (!CollectionUtils.isEmpty(configurers)) { this.configurers.addAll(configurers); } }
|
然后父类会注入一个HandlerMapping
,会创建一个ServletWebSocketHandlerRegistry
,调用所有的
configurer.registerWebSocketHandlers(registry);
方法进行配置。
最后从registry.getHandlerMapping()
获取HandlerMapping
,这个HandlerMapping
最终会注册到Springmvc
里,接受到请求时会使用该HandlerMapping
来处理请求。
1 2 3 4 5 6 7 8 9 10 11
| @Bean public HandlerMapping webSocketHandlerMapping() { ServletWebSocketHandlerRegistry registry = initHandlerRegistry(); if (registry.requiresTaskScheduler()) { TaskScheduler scheduler = defaultSockJsTaskScheduler(); Assert.notNull(scheduler, "Expected default TaskScheduler bean"); registry.setTaskScheduler(scheduler); } return registry.getHandlerMapping(); }
|
1 2 3 4 5 6 7
| private ServletWebSocketHandlerRegistry initHandlerRegistry() { if (this.handlerRegistry == null) { this.handlerRegistry = new ServletWebSocketHandlerRegistry(); registerWebSocketHandlers(this.handlerRegistry); } return this.handlerRegistry; }
|
1 2 3 4 5 6
| @Override protected void registerWebSocketHandlers(WebSocketHandlerRegistry registry) { for (WebSocketConfigurer configurer : this.configurers) { configurer.registerWebSocketHandlers(registry); } }
|
下面是获取HandlerMapping
的过程
这里会创建一个WebSocketHandlerMapping
,并设置urlMap
,urlMap
里面存放的是url 和对应的HttpRequestHandle
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public AbstractHandlerMapping getHandlerMapping() { Map<String, Object> urlMap = new LinkedHashMap<>(); for (ServletWebSocketHandlerRegistration registration : this.registrations) { MultiValueMap<HttpRequestHandler, String> mappings = registration.getMappings(); mappings.forEach((httpHandler, patterns) -> { for (String pattern : patterns) { urlMap.put(pattern, httpHandler); } }); } WebSocketHandlerMapping hm = new WebSocketHandlerMapping(); hm.setUrlMap(urlMap); hm.setOrder(this.order); if (this.urlPathHelper != null) { hm.setUrlPathHelper(this.urlPathHelper); } return hm; }
|
这里是将我们的配置构建成一个WebSocketHttpRequestHandler
来处理请求,其中包含了路径,拦截器,WebSocketHandler,HandshakeInterceptor,HandshakeHandler
。
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Override protected void addWebSocketHandlerMapping(MultiValueMap<HttpRequestHandler, String> mappings, WebSocketHandler webSocketHandler, HandshakeHandler handshakeHandler, HandshakeInterceptor[] interceptors, String path) {
WebSocketHttpRequestHandler httpHandler = new WebSocketHttpRequestHandler(webSocketHandler, handshakeHandler);
if (!ObjectUtils.isEmpty(interceptors)) { httpHandler.setHandshakeInterceptors(Arrays.asList(interceptors)); } mappings.add(httpHandler, path); }
|
HandshakeHandler
这个类也是可以自定义的,他与协议转换有关,spring会自动根据底层实现来决定这个是什么,一般不要自己写。
HttpRequestHandle
处理过程如下,先调用拦截器的before,再掉用HandshakeHandler
做协议转换,再调用拦截器的after。这里调用拦截器前创建的Map attributes
,最终会设置到WebsocketSession
上
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| @Override public void handleRequest(HttpServletRequest servletRequest, HttpServletResponse servletResponse) throws ServletException, IOException {
ServerHttpRequest request = new ServletServerHttpRequest(servletRequest); ServerHttpResponse response = new ServletServerHttpResponse(servletResponse);
HandshakeInterceptorChain chain = new HandshakeInterceptorChain(this.interceptors, this.wsHandler); HandshakeFailureException failure = null;
try { if (logger.isDebugEnabled()) { logger.debug(servletRequest.getMethod() + " " + servletRequest.getRequestURI()); } Map<String, Object> attributes = new HashMap<>(); if (!chain.applyBeforeHandshake(request, response, attributes)) { return; } this.handshakeHandler.doHandshake(request, response, this.wsHandler, attributes); chain.applyAfterHandshake(request, response, null); } catch (HandshakeFailureException ex) { failure = ex; } catch (Throwable ex) { failure = new HandshakeFailureException("Uncaught failure for request " + request.getURI(), ex); } finally { if (failure != null) { chain.applyAfterHandshake(request, response, failure); response.close(); throw failure; } response.close(); } }
|
下面是协议升级前做的操作,创建StandardWebSocketSession
时将attr
也就是那个拦截器里的map
设置进去。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Override public void upgrade(ServerHttpRequest request, ServerHttpResponse response, @Nullable String selectedProtocol, List<WebSocketExtension> selectedExtensions, @Nullable Principal user, WebSocketHandler wsHandler, Map<String, Object> attrs) throws HandshakeFailureException {
HttpHeaders headers = request.getHeaders(); InetSocketAddress localAddr = null; try { localAddr = request.getLocalAddress(); } catch (Exception ex) { } InetSocketAddress remoteAddr = null; try { remoteAddr = request.getRemoteAddress(); } catch (Exception ex) { } StandardWebSocketSession session = new StandardWebSocketSession(headers, attrs, localAddr, remoteAddr, user); StandardWebSocketHandlerAdapter endpoint = new StandardWebSocketHandlerAdapter(wsHandler, session);
List<Extension> extensions = new ArrayList<>(); for (WebSocketExtension extension : selectedExtensions) { extensions.add(new WebSocketToStandardExtensionAdapter(extension)); }
upgradeInternal(request, response, selectedProtocol, extensions, endpoint); }
|
Springmvc处理过程
在DispatcherServlet
里面,websocket
升级的请求是由HttpRequestHandlerAdapter
处理的,这个类是在
org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport#httpRequestHandlerAdapter
方法里注册进来的。
处理过程也很简单,直接强转调用。
1 2 3 4 5 6 7 8
| @Override @Nullable public ModelAndView handle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
((HttpRequestHandler) handler).handleRequest(request, response); return null; }
|
完