SpringBoot使用Websocket总结

Posted by hcy on June 15, 2020

SpringBoot使用Websocket总结

1.添加依赖

1
2
3
4
      <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

2.写处理器

写一个类实现org.springframework.web.socket.WebSocketHandler

更推荐继承org.springframework.web.socket.handler.AbstractWebSocketHandler类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class TestHandler extends AbstractWebSocketHandler {

    //处理textmessage
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        super.handleTextMessage(session, message);
    }

    //处理binarymessage
    @Override
    protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
        super.handleBinaryMessage(session, message);
    }

    //处理pong
    @Override
    protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
        super.handlePongMessage(session, message);
    }

    //报错时
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        super.handleTransportError(session, exception);
    }

    //连接关闭时
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        super.afterConnectionClosed(session, status);
    }

    //连接成功时
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        super.afterConnectionEstablished(session);
    }

    //是否支持报文拆包
    @Override
    public boolean supportsPartialMessages() {
        return super.supportsPartialMessages();
    }
}

3.配置路径和拦截器

​ 在一个Configuration类上,添加EnableWebSocket注解启动Websocket

​ 注入WebSocketConfigurerBean即可启动websocket服务端。

addInterceptors方法是添加拦截器,这个拦截器拦截在http协议转向websocket的那个请求上,如果底层使用的是servlet可以强转成ServletServerHttpRequest,在这里能拿到websocket连接时携带的paramhead

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@EnableWebSocket
@Configuration
public class WebsocketConfig {

    private static Logger log = LoggerFactory.getLogger(WebsocketConfig.class);

    @Bean
    public WebSocketConfigurer wsc() {
        return registry -> registry.addHandler(new TestHandler(), "/websocket/test")
                .addInterceptors(new HandshakeInterceptor() {
                    @Override
                    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
                        ServletServerHttpRequest req = (ServletServerHttpRequest) request;
                        return true;
                    }

                    @Override
                    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {

                    }
                }).setAllowedOrigins("*");
    }


}

4.技巧

  1. 拦截器返回false,则不会进行websocket协议的转换,如必须要登录才能连接的情况,可以在拦截器里拿到请求session_id来判断,也可以获取参数判断。

  2. 拦截器里的第三个参数,是一个map,放入此map的值可以从WebSocketHandler里面的WebSocketSession身上拿出来。

  3. supportsPartialMessages回调方法是决定是否接受半包,因为websocket协议比较底层,好像Tcp协议一样,如果发送大消息可能会拆成多个小报文。如果不希望处理不完整的报文,希望底层帮忙聚合成完整消息将此方法返回false,这样底层会等待完整报文到达聚合后才回调。

  4. WebSocketSession身上能设置最大报文大小,如果报文过大则会报错的,可以将限制调大。可以在afterConnectionEstablished方法里对WebSocketSession进行设置。tomcatsession默认大小就是 8*1024

    1
    2
    3
    4
    5
    
     @Override
        public void afterConnectionEstablished(WebSocketSession session) throws Exception {
            session.setBinaryMessageSizeLimit(8 * 1024);
            session.setTextMessageSizeLimit(8 * 1024);
        }
    
    1. WebSocketSession是不支持并发发送消息的,要么自己做同步,要么包装WebSocketSessionConcurrentWebSocketSessionDecorator,关于ConcurrentWebSocketSessionDecorator的使用可以参考Spring并发发送Websocket消息

    2. setAllowedOrigins("*")为控制跨域的,测试阶段可以设成*,不然不允许跨域链接Websocket

    3. 百度搜在线websocket,有在线连接Websocket的网站,这样后端自己做测试比较方便。

5.原理

EnableWebSocket引入了DelegatingWebSocketConfiguration

1
2
3
4
5
6
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(DelegatingWebSocketConfiguration.class)
public @interface EnableWebSocket {
}

先看DelegatingWebSocketConfiguration这个类。首先类被构造后,会自动注入WebSocketConfigurer,这个就是我们配置的Bean

1
2
3
4
5
6
@Autowired(required = false)
	public void setConfigurers(List<WebSocketConfigurer> configurers) {
		if (!CollectionUtils.isEmpty(configurers)) {
			this.configurers.addAll(configurers);
		}
	}

然后父类会注入一个HandlerMapping,会创建一个ServletWebSocketHandlerRegistry,调用所有的

configurer.registerWebSocketHandlers(registry);方法进行配置。

最后从registry.getHandlerMapping()获取HandlerMapping,这个HandlerMapping最终会注册到Springmvc里,接受到请求时会使用该HandlerMapping来处理请求。

1
2
3
4
5
6
7
8
9
10
11
	@Bean
	public HandlerMapping webSocketHandlerMapping() {
		ServletWebSocketHandlerRegistry registry = initHandlerRegistry();
		if (registry.requiresTaskScheduler()) {
			TaskScheduler scheduler = defaultSockJsTaskScheduler();
			Assert.notNull(scheduler, "Expected default TaskScheduler bean");
			registry.setTaskScheduler(scheduler);
		}
        //使用我们的配置,构造出HandlerMapping
		return registry.getHandlerMapping();
	}
1
2
3
4
5
6
7
	private ServletWebSocketHandlerRegistry initHandlerRegistry() {
		if (this.handlerRegistry == null) {
			this.handlerRegistry = new ServletWebSocketHandlerRegistry();
			registerWebSocketHandlers(this.handlerRegistry);
		}
		return this.handlerRegistry;
	}
1
2
3
4
5
6
	@Override
	protected void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
		for (WebSocketConfigurer configurer : this.configurers) {
			configurer.registerWebSocketHandlers(registry);
		}
	}

下面是获取HandlerMapping的过程

​ 这里会创建一个WebSocketHandlerMapping,并设置urlMapurlMap里面存放的是url 和对应的HttpRequestHandle

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
	public AbstractHandlerMapping getHandlerMapping() {
		Map<String, Object> urlMap = new LinkedHashMap<>();
		for (ServletWebSocketHandlerRegistration registration : this.registrations) {
			MultiValueMap<HttpRequestHandler, String> mappings = registration.getMappings();
			mappings.forEach((httpHandler, patterns) -> {
				for (String pattern : patterns) {
					urlMap.put(pattern, httpHandler);
				}
			});
		}
        
		WebSocketHandlerMapping hm = new WebSocketHandlerMapping();
		hm.setUrlMap(urlMap);
		hm.setOrder(this.order);
		if (this.urlPathHelper != null) {
			hm.setUrlPathHelper(this.urlPathHelper);
		}
		return hm;
	}	

这里是将我们的配置构建成一个WebSocketHttpRequestHandler来处理请求,其中包含了路径,拦截器,WebSocketHandler,HandshakeInterceptor,HandshakeHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
	@Override
	protected void addWebSocketHandlerMapping(MultiValueMap<HttpRequestHandler, String> mappings,
			WebSocketHandler webSocketHandler, HandshakeHandler handshakeHandler,
			HandshakeInterceptor[] interceptors, String path) {

		WebSocketHttpRequestHandler httpHandler =
				new WebSocketHttpRequestHandler(webSocketHandler, handshakeHandler);

		if (!ObjectUtils.isEmpty(interceptors)) {
			httpHandler.setHandshakeInterceptors(Arrays.asList(interceptors));
		}
		mappings.add(httpHandler, path);
	}

HandshakeHandler这个类也是可以自定义的,他与协议转换有关,spring会自动根据底层实现来决定这个是什么,一般不要自己写。

HttpRequestHandle处理过程如下,先调用拦截器的before,再掉用HandshakeHandler做协议转换,再调用拦截器的after。这里调用拦截器前创建的Map attributes,最终会设置到WebsocketSession

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Override
	public void handleRequest(HttpServletRequest servletRequest, HttpServletResponse servletResponse)
			throws ServletException, IOException {

		ServerHttpRequest request = new ServletServerHttpRequest(servletRequest);
		ServerHttpResponse response = new ServletServerHttpResponse(servletResponse);

		HandshakeInterceptorChain chain = new HandshakeInterceptorChain(this.interceptors, this.wsHandler);
		HandshakeFailureException failure = null;

		try {
			if (logger.isDebugEnabled()) {
				logger.debug(servletRequest.getMethod() + " " + servletRequest.getRequestURI());
			}
			Map<String, Object> attributes = new HashMap<>();
            //调用拦截器
			if (!chain.applyBeforeHandshake(request, response, attributes)) {
				return;
			}
            //处理握手
			this.handshakeHandler.doHandshake(request, response, this.wsHandler, attributes);
            //再调用拦截器
			chain.applyAfterHandshake(request, response, null);
		}
		catch (HandshakeFailureException ex) {
			failure = ex;
		}
		catch (Throwable ex) {
			failure = new HandshakeFailureException("Uncaught failure for request " + request.getURI(), ex);
		}
		finally {
			if (failure != null) {
				chain.applyAfterHandshake(request, response, failure);
				response.close();
				throw failure;
			}
			response.close();
		}
	}

下面是协议升级前做的操作,创建StandardWebSocketSession时将attr也就是那个拦截器里的map设置进去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Override
	public void upgrade(ServerHttpRequest request, ServerHttpResponse response,
			@Nullable String selectedProtocol, List<WebSocketExtension> selectedExtensions,
			@Nullable Principal user, WebSocketHandler wsHandler, Map<String, Object> attrs)
			throws HandshakeFailureException {

		HttpHeaders headers = request.getHeaders();
		InetSocketAddress localAddr = null;
		try {
			localAddr = request.getLocalAddress();
		}
		catch (Exception ex) {
			// Ignore
		}
		InetSocketAddress remoteAddr = null;
		try {
			remoteAddr = request.getRemoteAddress();
		}
		catch (Exception ex) {
			// Ignore
		}
		//创建此类时将attrs作为参数传进去
		StandardWebSocketSession session = new StandardWebSocketSession(headers, attrs, localAddr, remoteAddr, user);
		StandardWebSocketHandlerAdapter endpoint = new StandardWebSocketHandlerAdapter(wsHandler, session);

		List<Extension> extensions = new ArrayList<>();
		for (WebSocketExtension extension : selectedExtensions) {
			extensions.add(new WebSocketToStandardExtensionAdapter(extension));
		}

		upgradeInternal(request, response, selectedProtocol, extensions, endpoint);
	}

Springmvc处理过程

​ 在DispatcherServlet里面,websocket升级的请求是由HttpRequestHandlerAdapter处理的,这个类是在

org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport#httpRequestHandlerAdapter方法里注册进来的。

处理过程也很简单,直接强转调用。

1
2
3
4
5
6
7
8
	@Override
	@Nullable
	public ModelAndView handle(HttpServletRequest request, HttpServletResponse response, Object handler)
			throws Exception {

		((HttpRequestHandler) handler).handleRequest(request, response);
		return null;
	}


转载请注明出处:https://www.huangchaoyu.com/2020/06/15/SpringBoot使用Websocket总结/