SpringBoot+Netty+WebSocket实现消息发送的示例代码
一.导入Netty依赖
io.netty netty-all 4.1.25.Final
二.搭建websocket服务器
@Component
publicclassWebSocketServer{
/**
*主线程池
*/
privateEventLoopGroupbossGroup;
/**
*工作线程池
*/
privateEventLoopGroupworkerGroup;
/**
*服务器
*/
privateServerBootstrapserver;
/**
*回调
*/
privateChannelFuturefuture;
publicvoidstart(){
future=server.bind(9001);
System.out.println("nettyserver-启动成功");
}
publicWebSocketServer(){
bossGroup=newNioEventLoopGroup();
workerGroup=newNioEventLoopGroup();
server=newServerBootstrap();
server.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(newWebsocketInitializer());
}
}
三.初始化Websocket
publicclassWebsocketInitializerextendsChannelInitializer{ @Override protectedvoidinitChannel(SocketChannelch)throwsException{ ChannelPipelinepipeline=ch.pipeline(); //------------------ //用于支持Http协议 //------------------ //websocket基于http协议,需要有http的编解码器 pipeline.addLast(newHttpServerCodec()); //对写大数据流的支持 pipeline.addLast(newChunkedWriteHandler()); //添加对HTTP请求和响应的聚合器:只要使用Netty进行Http编程都需要使用 //设置单次请求的文件的大小 pipeline.addLast(newHttpObjectAggregator(1024*64)); //webSocket服务器处理的协议,用于指定给客户端连接访问的路由:/ws pipeline.addLast(newWebSocketServerProtocolHandler("/ws")); //添加Netty空闲超时检查的支持 //1.读空闲超时(超过一定的时间会发送对应的事件消息) //2.写空闲超时 //3.读写空闲超时 pipeline.addLast(newIdleStateHandler(4,8,12)); //添加心跳处理 pipeline.addLast(newHearBeatHandler()); //添加自定义的handler pipeline.addLast(newChatHandler()); } }
四.创建Netty监听器
@Component publicclassNettyListenerimplementsApplicationListener{ @Resource privateWebSocketServerwebsocketServer; @Override publicvoidonApplicationEvent(ContextRefreshedEventevent){ if(event.getApplicationContext().getParent()==null){ try{ websocketServer.start(); }catch(Exceptione){ e.printStackTrace(); } } } }
五.建立消息通道
publicclassUserChannelMap{
/**
*用户保存用户id与通道的Map对象
*/
//privatestaticMapuserChannelMap;
/*static{
userChannelMap=newHashMap();
}*/
/**
*定义一个channel组,管理所有的channel
*GlobalEventExecutor.INSTANCE是全局的事件执行器,是一个单例
*/
privatestaticChannelGroupchannelGroup=newDefaultChannelGroup(GlobalEventExecutor.INSTANCE);
/**
*存放用户与Chanel的对应信息,用于给指定用户发送消息
*/
privatestaticConcurrentHashMapuserChannelMap=newConcurrentHashMap<>();
privateUserChannelMap(){}
/**
*添加用户id与channel的关联
*@paramuserNum
*@paramchannel
*/
publicstaticvoidput(StringuserNum,Channelchannel){
userChannelMap.put(userNum,channel);
}
/**
*根据用户id移除用户id与channel的关联
*@paramuserNum
*/
publicstaticvoidremove(StringuserNum){
userChannelMap.remove(userNum);
}
/**
*根据通道id移除用户与channel的关联
*@paramchannelId通道的id
*/
publicstaticvoidremoveByChannelId(StringchannelId){
if(!StringUtils.isNotBlank(channelId)){
return;
}
for(Strings:userChannelMap.keySet()){
Channelchannel=userChannelMap.get(s);
if(channelId.equals(channel.id().asLongText())){
System.out.println("客户端连接断开,取消用户"+s+"与通道"+channelId+"的关联");
userChannelMap.remove(s);
UserServiceuserService=SpringUtil.getBean(UserService.class);
userService.logout(s);
break;
}
}
}
/**
*打印所有的用户与通道的关联数据
*/
publicstaticvoidprint(){
for(Strings:userChannelMap.keySet()){
System.out.println("用户id:"+s+"通道:"+userChannelMap.get(s).id());
}
}
/**
*根据好友id获取对应的通道
*@paramreceiverNum接收人编号
*@returnNetty通道
*/
publicstaticChannelget(StringreceiverNum){
returnuserChannelMap.get(receiverNum);
}
/**
*获取channel组
*@return
*/
publicstaticChannelGroupgetChannelGroup(){
returnchannelGroup;
}
/**
*获取用户channelmap
*@return
*/
publicstaticConcurrentHashMapgetUserChannelMap(){
returnuserChannelMap;
}
}
六.自定义消息类型
publicclassMessage{
/**
*消息类型
*/
privateIntegertype;
/**
*聊天消息
*/
privateStringmessage;
/**
*扩展消息字段
*/
privateObjectext;
publicIntegergetType(){
returntype;
}
publicvoidsetType(Integertype){
this.type=type;
}
publicMarketChatRecordgetChatRecord(){
returnmarketChatRecord;
}
publicvoidsetChatRecord(MarketChatRecordchatRecord){
this.marketChatRecord=chatRecord;
}
publicObjectgetExt(){
returnext;
}
publicvoidsetExt(Objectext){
this.ext=ext;
}
@Override
publicStringtoString(){
return"Message{"+
"type="+type+
",marketChatRecord="+marketChatRecord+
",ext="+ext+
'}';
}
}
七.创建处理消息的handler
publicclassChatHandlerextendsSimpleChannelInboundHandler{ privatestaticfinalLoggerlog=LoggerFactory.getLogger(WebSocketServer.class); /** *用来保存所有的客户端连接 */ privatestaticChannelGroupclients=newDefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** *当Channel中有新的事件消息会自动调用 */ @Override protectedvoidchannelRead0(ChannelHandlerContextctx,TextWebSocketFramemsg)throwsException{ //当接收到数据后会自动调用 //获取客户端发送过来的文本消息 Gsongson=newGson(); log.info("服务器收到消息:{}",msg.text()); System.out.println("接收到消息数据为:"+msg.text()); Messagemessage=gson.fromJson(msg.text(),Message.class); //根据业务要求进行消息处理 switch(message.getType()){ //处理客户端连接的消息 case0: //建立用户与通道的关联 //处理客户端发送好友消息 break; case1: //处理客户端的签收消息 break; case2: //将消息记录设置为已读 break; case3: //接收心跳消息 break; default: break; } } //当有新的客户端连接服务器之后,会自动调用这个方法 @Override publicvoidhandlerAdded(ChannelHandlerContextctx)throwsException{ log.info("handlerAdded被调用"+ctx.channel().id().asLongText()); //添加到channelGroup通道组 UserChannelMap.getChannelGroup().add(ctx.channel()); //clients.add(ctx.channel()); } @Override publicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{ log.info("{异常:}"+cause.getMessage()); //删除通道 UserChannelMap.getChannelGroup().remove(ctx.channel()); UserChannelMap.removeByChannelId(ctx.channel().id().asLongText()); ctx.channel().close(); } @Override publicvoidhandlerRemoved(ChannelHandlerContextctx)throwsException{ log.info("handlerRemoved被调用"+ctx.channel().id().asLongText()); //删除通道 UserChannelMap.getChannelGroup().remove(ctx.channel()); UserChannelMap.removeByChannelId(ctx.channel().id().asLongText()); UserChannelMap.print(); } }
八.处理心跳
publicclassHearBeatHandlerextendsChannelInboundHandlerAdapter{
@Override
publicvoiduserEventTriggered(ChannelHandlerContextctx,Objectevt)throwsException{
if(evtinstanceofIdleStateEvent){
IdleStateEventidleStateEvent=(IdleStateEvent)evt;
if(idleStateEvent.state()==IdleState.READER_IDLE){
System.out.println("读空闲事件触发...");
}
elseif(idleStateEvent.state()==IdleState.WRITER_IDLE){
System.out.println("写空闲事件触发...");
}
elseif(idleStateEvent.state()==IdleState.ALL_IDLE){
System.out.println("---------------");
System.out.println("读写空闲事件触发");
System.out.println("关闭通道资源");
ctx.channel().close();
}
}
}
}
搭建完成后调用测试
1.页面访问http://localhost:9001/ws
2.端口号9001和访问路径ws都是我们在上边配置的,然后传入我们自定义的消息message类型。
3.大概流程:消息发送:用户1先连接通道,然后发送消息给用户2,用户2若是在线直接可以发送给用户,若没在线可以将消息暂存在redis或者通道里,用户2链接通道的话,两者可以直接通讯。
消息推送:用户1连接通道,根据通道id查询要推送的人是否在线,或者推送给所有人,这里我只推送给指定的人。
到此这篇关于SpringBoot+Netty+WebSocket实现消息发送的示例代码的文章就介绍到这了,更多相关SpringBootNettyWebSocket消息发送内容请搜索毛票票以前的文章或继续浏览下面的相关文章希望大家以后多多支持毛票票!
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。