基于BIO的Java Socket通信详解
BIO,即阻塞IO,在基于Socket的消息通信过程中,Socket服务端向外部提供服务,而Socket客户端可以建立到Socket服务端的连接,进而发送请求数据,然后等待Socket服务端处理,并返回处理结果(响应)。
基于BIO的通信,Socket服务端会发生阻塞,即在监听过程中每次accept到一个客户端的Socket连接,就要处理这个请求,而此时其他连接过来的客户端只能阻塞等待。可见,这种模式下Socket服务端的处理能力是非常有限的,客户端也只能等待,直到服务端空闲时进行请求的处理。
BIO通信实现
下面基于BIO模式,来实现一个简单的Socket服务端与Socket客户端进行通信的逻辑,对这种通信方式有一个感性的认识。具体逻辑描述如下:
1、Socket客户端连接到Socket服务端,并发送数据“IamtheclientN.”;
2、Socket服务端,监听服务端口,并接收客户端请求数据,如果请求数据以“Iamtheclient”开头,则响应客户端“Iamtheserver,andyouaretheNthclient.”;
Socket服务端实现,代码如下所示:
packageorg.shirdrn.java.communications.bio; importjava.io.IOException; importjava.io.InputStream; importjava.io.OutputStream; importjava.net.ServerSocket; importjava.net.Socket; /** *基于BIO的Socket服务器端 * *@authorshirdrn */ publicclassSimpleBioTcpServerextendsThread{ /**服务端口号*/ privateintport=8888; /**为客户端分配编号*/ privatestaticintsequence=0; publicSimpleBioTcpServer(intport){ this.port=port; } @Override publicvoidrun(){ Socketsocket=null; try{ ServerSocketserverSocket=newServerSocket(this.port); while(true){ socket=serverSocket.accept();//监听 this.handleMessage(socket);//处理一个连接过来的客户端请求 } }catch(IOExceptione){ e.printStackTrace(); } } /** *处理一个客户端socket连接 *@paramsocket客户端socket *@throwsIOException */ privatevoidhandleMessage(Socketsocket)throwsIOException{ InputStreamin=socket.getInputStream();//流:客户端->服务端(读) OutputStreamout=socket.getOutputStream();//流:服务端->客户端(写) intreceiveBytes; byte[]receiveBuffer=newbyte[128]; StringclientMessage=""; if((receiveBytes=in.read(receiveBuffer))!=-1){ clientMessage=newString(receiveBuffer,0,receiveBytes); if(clientMessage.startsWith("Iamtheclient")){ StringserverResponseWords= "Iamtheserver,andyouarethe"+(++sequence)+"thclient."; out.write(serverResponseWords.getBytes()); } } out.flush(); System.out.println("Server:receivesclientMessage->"+clientMessage); } publicstaticvoidmain(String[]args){ SimpleBioTcpServerserver=newSimpleBioTcpServer(1983); server.start(); } }
上述实现,没有进行复杂的异常处理。
Socket客户端实现,代码如下所示:
packageorg.shirdrn.java.communications.bio; importjava.io.IOException; importjava.io.InputStream; importjava.io.OutputStream; importjava.net.Socket; importjava.net.UnknownHostException; importjava.util.Date; /** *基于BIO的Socket客户端 * *@authorshirdrn */ publicclassSimpleBioTcpClient{ privateStringipAddress; privateintport; privatestaticintpos=0; publicSimpleBioTcpClient(){} publicSimpleBioTcpClient(StringipAddress,intport){ this.ipAddress=ipAddress; this.port=port; } /** *连接Socket服务端,并模拟发送请求数据 *@paramdata请求数据 */ publicvoidsend(byte[]data){ Socketsocket=null; OutputStreamout=null; InputStreamin=null; try{ socket=newSocket(this.ipAddress,this.port);//连接 //发送请求 out=socket.getOutputStream(); out.write(data); out.flush(); //接收响应 in=socket.getInputStream(); inttotalBytes=0; intreceiveBytes=0; byte[]receiveBuffer=newbyte[128]; if((receiveBytes=in.read(receiveBuffer))!=-1){ totalBytes+=receiveBytes; } StringserverMessage=newString(receiveBuffer,0,receiveBytes); System.out.println("Client:receivesserverMessage->"+serverMessage); }catch(UnknownHostExceptione){ e.printStackTrace(); }catch(IOExceptione){ e.printStackTrace(); }catch(Exceptione){ e.printStackTrace(); }finally{ try{ //发送请求并接收到响应,通信完成,关闭连接 out.close(); in.close(); socket.close(); }catch(IOExceptione){ e.printStackTrace(); } } } publicstaticvoidmain(String[]args){ intn=1; StringBufferdata=newStringBuffer(); Datestart=newDate(); for(inti=0;i首先启动Socket服务端进程SimpleBioTcpServer,然后再运行Socket客户端SimpleBioTcpClient。可以看到,服务端接收到请求数据,然后响应客户端,客户端接收到了服务端的响应数据。
上述实现中,对于Socket客户端和服务端都是一次写入,并一次读出,而在实际中如果每次通信过程中数据量特别大的话,服务器端是不可能接受的,可以在确定客户端请求数据字节数的情况,循环来读取并进行处理。
另外,对于上述实现中流没有进行装饰(Wrapped)处理,在实际中会有性能的损失,如不能缓冲等。
对于Socket服务端接收数据,如果可以使多次循环读取到的字节数据通过一个可变长的字节缓冲区来存储,就能方便多了,可是使用ByteArrayOutputStream,例如:
ByteArrayOutputStreamdata=newByteArrayOutputStream(); data.write(receiveBuffer,totalBytes,totalBytes+receiveBytes);BIO通信测试
下面测试一下大量请求的场景下,Socket服务端处理的效率。
第一种方式:通过for循环来启动5000个Socket客户端,发送请求,代码如下所示:
publicstaticvoidmain(String[]args){ intn=5000; StringBufferdata=newStringBuffer(); Datestart=newDate(); for(inti=0;i经过测试,大约需要9864ms,大概接近10s。
第二种方式:通过启动5000个独立的客户端线程,同时请求,服务端进行计数:
packageorg.shirdrn.java.communications.bio; importjava.io.IOException; importjava.io.InputStream; importjava.io.OutputStream; importjava.net.ServerSocket; importjava.net.Socket; importjava.net.UnknownHostException; importjava.util.Date; /** *基于BIO的Socket通信测试 * *@authorshirdrn */ publicclassSimpleBioTcpTest{ staticintthreadCount=5000; /** *基于BIO的Socket服务端进程 * *@authorshirdrn */ staticclassSocketServerextendsThread{ /**服务端口号*/ privateintport=8888; /**为客户端分配编号*/ privatestaticintsequence=0; publicSocketServer(intport){ this.port=port; } @Override publicvoidrun(){ Socketsocket=null; intcounter=0; try{ ServerSocketserverSocket=newServerSocket(this.port); booleanflag=false; Datestart=null; while(true){ socket=serverSocket.accept();//监听 //有请求到来才开始计时 if(!flag){ start=newDate(); flag=true; } this.handleMessage(socket);//处理一个连接过来的客户端请求 if(++counter==threadCount){ Dateend=newDate(); longlast=end.getTime()-start.getTime(); System.out.println(threadCount+"requestscost"+last+"ms."); } } }catch(IOExceptione){ e.printStackTrace(); } } /** *处理一个客户端socket连接 *@paramsocket客户端socket *@throwsIOException */ privatevoidhandleMessage(Socketsocket)throwsIOException{ InputStreamin=socket.getInputStream();//流:客户端->服务端(读) OutputStreamout=socket.getOutputStream();//流:服务端->客户端(写) intreceiveBytes; byte[]receiveBuffer=newbyte[128]; StringclientMessage=""; if((receiveBytes=in.read(receiveBuffer))!=-1){ clientMessage=newString(receiveBuffer,0,receiveBytes); if(clientMessage.startsWith("Iamtheclient")){ StringserverResponseWords= "Iamtheserver,andyouarethe"+(++sequence)+"thclient."; out.write(serverResponseWords.getBytes()); } } out.flush(); System.out.println("Server:receivesclientMessage->"+clientMessage); } } /** *基于BIO的Socket客户端线程 * *@authorshirdrn */ staticclassSocketClientimplementsRunnable{ privateStringipAddress; privateintport; /**待发送的请求数据*/ privateStringdata; publicSocketClient(StringipAddress,intport){ this.ipAddress=ipAddress; this.port=port; } @Override publicvoidrun(){ this.send(); } /** *连接Socket服务端,并模拟发送请求数据 */ publicvoidsend(){ Socketsocket=null; OutputStreamout=null; InputStreamin=null; try{ socket=newSocket(this.ipAddress,this.port);//连接 //发送请求 out=socket.getOutputStream(); out.write(data.getBytes()); out.flush(); //接收响应 in=socket.getInputStream(); inttotalBytes=0; intreceiveBytes=0; byte[]receiveBuffer=newbyte[128]; if((receiveBytes=in.read(receiveBuffer))!=-1){ totalBytes+=receiveBytes; } StringserverMessage=newString(receiveBuffer,0,receiveBytes); System.out.println("Client:receivesserverMessage->"+serverMessage); }catch(UnknownHostExceptione){ e.printStackTrace(); }catch(IOExceptione){ e.printStackTrace(); }catch(Exceptione){ e.printStackTrace(); }finally{ try{ //发送请求并接收到响应,通信完成,关闭连接 out.close(); in.close(); socket.close(); }catch(IOExceptione){ e.printStackTrace(); } } } publicvoidsetData(Stringdata){ this.data=data; } } publicstaticvoidmain(String[]args)throwsException{ SocketServerserver=newSocketServer(1983); server.start(); Thread.sleep(3000); for(inti=0;i经过测试,大约需要7110ms,大概接近7s,没有太大提高。
BIO通信改进
通过上面的测试我们可以发现,在Socket服务端对来自客户端的请求进行处理时,会发生阻塞,严重地影响了能够并发处理请求的效率。实际上,在Socket服务端接收来自客户端连接能力的范围内,可以将接收请求独立出来,从而在将处理请求独立粗话来,通过一个请求一个线程处理的方式来解决上述问题。这样,服务端是多处理线程对应客户端多请求,处理效率有一定程度的提高。
下面,通过单线程接收请求,然后委派线程池进行多线程并发处理请求:
/** *基于BIO的Socket服务端进程 * *@authorshirdrn */ staticclassSocketServerextendsThread{ /**服务端口号*/ privateintport=8888; /**为客户端分配编号*/ privatestaticintsequence=0; /**处理客户端请求的线程池*/ privateExecutorServicepool; publicSocketServer(intport,intpoolSize){ this.port=port; this.pool=Executors.newFixedThreadPool(poolSize); } @Override publicvoidrun(){ Socketsocket=null; intcounter=0; try{ ServerSocketserverSocket=newServerSocket(this.port); booleanflag=false; Datestart=null; while(true){ socket=serverSocket.accept();//监听 //有请求到来才开始计时 if(!flag){ start=newDate(); flag=true; } //将客户端请求放入线程池处理 pool.execute(newRequestHandler(socket)); if(++counter==threadCount){ Dateend=newDate(); longlast=end.getTime()-start.getTime(); System.out.println(threadCount+"requestscost"+last+"ms."); } } }catch(IOExceptione){ e.printStackTrace(); } } /** *客户端请求处理线程类 * *@authorshirdrn */ classRequestHandlerimplementsRunnable{ privateSocketsocket; publicRequestHandler(Socketsocket){ this.socket=socket; } @Override publicvoidrun(){ try{ InputStreamin=socket.getInputStream();//流:客户端->服务端(读) OutputStreamout=socket.getOutputStream();//流:服务端->客户端(写) intreceiveBytes; byte[]receiveBuffer=newbyte[128]; StringclientMessage=""; if((receiveBytes=in.read(receiveBuffer))!=-1){ clientMessage=newString(receiveBuffer,0,receiveBytes); if(clientMessage.startsWith("Iamtheclient")){ StringserverResponseWords= "Iamtheserver,andyouarethe"+(++sequence)+"thclient."; out.write(serverResponseWords.getBytes()); } } out.flush(); System.out.println("Server:receivesclientMessage->"+clientMessage); }catch(IOExceptione){ e.printStackTrace(); } } } }可见,这种改进方式增强服务端处理请求的并发度,但是每一个请求都要由一个线程去处理,大量请求造成服务端启动大量进程进行处理,也是比较占用服务端资源的。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。