Spring和Websocket相结合实现消息的推送
本文主要有三个步骤
1、用户登录后建立websocket连接,默认选择websocket连接,如果浏览器不支持,则使用sockjs进行模拟连接
2、建立连接后,服务端返回该用户的未读消息
3、服务端进行相关操作后,推送新消息给某一个用户或者所有用户
相关环境:Spring 4.0.6(要选择4.0+),Tomcat 7.0.55
WebSocket服务端实现
WebSocketConfig.java
@Configuration @EnableWebMvc @EnableWebSocket publicclassWebSocketConfigextendsWebMvcConfigurerAdapterimplementsWebSocketConfigurer{ @Override publicvoidregisterWebSocketHandlers(WebSocketHandlerRegistryregistry){ registry.addHandler(systemWebSocketHandler(),"/webSocketServer").addInterceptors(newWebSocketHandshakeInterceptor()); registry.addHandler(systemWebSocketHandler(),"/sockjs/webSocketServer").addInterceptors(newWebSocketHandshakeInterceptor()) .withSockJS(); } @Bean publicWebSocketHandlersystemWebSocketHandler(){ returnnewSystemWebSocketHandler(); } }
不要忘记在springmvc的配置文件中配置对此类的自动扫描
<context:component-scan base-package="com.ldl.origami.websocket"/>
@Configuration
@EnableWebMvc
@EnableWebSocket
这三个注解的大致作用是:@Configuration使这个类可以用@Bean的方式声明bean;@EnableWebMvc启用Spring MVC的Java配置;@EnableWebSocket启用WebSocket支持。试了一下,@EnableWebMvc不加也没什么影响(应该是因为Spring MVC已经在XML配置文件中启用了);另外@Configuration类本身也是一个组件,可以被component-scan自动扫描到
registry.addHandler(systemWebSocketHandler(), "/webSocketServer").addInterceptors(new WebSocketHandshakeInterceptor());
用来注册websocketserver实现类,第二个参数是访问websocket的地址
registry.addHandler(systemWebSocketHandler(), "/sockjs/webSocketServer").addInterceptors(new WebSocketHandshakeInterceptor()).withSockJS();
这个是使用Sockjs的注册方法
首先SystemWebSocketHandler.java
publicclassSystemWebSocketHandlerimplementsWebSocketHandler{ privatestaticfinalLoggerlogger; privatestaticfinalArrayList<WebSocketSession>users; static{ users=newArrayList<>(); logger=LoggerFactory.getLogger(SystemWebSocketHandler.class); } @Autowired privateWebSocketServicewebSocketService; @Override publicvoidafterConnectionEstablished(WebSocketSessionsession)throwsException{ logger.debug("connecttothewebsocketsuccess......"); users.add(session); StringuserName=(String)session.getAttributes().get(Constants.WEBSOCKET_USERNAME); if(userName!=null){ //查询未读消息 intcount=webSocketService.getUnReadNews((String)session.getAttributes().get(Constants.WEBSOCKET_USERNAME)); session.sendMessage(newTextMessage(count+"")); } } @Override publicvoidhandleMessage(WebSocketSessionsession,WebSocketMessage<?>message)throwsException{ //sendMessageToUsers(); } @Override publicvoidhandleTransportError(WebSocketSessionsession,Throwableexception)throwsException{ if(session.isOpen()){ session.close(); } logger.debug("websocketconnectionclosed......"); users.remove(session); } @Override publicvoidafterConnectionClosed(WebSocketSessionsession,CloseStatuscloseStatus)throwsException{ logger.debug("websocketconnectionclosed......"); users.remove(session); } @Override publicbooleansupportsPartialMessages(){ returnfalse; } /** *给所有在线用户发送消息 * *@parammessage */ publicvoidsendMessageToUsers(TextMessagemessage){ for(WebSocketSessionuser:users){ try{ if(user.isOpen()){ user.sendMessage(message); } }catch(IOExceptione){ e.printStackTrace(); } } } /** *给某个用户发送消息 * *@paramuserName *@parammessage */ publicvoidsendMessageToUser(StringuserName,TextMessagemessage){ for(WebSocketSessionuser:users){ if(user.getAttributes().get(Constants.WEBSOCKET_USERNAME).equals(userName)){ try{ if(user.isOpen()){ user.sendMessage(message); } }catch(IOExceptione){ e.printStackTrace(); } break; } } } }
相关内容大家一看就能明白,就不多解释了
然后WebSocketHandshakeInterceptor.java
publicclassWebSocketHandshakeInterceptorimplementsHandshakeInterceptor{ privatestaticLoggerlogger=LoggerFactory.getLogger(HandshakeInterceptor.class); @Override publicbooleanbeforeHandshake(ServerHttpRequestrequest,ServerHttpResponseresponse,WebSocketHandlerwsHandler,Map<String,Object >attributes)throwsException{ if(requestinstanceofServletServerHttpRequest){ ServletServerHttpRequestservletRequest=(ServletServerHttpRequest)request; HttpSessionsession=servletRequest.getServletRequest().getSession(false); if(session!=null){ //使用userName区分WebSocketHandler,以便定向发送消息 StringuserName=(String)session.getAttribute(Constants.SESSION_USERNAME); attributes.put(Constants.WEBSOCKET_USERNAME,userName); } } returntrue; } @Override publicvoidafterHandshake(ServerHttpRequestrequest,ServerHttpResponseresponse,WebSocketHandlerwsHandler,Exceptionexception){ } }
这个的主要作用是取得当前请求中的用户名,并且保存到当前WebSocketSession的attributes中,以便确定WebSocketSession所对应的用户,具体可参考HttpSessionHandshakeInterceptor
用户登录建立websocket连接
index.jsp
<script type="text/javascript" src="${pageContext.request.contextPath}/websocket/sockjs-0.3.min.js"></script>
<script>
    // Build the endpoint URLs from the current page instead of hard-coding
    // localhost:8080/Origami, so the page also works on other hosts, ports and
    // context paths, and uses wss:// when the page itself is served over https.
    var contextPath = "${pageContext.request.contextPath}";
    var secure = window.location.protocol === "https:";
    var wsBase = (secure ? "wss://" : "ws://") + window.location.host + contextPath;
    var httpBase = window.location.protocol + "//" + window.location.host + contextPath;

    var websocket;
    if ('WebSocket' in window) {
        websocket = new WebSocket(wsBase + "/webSocketServer");
    } else if ('MozWebSocket' in window) {
        websocket = new MozWebSocket(wsBase + "/webSocketServer");
    } else {
        // No native WebSocket support: fall back to SockJS.
        websocket = new SockJS(httpBase + "/sockjs/webSocketServer");
    }

    websocket.onopen = function (evnt) {
    };

    // Show the unread-message count pushed by the server.
    websocket.onmessage = function (evnt) {
        // text() inserts the payload as plain text rather than as HTML.
        var count = $("<font color='red'></font>").text(evnt.data);
        $("#msgcount").empty().append("(").append(count).append(")");
    };

    websocket.onerror = function (evnt) {
    };

    websocket.onclose = function (evnt) {
    };
</script>
使用sockjs时要注意
1、这两个的写法
<script type="text/javascript" src="http://localhost:8080/Origami/websocket/sockjs-0.3.min.js"></script>
websocket = new SockJS("http://localhost:8080/Origami/sockjs/webSocketServer");
2、web.xml中
<web-app version="3.0" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd">
version
web-app_3_0.xsd
这两个的版本都要是3.0+,并且要互相一致(version="3.0"对应web-app_3_0.xsd)
然后在这个servlet中加入
<async-supported>true</async-supported>
加入后完整的servlet配置如下:
<servlet>
    <servlet-name>appServlet</servlet-name>
    <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
    <init-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>classpath*:servlet-context.xml</param-value>
    </init-param>
    <load-on-startup>1</load-on-startup>
    <async-supported>true</async-supported>
</servlet>
然后所有的filter中也加入
<async-supported>true</async-supported>
3、添加相关依赖
<!-- Jackson is required by the SockJS support for encoding and decoding message frames. -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-annotations</artifactId>
    <version>2.3.0</version>
</dependency>
<!-- jackson-core is kept at the same patch level as jackson-databind; declaring an older
     version here would override the newer one that jackson-databind pulls in. -->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.3.3</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.3.3</version>
</dependency>
好了,现在websocket可以正常建立起来了
返回用户未读的消息
当连接建立后,会进入SystemWebSocketHandler的afterConnectionEstablished方法,代码看上边,取出WebSocketHandshakeInterceptor中保存的用户名
查询信息后使用session.sendMessage(new TextMessage(count + ""));返回给用户,从哪来回哪去
服务端推送消息给用户
@Controller publicclassAdminController{ staticLoggerlogger=LoggerFactory.getLogger(AdminController.class); @Autowired(required=false) privateAdminServiceadminService; @Bean publicSystemWebSocketHandlersystemWebSocketHandler(){ returnnewSystemWebSocketHandler(); } @RequestMapping("/auditing") @ResponseBody publicStringauditing(HttpServletRequestrequest){ //无关代码都省略了 intunReadNewsCount=adminService.getUnReadNews(username); systemWebSocketHandler().sendMessageToUser(username,newTextMessage(unReadNewsCount+"")); returnresult; } }
在这里可以使用sendMessageToUser给某个用户推送信息,也可以使用sendMessageToUsers给所有用户推送信息