Spring整合websocket整合应用示例(下)
在Spring整合websocket整合应用示例(上)文章中,我们已经实现了websocket,但还有一个核心的业务实现类没有实现,这里我们就实现这个业务核心类,因为老夫参与的这个系统使用websocket发送消息,所以其实现就是如何发送消息了。
7.NewsListenerImpl的实现
packagecn.bridgeli.websocket; importcom.google.gson.Gson; importcom.google.gson.GsonBuilder; importcom.lagou.common.base.util.date.DateUtil; importcom.lagou.platform.news.api.enumeration.PlatNewsCategoryType; importcom.lagou.platform.news.web.dao.ext.model.PlatNewsVo; importcom.lagou.platform.news.web.dao.ext.model.SearchCondition; importcom.lagou.platform.news.web.quartz.impl.TimingJob; importcom.lagou.platform.news.web.service.PlatNewsService; importorg.apache.commons.lang.StringUtils; importorg.json.simple.JSONArray; importorg.json.simple.JSONObject; importorg.quartz.*; importorg.quartz.impl.StdSchedulerFactory; importorg.slf4j.Logger; importorg.slf4j.LoggerFactory; importorg.springframework.beans.factory.annotation.Autowired; importorg.springframework.stereotype.Component; importorg.springframework.web.socket.TextMessage; importjava.io.IOException; importjava.util.Date; importjava.util.List; importjava.util.concurrent.ExecutorService; importjava.util.concurrent.Executors; /** *@Description:站内消息监听器实现 *@Date:16-3-7 */ @Component publicclassNewsListenerImplimplementsNewsListener{ privatestaticfinalLoggerlogger=LoggerFactory.getLogger(NewsListenerImpl.class); Gsongson=newGsonBuilder().setDateFormat("yyyy-MM-ddHH:mm:ss").create(); //线程池 privateExecutorServiceexecutorService=Executors.newCachedThreadPool(); //任务调度 privateSchedulerFactorysf=newStdSchedulerFactory(); @Autowired privatePlatNewsServiceplatNewsService; @Override publicvoidafterPersist(PlatNewsVoplatNewsVo){ logger.info("监听到有新消息添加。。。"); logger.info("新消息为:"+gson.toJson(platNewsVo)); //启动线程 if(null!=platNewsVo&&!StringUtils.isBlank(platNewsVo.getCurrentoperatoremail())){ //如果是定时消息 if(platNewsVo.getNewsType()==PlatNewsCategoryType.TIMING_TIME.getCategoryId()){ startTimingTask(platNewsVo);//定时推送 }else{ //立即推送 executorService.execute(newAfterConnectionEstablishedTask(platNewsVo.getCurrentoperatoremail())); } } } @Override publicvoidafterConnectionEstablished(Stringemail){ logger.info("建立websocket连接后推送新消息。。。"); if(!StringUtils.isBlank(email)){ executorService.execute(newAfterConnectionEstablishedTask(email)); } } /** *@Description:如果新添加了定时消息,启动定时消息任务 *@paramplatNewsVo */ privatevoidstartTimingTask(PlatNewsVoplatNewsVo){ logger.info("开始定时推送消息任务。。。"); DatetimingTime=platNewsVo.getTimingTime(); if(null==timingTime){ logger.info("定时消息时间为null。"); return; } logger.info("定时推送任务时间为:"+DateUtil.date2String(timingTime)); JobDetailjobDetail=JobBuilder.newJob(TimingJob.class) .withIdentity(platNewsVo.getCurrentoperatoremail()+"定时消息"+platNewsVo.getId(),"站内消息") .build(); //传递参数 jobDetail.getJobDataMap().put("platNewsService",platNewsService); jobDetail.getJobDataMap().put("userEmail",platNewsVo.getCurrentoperatoremail()); Triggertrigger=TriggerBuilder .newTrigger() .withIdentity("定时消息触发"+platNewsVo.getId(),"站内消息") .startAt(timingTime) .withSchedule(SimpleScheduleBuilder.simpleSchedule() .withIntervalInSeconds(0)//时间间隔 .withRepeatCount(0)//重复次数 ) .build(); //启动定时任务 try{ Schedulersched=sf.getScheduler(); sched.scheduleJob(jobDetail,trigger); if(!sched.isShutdown()){ sched.start(); } }catch(SchedulerExceptione){ logger.info(e.toString()); } logger.info("完成开启定时推送消息任务。。。"); } /** *@Description:建立websocket链接后的推送线程 */ classAfterConnectionEstablishedTaskimplementsRunnable{ Stringemail; publicAfterConnectionEstablishedTask(Stringemail){ this.email=email; } @Override publicvoidrun(){ logger.info("开始推送消息给用户:"+email+"。。。"); if(!StringUtils.isBlank(email)){ SearchConditionsearchCondition=newSearchCondition(); searchCondition.setOperatorEmail(email); JSONArrayjsonArray=newJSONArray(); for(PlatNewsCategoryTypetype:PlatNewsCategoryType.values()){ searchCondition.setTypeId(type.getCategoryId()); intcount=platNewsService.countPlatNewsByExample(searchCondition); JSONObjectobject=newJSONObject(); object.put("name",type.name()); object.put("description",type.getDescription()); object.put("count",count); jsonArray.add(object); } if(null!=jsonArray&&jsonArray.size()>0){ UserSocketVouserSocketVo=WSSessionLocalCache.get(email); TextMessagereMessage=newTextMessage(gson.toJson(jsonArray)); try{ if(null!=userSocketVo){ //推送消息 userSocketVo.getWebSocketSession().sendMessage(reMessage); //更新推送时间 userSocketVo.setLastSendTime(DateUtil.getNowDate()); logger.info("完成推送新消息给用户:"+userSocketVo.getUserEmail()+"。。。"); } }catch(IOExceptione){ logger.error(e.toString()); logger.info("站内消息推送失败。。。"+e.toString()); } } } logger.info("结束推送消息给"+email+"。。。"); } } }
这个类就是websocket的核心业务的实现,其具体肯定和业务相关,由于业务的不同,实现肯定不同,因为老夫参与的系统是发送消息,所以里面最核心的一句就是:
userSocketVo.getWebSocketSession().sendMessage(reMessage);
通过WebSocketSession的sendMessage方法把我们的消息发送出去。另外,这主要是后端的实现,至于前端的实现,因为老夫是后端程序猿比较关注后端,所以前端就不多做介绍了,大家可以自己去网上查资料。最后需要说明的是,老夫之前搜一些学习资料的时候,发现老夫该同事的写法和有一篇文章几乎一样,我想该同事应该是参考了这篇文章,所以列在下面,算作参考资料。