JAVA Netty实现聊天室+私聊功能的示例代码
功能介绍
使用Netty框架实现聊天室功能,服务器可监控客户端上下限状态,消息转发。同时实现了点对点私聊功能。技术点我都在代码中做了备注,这里不再重复写了。希望能给想学习netty的同学一点参考。
服务器代码
服务器入口代码
packagenio.test.netty.groupChat;
importio.netty.bootstrap.ServerBootstrap;
importio.netty.channel.ChannelFuture;
importio.netty.channel.ChannelInitializer;
importio.netty.channel.ChannelOption;
importio.netty.channel.ChannelPipeline;
importio.netty.channel.EventLoopGroup;
importio.netty.channel.nio.NioEventLoopGroup;
importio.netty.channel.socket.SocketChannel;
importio.netty.channel.socket.nio.NioServerSocketChannel;
importio.netty.handler.codec.string.StringDecoder;
importio.netty.handler.codec.string.StringEncoder;
importio.netty.util.concurrent.Future;
importio.netty.util.concurrent.GenericFutureListener;
/**
*netty群聊服务器端
*@authorzhang
*
*/
publicclassNettyChatServer{
privateintport;
publicNettyChatServer(intport){
this.port=port;
}
//初始化netty服务器
privatevoidinit()throwsException{
EventLoopGroupboss=newNioEventLoopGroup(1);
EventLoopGroupwork=newNioEventLoopGroup(16);
try{
ServerBootstrapboot=newServerBootstrap();
boot.group(boss,work);
boot.channel(NioServerSocketChannel.class);//设置bossselector建立channel使用的对象
boot.option(ChannelOption.SO_BACKLOG,128);//boss等待连接的队列长度
boot.childOption(ChannelOption.SO_KEEPALIVE,true);//让客户端保持长期活动状态
boot.childHandler(newChannelInitializer(){
@Override
protectedvoidinitChannel(SocketChannelch)throwsException{
//从channel中获取pipeline并往里边添加Handler
ChannelPipelinepipeline=ch.pipeline();
pipeline.addLast("encoder",newStringEncoder());
pipeline.addLast("decoder",newStringDecoder());
pipeline.addLast(newServerMessageHandler());//自定义Handler来处理消息
}
});
System.out.println("服务器开始启动...");
//绑定端口
ChannelFuturechannelFuture=boot.bind(port).sync();
channelFuture.addListener(newGenericFutureListener>(){
@Override
publicvoidoperationComplete(Futurefuture)
throwsException{
if(future.isSuccess()){
System.out.println("服务器正在启动...");
}
if(future.isDone()){
System.out.println("服务器启动成功...OK");
}
}
});
//监听channel关闭
channelFuture.channel().closeFuture().sync();
channelFuture.addListener(newGenericFutureListener>(){
@Override
publicvoidoperationComplete(Futurefuture)
throwsException{
if(future.isCancelled()){
System.out.println("服务器正在关闭..");
}
if(future.isCancellable()){
System.out.println("服务器已经关闭..OK");
}
}
});
}finally{
boss.shutdownGracefully();
work.shutdownGracefully();
}
}
/**
*启动服务器main函数
*@paramargs
*@throwsException
*/
publicstaticvoidmain(String[]args)throwsException{
newNettyChatServer(9090).init();
}
}
服务器端消息处理Handler
packagenio.test.netty.groupChat; importio.netty.channel.Channel; importio.netty.channel.ChannelHandlerContext; importio.netty.channel.SimpleChannelInboundHandler; importio.netty.channel.group.ChannelGroup; importio.netty.channel.group.DefaultChannelGroup; importio.netty.util.concurrent.GlobalEventExecutor; importjava.text.SimpleDateFormat; importjava.util.Date; importjava.util.HashMap; importjava.util.Map; /** *自定义服务器端消息处理Handler *@authorzhang * */ publicclassServerMessageHandlerextendsSimpleChannelInboundHandler{ /** *管理全局的channel *GlobalEventExecutor.INSTANCE全局事件监听器 *一旦将channel加入ChannelGroup就不要用手动去 *管理channel的连接失效后移除操作,他会自己移除 */ privatestaticChannelGroupchannels=newDefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** *为了实现私聊功能,这里key存储用户的唯一标识, *我保存客户端的端口号 *当然这个集合也需要自己去维护用户的上下线不能像ChannelGroup那样自己去维护 */ privatestaticMap all=newHashMap (); privateSimpleDateFormatsf=newSimpleDateFormat("yyyy-MM-ddHH:mm:ss"); /** *处理收到的消息 */ @Override protectedvoidchannelRead0(ChannelHandlerContextctx,Stringmsg) throwsException{ Channelchannel=ctx.channel(); /** *这里简单判断如果内容里边包含#那么就是私聊 */ if(msg.contains("#")){ Stringid=msg.split("#")[0]; Stringbody=msg.split("#")[1]; ChanneluserChannel=all.get(id); Stringkey=channel.remoteAddress().toString().split(":")[1]; userChannel.writeAndFlush(sf.format(newDate())+"\n【用户】"+key+"说:"+body); return; } //判断当前消息是不是自己发送的 for(Channelc:channels){ Stringaddr=c.remoteAddress().toString(); if(channel!=c){ c.writeAndFlush(sf.format(newDate())+"\n【用户】"+addr+"说:"+msg); }else{ c.writeAndFlush(sf.format(newDate())+"\n【自己】"+addr+"说:"+msg); } } } /** *建立连接以后第一个调用的方法 */ @Override publicvoidhandlerAdded(ChannelHandlerContextctx)throwsException{ Channelchannel=ctx.channel(); Stringaddr=channel.remoteAddress().toString(); /** *这里ChannelGroup底层封装会遍历给所有的channel发送消息 * */ channels.writeAndFlush(sf.format(newDate())+"\n【用户】"+addr+"加入聊天室"); channels.add(channel); Stringkey=channel.remoteAddress().toString().split(":")[1]; all.put(key,channel); } /** *channel连接状态就绪以后调用 */ @Override publicvoidchannelActive(ChannelHandlerContextctx)throwsException{ Stringaddr=ctx.channel().remoteAddress().toString(); System.out.println(sf.format(newDate())+"\n【用户】"+addr+"上线"); } /** *channel连接状态断开后触发 */ @Override publicvoidchannelInactive(ChannelHandlerContextctx)throwsException{ Stringaddr=ctx.channel().remoteAddress().toString(); System.out.println(sf.format(newDate())+"\n【用户】"+addr+"下线"); //下线移除 Stringkey=ctx.channel().remoteAddress().toString().split(":")[1]; all.remove(key); } /** *连接发生异常时触发 */ @Override publicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause) throwsException{ //System.out.println("连接发生异常!"); ctx.close(); } /** *断开连接会触发该消息 *同时当前channel也会自动从ChannelGroup中被移除 */ @Override publicvoidhandlerRemoved(ChannelHandlerContextctx)throwsException{ Channelchannel=ctx.channel(); Stringaddr=channel.remoteAddress().toString(); /** *这里ChannelGroup底层封装会遍历给所有的channel发送消息 * */ channels.writeAndFlush(sf.format(newDate())+"\n【用户】"+addr+"离开了"); //打印ChannelGroup中的人数 System.out.println("当前在线人数是:"+channels.size()); System.out.println("all:"+all.size()); } }
客户端主方法代码
packagenio.test.netty.groupChat;
importio.netty.bootstrap.Bootstrap;
importio.netty.channel.Channel;
importio.netty.channel.ChannelFuture;
importio.netty.channel.ChannelInitializer;
importio.netty.channel.ChannelPipeline;
importio.netty.channel.EventLoopGroup;
importio.netty.channel.nio.NioEventLoopGroup;
importio.netty.channel.socket.nio.NioSocketChannel;
importio.netty.handler.codec.string.StringDecoder;
importio.netty.handler.codec.string.StringEncoder;
importio.netty.util.concurrent.Future;
importio.netty.util.concurrent.GenericFutureListener;
importjava.util.Scanner;
publicclassNettyChatClient{
privateStringip;
privateintport;
publicNettyChatClient(Stringip,intport){
this.ip=ip;
this.port=port;
}
/**
*初始化客户
*/
privatevoidinit()throwsException{
//创建监听事件的监听器
EventLoopGroupwork=newNioEventLoopGroup();
try{
Bootstrapboot=newBootstrap();
boot.group(work);
boot.channel(NioSocketChannel.class);
boot.handler(newChannelInitializer(){
@Override
protectedvoidinitChannel(NioSocketChannelch)
throwsException{
ChannelPipelinepipeline=ch.pipeline();
pipeline.addLast("encoder",newStringEncoder());
pipeline.addLast("decoder",newStringDecoder());
pipeline.addLast(newClientMessageHandler());
}
});
ChannelFuturechannelFuture=boot.connect(ip,port).sync();
channelFuture.addListener(newGenericFutureListener>(){
@Override
publicvoidoperationComplete(Futurefuture)
throwsException{
if(future.isSuccess()){
System.out.println("客户端启动中...");
}
if(future.isDone()){
System.out.println("客户端启动成功...OK!");
}
}
});
System.out.println(channelFuture.channel().localAddress().toString());
System.out.println("#################################################");
System.out.println("~~~~~~~~~~~~~~端口号#消息内容~~这样可以给单独一个用户发消息~~~~~~~~~~~~~~~~~~");
System.out.println("#################################################");
/**
*这里用控制台输入数据
*/
Channelchannel=channelFuture.channel();
//获取channel
Scannerscanner=newScanner(System.in);
while(scanner.hasNextLine()){
Stringstr=scanner.nextLine();
channel.writeAndFlush(str+"\n");
}
channelFuture.channel().closeFuture().sync();
scanner.close();
}finally{
work.shutdownGracefully();
}
}
/**
*主方法入口
*@paramargs
*@throwsException
*/
publicstaticvoidmain(String[]args)throwsException{
newNettyChatClient("127.0.0.1",9090).init();
}
}
客户端消息处理Handler
packagenio.test.netty.groupChat; importio.netty.channel.ChannelHandlerContext; importio.netty.channel.SimpleChannelInboundHandler; /** *客户点消息处理Handler *@authorzhang * */ publicclassClientMessageHandlerextendsSimpleChannelInboundHandler{ /** *处理收到的消息 */ @Override protectedvoidchannelRead0(ChannelHandlerContextctx,Stringmsg) throwsException{ System.out.println(msg); } /** *连接异常后触发 */ @Override publicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause) throwsException{ ctx.close(); } }
测试结果
启动了四个客户端服务器端日志效果如下:
客户端一端日志:
客户端二日志:
客户端三日志:
客户端四日志:
现在在客户端四发送消息:
每个客户端都可以收到消息:
软化关闭客户端客户端三:
服务器日志:
其他客户端日志:
发送私聊消息:
这个客户端收不到消息
到此这篇关于JAVANetty实现聊天室+私聊功能的示例代码的文章就介绍到这了,更多相关JAVANetty聊天室内容请搜索毛票票以前的文章或继续浏览下面的相关文章希望大家以后多多支持毛票票!
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。