Java使用NIO包实现Socket通信的实例代码
前面几篇文章介绍了使用java.io和java.net类库实现的Socket通信,下面介绍一下使用java.nio类库实现的Socket。
java.nio包是Java在1.4之后增加的,用来提高I/O操作的效率。在nio包中主要包括以下几个类或接口:
- Buffer:缓冲区,用来临时存放输入或输出数据。
- Charset:用来把Unicode字符编码和其它字符编码互转。
- Channel:数据传输通道,用来把Buffer中的数据写入到数据源,或者把数据源中的数据读入到Buffer。
- Selector:用来支持异步I/O操作,也叫非阻塞I/O操作。
nio包中主要通过下面两个方面来提高I/O操作效率:
- 通过Buffer和Channel来提高I/O操作的速度。
- 通过Selector来支持非阻塞I/O操作。
下面来看一下程序中是怎么通过这些类库实现Socket功能。
首先介绍一下几个辅助类
辅助类SerializableUtil,这个类用来把java对象序列化成字节数组,或者把字节数组反序列化成java对象。
packagecom.googlecode.garbagecan.test.socket;
importjava.io.ByteArrayInputStream;
importjava.io.ByteArrayOutputStream;
importjava.io.IOException;
importjava.io.ObjectInputStream;
importjava.io.ObjectOutputStream;
publicclassSerializableUtil{
publicstaticbyte[]toBytes(Objectobject){
ByteArrayOutputStreambaos=newByteArrayOutputStream();
ObjectOutputStreamoos=null;
try{
oos=newObjectOutputStream(baos);
oos.writeObject(object);
byte[]bytes=baos.toByteArray();
returnbytes;
}catch(IOExceptionex){
thrownewRuntimeException(ex.getMessage(),ex);
}finally{
try{
oos.close();
}catch(Exceptione){}
}
}
publicstaticObjecttoObject(byte[]bytes){
ByteArrayInputStreambais=newByteArrayInputStream(bytes);
ObjectInputStreamois=null;
try{
ois=newObjectInputStream(bais);
Objectobject=ois.readObject();
returnobject;
}catch(IOExceptionex){
thrownewRuntimeException(ex.getMessage(),ex);
}catch(ClassNotFoundExceptionex){
thrownewRuntimeException(ex.getMessage(),ex);
}finally{
try{
ois.close();
}catch(Exceptione){}
}
}
}
辅助类MyRequestObject和MyResponseObject,这两个类是普通的java对象,实现了Serializable接口。MyRequestObject类是Client发出的请求,MyResponseObject是Server端作出的响应。
packagecom.googlecode.garbagecan.test.socket.nio;
importjava.io.Serializable;
publicclassMyRequestObjectimplementsSerializable{
privatestaticfinallongserialVersionUID=1L;
privateStringname;
privateStringvalue;
privatebyte[]bytes;
publicMyRequestObject(Stringname,Stringvalue){
this.name=name;
this.value=value;
this.bytes=newbyte[1024];
}
publicStringgetName(){
returnname;
}
publicvoidsetName(Stringname){
this.name=name;
}
publicStringgetValue(){
returnvalue;
}
publicvoidsetValue(Stringvalue){
this.value=value;
}
@Override
publicStringtoString(){
StringBuffersb=newStringBuffer();
sb.append("Request[name:"+name+",value:"+value+",bytes:"+bytes.length+"]");
returnsb.toString();
}
}
packagecom.googlecode.garbagecan.test.socket.nio;
importjava.io.Serializable;
publicclassMyResponseObjectimplementsSerializable{
privatestaticfinallongserialVersionUID=1L;
privateStringname;
privateStringvalue;
privatebyte[]bytes;
publicMyResponseObject(Stringname,Stringvalue){
this.name=name;
this.value=value;
this.bytes=newbyte[1024];
}
publicStringgetName(){
returnname;
}
publicvoidsetName(Stringname){
this.name=name;
}
publicStringgetValue(){
returnvalue;
}
publicvoidsetValue(Stringvalue){
this.value=value;
}
@Override
publicStringtoString(){
StringBuffersb=newStringBuffer();
sb.append("Response[name:"+name+",value:"+value+",bytes:"+bytes.length+"]");
returnsb.toString();
}
}
下面主要看一下Server端的代码,其中有一些英文注释对理解代码很有帮助,注释主要是来源jdk的文档和例子,这里就没有再翻译
packagecom.googlecode.garbagecan.test.socket.nio;
importjava.io.ByteArrayOutputStream;
importjava.io.IOException;
importjava.net.InetSocketAddress;
importjava.nio.ByteBuffer;
importjava.nio.channels.ClosedChannelException;
importjava.nio.channels.SelectionKey;
importjava.nio.channels.Selector;
importjava.nio.channels.ServerSocketChannel;
importjava.nio.channels.SocketChannel;
importjava.util.Iterator;
importjava.util.logging.Level;
importjava.util.logging.Logger;
importcom.googlecode.garbagecan.test.socket.SerializableUtil;
publicclassMyServer3{
privatefinalstaticLoggerlogger=Logger.getLogger(MyServer3.class.getName());
publicstaticvoidmain(String[]args){
Selectorselector=null;
ServerSocketChannelserverSocketChannel=null;
try{
//Selectorforincomingtimerequests
selector=Selector.open();
//Createanewserversocketandsettononblockingmode
serverSocketChannel=ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
//Bindtheserversockettothelocalhostandport
serverSocketChannel.socket().setReuseAddress(true);
serverSocketChannel.socket().bind(newInetSocketAddress(10000));
//Registeracceptsontheserversocketwiththeselector.This
//steptellstheselectorthatthesocketwantstobeputonthe
//readylistwhenacceptoperationsoccur,soallowingmultiplexed
//non-blockingI/Ototakeplace.
serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
//Here'swhereeverythinghappens.Theselectmethodwill
//returnwhenanyoperationsregisteredabovehaveoccurred,the
//threadhasbeeninterrupted,etc.
while(selector.select()>0){
//SomeoneisreadyforI/O,getthereadykeys
Iterator<SelectionKey>it=selector.selectedKeys().iterator();
//Walkthroughthereadykeyscollectionandprocessdaterequests.
while(it.hasNext()){
SelectionKeyreadyKey=it.next();
it.remove();
//Thekeyindexesintotheselectorsoyou
//canretrievethesocketthat'sreadyforI/O
execute((ServerSocketChannel)readyKey.channel());
}
}
}catch(ClosedChannelExceptionex){
logger.log(Level.SEVERE,null,ex);
}catch(IOExceptionex){
logger.log(Level.SEVERE,null,ex);
}finally{
try{
selector.close();
}catch(Exceptionex){}
try{
serverSocketChannel.close();
}catch(Exceptionex){}
}
}
privatestaticvoidexecute(ServerSocketChannelserverSocketChannel)throwsIOException{
SocketChannelsocketChannel=null;
try{
socketChannel=serverSocketChannel.accept();
MyRequestObjectmyRequestObject=receiveData(socketChannel);
logger.log(Level.INFO,myRequestObject.toString());
MyResponseObjectmyResponseObject=newMyResponseObject(
"responsefor"+myRequestObject.getName(),
"responsefor"+myRequestObject.getValue());
sendData(socketChannel,myResponseObject);
logger.log(Level.INFO,myResponseObject.toString());
}finally{
try{
socketChannel.close();
}catch(Exceptionex){}
}
}
privatestaticMyRequestObjectreceiveData(SocketChannelsocketChannel)throwsIOException{
MyRequestObjectmyRequestObject=null;
ByteArrayOutputStreambaos=newByteArrayOutputStream();
ByteBufferbuffer=ByteBuffer.allocate(1024);
try{
byte[]bytes;
intsize=0;
while((size=socketChannel.read(buffer))>=0){
buffer.flip();
bytes=newbyte[size];
buffer.get(bytes);
baos.write(bytes);
buffer.clear();
}
bytes=baos.toByteArray();
Objectobj=SerializableUtil.toObject(bytes);
myRequestObject=(MyRequestObject)obj;
}finally{
try{
baos.close();
}catch(Exceptionex){}
}
returnmyRequestObject;
}
privatestaticvoidsendData(SocketChannelsocketChannel,MyResponseObjectmyResponseObject)throwsIOException{
byte[]bytes=SerializableUtil.toBytes(myResponseObject);
ByteBufferbuffer=ByteBuffer.wrap(bytes);
socketChannel.write(buffer);
}
}
下面是Client的代码,代码比较简单就是启动了100个线程来访问Server
packagecom.googlecode.garbagecan.test.socket.nio;
importjava.io.ByteArrayOutputStream;
importjava.io.IOException;
importjava.net.InetSocketAddress;
importjava.net.SocketAddress;
importjava.nio.ByteBuffer;
importjava.nio.channels.SocketChannel;
importjava.util.logging.Level;
importjava.util.logging.Logger;
importcom.googlecode.garbagecan.test.socket.SerializableUtil;
publicclassMyClient3{
privatefinalstaticLoggerlogger=Logger.getLogger(MyClient3.class.getName());
publicstaticvoidmain(String[]args)throwsException{
for(inti=0;i<100;i++){
finalintidx=i;
newThread(newMyRunnable(idx)).start();
}
}
privatestaticfinalclassMyRunnableimplementsRunnable{
privatefinalintidx;
privateMyRunnable(intidx){
this.idx=idx;
}
publicvoidrun(){
SocketChannelsocketChannel=null;
try{
socketChannel=SocketChannel.open();
SocketAddresssocketAddress=newInetSocketAddress("localhost",10000);
socketChannel.connect(socketAddress);
MyRequestObjectmyRequestObject=newMyRequestObject("request_"+idx,"request_"+idx);
logger.log(Level.INFO,myRequestObject.toString());
sendData(socketChannel,myRequestObject);
MyResponseObjectmyResponseObject=receiveData(socketChannel);
logger.log(Level.INFO,myResponseObject.toString());
}catch(Exceptionex){
logger.log(Level.SEVERE,null,ex);
}finally{
try{
socketChannel.close();
}catch(Exceptionex){}
}
}
privatevoidsendData(SocketChannelsocketChannel,MyRequestObjectmyRequestObject)throwsIOException{
byte[]bytes=SerializableUtil.toBytes(myRequestObject);
ByteBufferbuffer=ByteBuffer.wrap(bytes);
socketChannel.write(buffer);
socketChannel.socket().shutdownOutput();
}
privateMyResponseObjectreceiveData(SocketChannelsocketChannel)throwsIOException{
MyResponseObjectmyResponseObject=null;
ByteArrayOutputStreambaos=newByteArrayOutputStream();
try{
ByteBufferbuffer=ByteBuffer.allocateDirect(1024);
byte[]bytes;
intcount=0;
while((count=socketChannel.read(buffer))>=0){
buffer.flip();
bytes=newbyte[count];
buffer.get(bytes);
baos.write(bytes);
buffer.clear();
}
bytes=baos.toByteArray();
Objectobj=SerializableUtil.toObject(bytes);
myResponseObject=(MyResponseObject)obj;
socketChannel.socket().shutdownInput();
}finally{
try{
baos.close();
}catch(Exceptionex){}
}
returnmyResponseObject;
}
}
}
最后测试上面的代码,首先运行Server类,然后运行Client类,就可以分别在Server端和Client端控制台看到发送或接收到的MyRequestObject或MyResponseObject对象了。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。