Java Socket编程实例(三)- TCP服务端线程池
一、服务端回传服务类:
importjava.io.IOException;
importjava.io.InputStream;
importjava.io.OutputStream;
importjava.net.Socket;
importjava.util.logging.Level;
importjava.util.logging.Logger;
/**
 * Echo protocol handler: copies every byte read from a client socket back to
 * that same socket until the client closes its side of the connection.
 *
 * <p>The protocol can be invoked directly through
 * {@link #handleEchoClient(Socket, Logger)} or, because the class implements
 * {@link Runnable}, handed to a thread or executor.
 */
public class EchoProtocol implements Runnable {
    private static final int BUFSIZE = 32; // Size (in bytes) of I/O buffer

    private final Socket clientSocket; // Socket connected to the client
    private final Logger logger;       // Server logger

    /**
     * Creates a handler bound to one client connection.
     *
     * @param clientSocket socket connected to the client to be echoed
     * @param logger       logger that receives progress and error messages
     */
    public EchoProtocol(Socket clientSocket, Logger logger) {
        this.clientSocket = clientSocket;
        this.logger = logger;
    }

    /**
     * Echoes everything received on {@code clientSocket} back to the client and
     * then closes the socket.
     *
     * <p>I/O failures are logged at {@code WARNING} and not rethrown; the socket
     * is closed in every case.
     *
     * @param clientSocket socket connected to the client
     * @param logger       logger that receives progress and error messages
     */
    public static void handleEchoClient(Socket clientSocket, Logger logger) {
        try {
            // Get the input and output I/O streams from the socket
            InputStream in = clientSocket.getInputStream();
            OutputStream out = clientSocket.getOutputStream();

            int recvMsgSize;                       // Size of the chunk just received
            long totalBytesEchoed = 0;             // long: an int would overflow past 2 GiB
            byte[] echoBuffer = new byte[BUFSIZE]; // Receive buffer

            // Receive until the client closes the connection, indicated by -1
            while ((recvMsgSize = in.read(echoBuffer)) != -1) {
                out.write(echoBuffer, 0, recvMsgSize);
                totalBytesEchoed += recvMsgSize;
            }

            logger.info("Client " + clientSocket.getRemoteSocketAddress()
                    + ", echoed " + totalBytesEchoed + " bytes.");
        } catch (IOException ex) {
            logger.log(Level.WARNING, "Exception in echo protocol", ex);
        } finally {
            try {
                clientSocket.close();
            } catch (IOException e) {
                // Nothing more can be done for this connection; record it instead of hiding it.
                logger.log(Level.FINE, "Failed to close client socket", e);
            }
        }
    }

    /** Runs the echo protocol on the socket supplied at construction time. */
    @Override
    public void run() {
        handleEchoClient(this.clientSocket, this.logger);
    }
}
二、为每个客户端请求新启一个线程的TCP服务端:
importjava.io.IOException;
importjava.net.ServerSocket;
importjava.net.Socket;
importjava.util.logging.Logger;
publicclassTCPEchoServerThread{
publicstaticvoidmain(String[]args)throwsIOException{
//Createaserversockettoacceptclientconnectionrequests
ServerSocketservSock=newServerSocket(5500);
Loggerlogger=Logger.getLogger("practical");
//Runforever,acceptingandspawningathreadforeachconnection
while(true){
SocketclntSock=servSock.accept();//Blockwaitingforconnection
//Spawnthreadtohandlenewconnection
Threadthread=newThread(newEchoProtocol(clntSock,logger));
thread.start();
logger.info("CreatedandstartedThread"+thread.getName());
}
/*NOTREACHED*/
}
}
三、固定线程数的TCP服务端:
importjava.io.IOException;
importjava.net.ServerSocket;
importjava.net.Socket;
importjava.util.logging.Level;
importjava.util.logging.Logger;
publicclassTCPEchoServerPool{
publicstaticvoidmain(String[]args)throwsIOException{
intthreadPoolSize=3;//FixedThreadPoolSize
finalServerSocketservSock=newServerSocket(5500);
finalLoggerlogger=Logger.getLogger("practical");
//Spawnafixednumberofthreadstoserviceclients
for(inti=0;i<threadPoolSize;i++){
Threadthread=newThread(){
publicvoidrun(){
while(true){
try{
SocketclntSock=servSock.accept();//Waitforaconnection
EchoProtocol.handleEchoClient(clntSock,logger);//Handleit
}catch(IOExceptionex){
logger.log(Level.WARNING,"Clientacceptfailed",ex);
}
}
}
};
thread.start();
logger.info("CreatedandstartedThread="+thread.getName());
}
}
}
四、使用线程池(使用Spring的线程池会有队列、最大线程数、最小线程数和超时时间的概念)
1.线程池工具类:
importjava.util.concurrent.*;
/**
 * Task executor: runs tasks on a shared cached thread pool and makes the
 * calling thread wait, up to a timeout, for each task to finish.
 *
 * <p>A task that exceeds its timeout is not cancelled; it keeps running on its
 * pool thread after the caller has received the {@link TimeoutException}.
 *
 * @author WatsonXu
 * @since 1.0.0 <p>2013-6-8 10:33:09 AM</p>
 */
public class ThreadPoolTaskExecutor {
    private static final ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactory() {
        // newThread() runs on whichever thread submits a task, so the counter must be atomic.
        private final java.util.concurrent.atomic.AtomicInteger count =
                new java.util.concurrent.atomic.AtomicInteger();

        /*
         * The executor calls this when it needs to run a task and the pool has no
         * thread available. Callable tasks are wrapped and arrive here as a Runnable.
         */
        @Override
        public Thread newThread(Runnable r) {
            Thread invokeThread = new Thread(r);
            invokeThread.setName("CourserThread-" + count.incrementAndGet());
            invokeThread.setDaemon(false); // the original author flagged this choice as uncertain
            return invokeThread;
        }
    });

    private ThreadPoolTaskExecutor() {
    }

    /**
     * Runs {@code task} on the pool and waits for it to finish.
     *
     * @param task    work to execute
     * @param unit    unit of {@code timeout}
     * @param timeout maximum time to wait for completion
     * @throws TimeoutException if the task has not finished within the timeout
     * @throws RuntimeException if the task fails or the wait is interrupted; the cause is attached
     */
    public static void invoke(Runnable task, TimeUnit unit, long timeout) throws TimeoutException, RuntimeException {
        invoke(task, null, unit, timeout);
    }

    /**
     * Runs {@code task} on the pool, waits for it to finish and returns {@code result}.
     *
     * @param task    work to execute
     * @param result  value to return once the task has completed
     * @param unit    unit of {@code timeout}
     * @param timeout maximum time to wait for completion
     * @param <T>     type of the result
     * @return {@code result}
     * @throws TimeoutException if the task has not finished within the timeout
     * @throws RuntimeException if the task fails or the wait is interrupted; the cause is attached
     */
    public static <T> T invoke(Runnable task, T result, TimeUnit unit, long timeout) throws TimeoutException,
            RuntimeException {
        return await(executor.submit(task, result), unit, timeout);
    }

    /**
     * Runs {@code task} on the pool, waits for it to finish and returns its value.
     *
     * @param task    computation to execute
     * @param unit    unit of {@code timeout}
     * @param timeout maximum time to wait for completion
     * @param <T>     type of the computed value
     * @return the value produced by {@code task}
     * @throws TimeoutException if the task has not finished within the timeout
     * @throws RuntimeException if the task fails or the wait is interrupted; the cause is attached
     */
    public static <T> T invoke(Callable<T> task, TimeUnit unit, long timeout) throws TimeoutException, RuntimeException {
        // Submitting hands the task to the executor; from here on it runs asynchronously.
        Future<T> future = executor.submit(task);
        /*
         * Waiting on the future confirms that the task has completed, which means:
         * 1) the thread that called invoke() blocks until the task is done, and
         * 2) the calling thread can receive the result produced by the worker thread.
         */
        return await(future, unit, timeout);
    }

    /** Waits for {@code future} and translates failures into the exceptions promised by {@code invoke}. */
    private static <T> T await(Future<T> future, TimeUnit unit, long timeout) throws TimeoutException {
        try {
            return future.get(timeout, unit);
        } catch (TimeoutException e) {
            TimeoutException timedOut = new TimeoutException("Thread invoke timeout...");
            timedOut.initCause(e); // TimeoutException has no cause-taking constructor
            throw timedOut;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
2.具有伸缩性的TCP服务端:
importjava.io.IOException;
importjava.net.ServerSocket;
importjava.net.Socket;
importjava.util.concurrent.TimeUnit;
importjava.util.logging.Logger;
importdemo.callable.ThreadPoolTaskExecutor;
publicclassTCPEchoServerExecutor{
publicstaticvoidmain(String[]args)throwsIOException{
//Createaserversockettoacceptclientconnectionrequests
ServerSocketservSock=newServerSocket(5500);
Loggerlogger=Logger.getLogger("practical");
//Runforever,acceptingandspawningthreadstoserviceeachconnection
while(true){
SocketclntSock=servSock.accept();//Blockwaitingforconnection
//executorService.submit(newEchoProtocol(clntSock,logger));
try{
ThreadPoolTaskExecutor.invoke(newEchoProtocol(clntSock,logger),TimeUnit.SECONDS,3);
}catch(Exceptione){
}
//service.execute(newTimelimitEchoProtocol(clntSock,logger));
}
/*NOTREACHED*/
}
}
以上就是本文的全部内容。想查看更多Java的语法,大家可以关注:《Thinking in Java 中文手册》、《JDK 1.7 参考手册官方英文版》、《JDK 1.6 API Java 中文参考手册》、《JDK 1.5 API Java 中文参考手册》,也希望大家多多支持毛票票。