详解PipedInputStream和PipedOutputStream_动力节点Java学院整理
java管道介绍
在java中,PipedOutputStream和PipedInputStream分别是管道输出流和管道输入流。
它们的作用是让多线程可以通过管道进行线程间的通讯。在使用管道通信时,必须将PipedOutputStream和PipedInputStream配套使用。
使用管道通信时,大致的流程是:我们在线程A中向PipedOutputStream中写入数据,这些数据会自动的发送到与PipedOutputStream对应的PipedInputStream中,进而存储在PipedInputStream的缓冲中;此时,线程B通过读取PipedInputStream中的数据。就可以实现,线程A和线程B的通信。
PipedOutputStream和PipedInputStream源码分析
下面介绍PipedOutputStream和PipedInputStream的源码。在阅读它们的源码之前,建议先看看源码后面的示例。待理解管道的作用和用法之后,再看源码,可能更容易理解。
1.PipedOutputStream源码分析(基于jdk1.7.40)
packagejava.io; importjava.io.*; publicclassPipedOutputStreamextendsOutputStream{ //与PipedOutputStream通信的PipedInputStream对象 privatePipedInputStreamsink; //构造函数,指定配对的PipedInputStream publicPipedOutputStream(PipedInputStreamsnk)throwsIOException{ connect(snk); } //构造函数 publicPipedOutputStream(){ } //将“管道输出流”和“管道输入流”连接。 publicsynchronizedvoidconnect(PipedInputStreamsnk)throwsIOException{ if(snk==null){ thrownewNullPointerException(); }elseif(sink!=null||snk.connected){ thrownewIOException("Alreadyconnected"); } //设置“管道输入流” sink=snk; //初始化“管道输入流”的读写位置 //int是PipedInputStream中定义的,代表“管道输入流”的读写位置 snk.in=-1; //初始化“管道输出流”的读写位置。 //out是PipedInputStream中定义的,代表“管道输出流”的读写位置 snk.out=0; //设置“管道输入流”和“管道输出流”为已连接状态 //connected是PipedInputStream中定义的,用于表示“管道输入流与管道输出流”是否已经连接 snk.connected=true; } //将int类型b写入“管道输出流”中。 //将b写入“管道输出流”之后,它会将b传输给“管道输入流” publicvoidwrite(intb)throwsIOException{ if(sink==null){ thrownewIOException("Pipenotconnected"); } sink.receive(b); } //将字节数组b写入“管道输出流”中。 //将数组b写入“管道输出流”之后,它会将其传输给“管道输入流” publicvoidwrite(byteb[],intoff,intlen)throwsIOException{ if(sink==null){ thrownewIOException("Pipenotconnected"); }elseif(b==null){ thrownewNullPointerException(); }elseif((off<0)||(off>b.length)||(len<0)|| ((off+len)>b.length)||((off+len)<0)){ thrownewIndexOutOfBoundsException(); }elseif(len==0){ return; } //“管道输入流”接收数据 sink.receive(b,off,len); } //清空“管道输出流”。 //这里会调用“管道输入流”的notifyAll(); //目的是让“管道输入流”放弃对当前资源的占有,让其它的等待线程(等待读取管道输出流的线程)读取“管道输出流”的值。 publicsynchronizedvoidflush()throwsIOException{ if(sink!=null){ synchronized(sink){ sink.notifyAll(); } } } //关闭“管道输出流”。 //关闭之后,会调用receivedLast()通知“管道输入流”它已经关闭。 publicvoidclose()throwsIOException{ if(sink!=null){ sink.receivedLast(); } } }
2.PipedInputStream源码分析(基于jdk1.7.40)
packagejava.io; publicclassPipedInputStreamextendsInputStream{ //“管道输出流”是否关闭的标记 booleanclosedByWriter=false; //“管道输入流”是否关闭的标记 volatilebooleanclosedByReader=false; //“管道输入流”与“管道输出流”是否连接的标记 //它在PipedOutputStream的connect()连接函数中被设置为true booleanconnected=false; ThreadreadSide;//读取“管道”数据的线程 ThreadwriteSide;//向“管道”写入数据的线程 //“管道”的默认大小 privatestaticfinalintDEFAULT_PIPE_SIZE=1024; protectedstaticfinalintPIPE_SIZE=DEFAULT_PIPE_SIZE; //缓冲区 protectedbytebuffer[]; //下一个写入字节的位置。in==out代表满,说明“写入的数据”全部被读取了。 protectedintin=-1; //下一个读取字节的位置。in==out代表满,说明“写入的数据”全部被读取了。 protectedintout=0; //构造函数:指定与“管道输入流”关联的“管道输出流” publicPipedInputStream(PipedOutputStreamsrc)throwsIOException{ this(src,DEFAULT_PIPE_SIZE); } //构造函数:指定与“管道输入流”关联的“管道输出流”,以及“缓冲区大小” publicPipedInputStream(PipedOutputStreamsrc,intpipeSize) throwsIOException{ initPipe(pipeSize); connect(src); } //构造函数:默认缓冲区大小是1024字节 publicPipedInputStream(){ initPipe(DEFAULT_PIPE_SIZE); } //构造函数:指定缓冲区大小是pipeSize publicPipedInputStream(intpipeSize){ initPipe(pipeSize); } //初始化“管道”:新建缓冲区大小 privatevoidinitPipe(intpipeSize){ if(pipeSize<=0){ thrownewIllegalArgumentException("PipeSize<=0"); } buffer=newbyte[pipeSize]; } //将“管道输入流”和“管道输出流”绑定。 //实际上,这里调用的是PipedOutputStream的connect()函数 publicvoidconnect(PipedOutputStreamsrc)throwsIOException{ src.connect(this); } //接收int类型的数据b。 //它只会在PipedOutputStream的write(intb)中会被调用 protectedsynchronizedvoidreceive(intb)throwsIOException{ //检查管道状态 checkStateForReceive(); //获取“写入管道”的线程 writeSide=Thread.currentThread(); //若“写入管道”的数据正好全部被读取完,则等待。 if(in==out) awaitSpace(); if(in<0){ in=0; out=0; } //将b保存到缓冲区 buffer[in++]=(byte)(b&0xFF); if(in>=buffer.length){ in=0; } } //接收字节数组b。 synchronizedvoidreceive(byteb[],intoff,intlen)throwsIOException{ //检查管道状态 checkStateForReceive(); //获取“写入管道”的线程 writeSide=Thread.currentThread(); intbytesToTransfer=len; while(bytesToTransfer>0){ //若“写入管道”的数据正好全部被读取完,则等待。 if(in==out) awaitSpace(); intnextTransferAmount=0; //如果“管道中被读取的数据,少于写入管道的数据”; //则设置nextTransferAmount=“buffer.length-in” if(outbytesToTransfer) nextTransferAmount=bytesToTransfer; //assert断言的作用是,若nextTransferAmount<=0,则终止程序。 assert(nextTransferAmount>0); //将数据写入到缓冲中 System.arraycopy(b,off,buffer,in,nextTransferAmount); bytesToTransfer-=nextTransferAmount; off+=nextTransferAmount; in+=nextTransferAmount; if(in>=buffer.length){ in=0; } } } //检查管道状态 privatevoidcheckStateForReceive()throwsIOException{ if(!connected){ thrownewIOException("Pipenotconnected"); }elseif(closedByWriter||closedByReader){ thrownewIOException("Pipeclosed"); }elseif(readSide!=null&&!readSide.isAlive()){ thrownewIOException("Readenddead"); } } //等待。 //若“写入管道”的数据正好全部被读取完(例如,管道缓冲满),则执行awaitSpace()操作; //它的目的是让“读取管道的线程”管道产生读取数据请求,从而才能继续的向“管道”中写入数据。 privatevoidawaitSpace()throwsIOException{ //如果“管道中被读取的数据,等于写入管道的数据”时, //则每隔1000ms检查“管道状态”,并唤醒管道操作:若有“读取管道数据线程被阻塞”,则唤醒该线程。 while(in==out){ checkStateForReceive(); /*full:kickanywaitingreaders*/ notifyAll(); try{ wait(1000); }catch(InterruptedExceptionex){ thrownewjava.io.InterruptedIOException(); } } } //当PipedOutputStream被关闭时,被调用 synchronizedvoidreceivedLast(){ closedByWriter=true; notifyAll(); } //从管道(的缓冲)中读取一个字节,并将其转换成int类型 publicsynchronizedintread()throwsIOException{ if(!connected){ thrownewIOException("Pipenotconnected"); }elseif(closedByReader){ thrownewIOException("Pipeclosed"); }elseif(writeSide!=null&&!writeSide.isAlive() &&!closedByWriter&&(in<0)){ thrownewIOException("Writeenddead"); } readSide=Thread.currentThread(); inttrials=2; while(in<0){ if(closedByWriter){ /*closedbywriter,returnEOF*/ return-1; } if((writeSide!=null)&&(!writeSide.isAlive())&&(--trials<0)){ thrownewIOException("Pipebroken"); } /*mightbeawriterwaiting*/ notifyAll(); try{ wait(1000); }catch(InterruptedExceptionex){ thrownewjava.io.InterruptedIOException(); } } intret=buffer[out++]&0xFF; if(out>=buffer.length){ out=0; } if(in==out){ /*nowempty*/ in=-1; } returnret; } //从管道(的缓冲)中读取数据,并将其存入到数组b中 publicsynchronizedintread(byteb[],intoff,intlen)throwsIOException{ if(b==null){ thrownewNullPointerException(); }elseif(off<0||len<0||len>b.length-off){ thrownewIndexOutOfBoundsException(); }elseif(len==0){ return0; } /*possiblywaitonthefirstcharacter*/ intc=read(); if(c<0){ return-1; } b[off]=(byte)c; intrlen=1; while((in>=0)&&(len>1)){ intavailable; if(in>out){ available=Math.min((buffer.length-out),(in-out)); }else{ available=buffer.length-out; } //Abyteisreadbeforehandoutsidetheloop if(available>(len-1)){ available=len-1; } System.arraycopy(buffer,out,b,off+rlen,available); out+=available; rlen+=available; len-=available; if(out>=buffer.length){ out=0; } if(in==out){ /*nowempty*/ in=-1; } } returnrlen; } //返回不受阻塞地从此输入流中读取的字节数。 publicsynchronizedintavailable()throwsIOException{ if(in<0) return0; elseif(in==out) returnbuffer.length; elseif(in>out) returnin-out; else returnin+buffer.length-out; } //关闭管道输入流 publicvoidclose()throwsIOException{ closedByReader=true; synchronized(this){ in=-1; } } }
管道通信示例
下面,我们看看多线程中通过管道通信的例子。例子中包括3个类:Receiver.java,PipedStreamTest.java和Sender.java。
Receiver.java的代码如下:
importjava.io.IOException; importjava.io.PipedInputStream; @SuppressWarnings("all") /** *接收者线程 */ publicclassReceiverextendsThread{ //管道输入流对象。 //它和“管道输出流(PipedOutputStream)”对象绑定, //从而可以接收“管道输出流”的数据,再让用户读取。 privatePipedInputStreamin=newPipedInputStream(); //获得“管道输入流”对象 publicPipedInputStreamgetInputStream(){ returnin; } @Override publicvoidrun(){ readMessageOnce(); //readMessageContinued(); } //从“管道输入流”中读取1次数据 publicvoidreadMessageOnce(){ //虽然buf的大小是2048个字节,但最多只会从“管道输入流”中读取1024个字节。 //因为,“管道输入流”的缓冲区大小默认只有1024个字节。 byte[]buf=newbyte[2048]; try{ intlen=in.read(buf); System.out.println(newString(buf,0,len)); in.close(); }catch(IOExceptione){ e.printStackTrace(); } } //从“管道输入流”读取>1024个字节时,就停止读取 publicvoidreadMessageContinued(){ inttotal=0; while(true){ byte[]buf=newbyte[1024]; try{ intlen=in.read(buf); total+=len; System.out.println(newString(buf,0,len)); //若读取的字节总数>1024,则退出循环。 if(total>1024) break; }catch(IOExceptione){ e.printStackTrace(); } } try{ in.close(); }catch(IOExceptione){ e.printStackTrace(); } } }
Sender.java的代码如下:
importjava.io.IOException; importjava.io.PipedOutputStream; @SuppressWarnings("all") /** *发送者线程 */ publicclassSenderextendsThread{ //管道输出流对象。 //它和“管道输入流(PipedInputStream)”对象绑定, //从而可以将数据发送给“管道输入流”的数据,然后用户可以从“管道输入流”读取数据。 privatePipedOutputStreamout=newPipedOutputStream(); //获得“管道输出流”对象 publicPipedOutputStreamgetOutputStream(){ returnout; } @Override publicvoidrun(){ writeShortMessage(); //writeLongMessage(); } //向“管道输出流”中写入一则较简短的消息:"thisisashortmessage" privatevoidwriteShortMessage(){ StringstrInfo="thisisashortmessage"; try{ out.write(strInfo.getBytes()); out.close(); }catch(IOExceptione){ e.printStackTrace(); } } //向“管道输出流”中写入一则较长的消息 privatevoidwriteLongMessage(){ StringBuildersb=newStringBuilder(); //通过for循环写入1020个字节 for(inti=0;i<102;i++) sb.append("0123456789"); //再写入26个字节。 sb.append("abcdefghijklmnopqrstuvwxyz"); //str的总长度是1020+26=1046个字节 Stringstr=sb.toString(); try{ //将1046个字节写入到“管道输出流”中 out.write(str.getBytes()); out.close(); }catch(IOExceptione){ e.printStackTrace(); } } }
PipedStreamTest.java的代码如下:
importjava.io.PipedInputStream; importjava.io.PipedOutputStream; importjava.io.IOException; @SuppressWarnings("all") /** *管道输入流和管道输出流的交互程序 */ publicclassPipedStreamTest{ publicstaticvoidmain(String[]args){ Sendert1=newSender(); Receivert2=newReceiver(); PipedOutputStreamout=t1.getOutputStream(); PipedInputStreamin=t2.getInputStream(); try{ //管道连接。下面2句话的本质是一样。 //out.connect(in); in.connect(out); /** *Thread类的START方法: *使该线程开始执行;Java虚拟机调用该线程的run方法。 *结果是两个线程并发地运行;当前线程(从调用返回给start方法)和另一个线程(执行其run方法)。 *多次启动一个线程是非法的。特别是当线程已经结束执行后,不能再重新启动。 */ t1.start(); t2.start(); }catch(IOExceptione){ e.printStackTrace(); } } }
运行结果:
thisisashortmessage
说明:
(01)
in.connect(out);
将“管道输入流”和“管道输出流”关联起来。查看PipedOutputStream.java和PipedInputStream.java中connect()的源码;我们知道out.connect(in);等价于in.connect(out);
(02)
t1.start();//启动“Sender”线程
t2.start();//启动“Receiver”线程
先查看Sender.java的源码,线程启动后执行run()函数;在Sender.java的run()中,调用writeShortMessage();
writeShortMessage();的作用就是向“管道输出流”中写入数据"thisisashortmessage" ;这条数据会被“管道输入流”接收到。下面看看这是如何实现的。
先看write(byteb[])的源码,在OutputStream.java中定义。PipedOutputStream.java继承于OutputStream.java;OutputStream.java中write(byteb[])的源码如下:
publicvoidwrite(byteb[])throwsIOException{ write(b,0,b.length); }
实际上write(byteb[])是调用的PipedOutputStream.java中的write(byteb[],intoff,intlen)函数。查看write(byteb[],intoff,intlen)的源码,我们发现:它会调用sink.receive(b,off,len);进一步查看receive(byteb[],intoff,intlen)的定义,我们知道sink.receive(b,off,len)的作用就是:将“管道输出流”中的数据保存到“管道输入流”的缓冲中。而“管道输入流”的缓冲区buffer的默认大小是1024个字节。
至此,我们知道:t1.start()启动Sender线程,而Sender线程会将数据"thisisashortmessage"写入到“管道输出流”;而“管道输出流”又会将该数据传输给“管道输入流”,即而保存在“管道输入流”的缓冲中。
接下来,我们看看“用户如何从‘管道输入流'的缓冲中读取数据”。这实际上就是Receiver线程的动作。
t2.start()会启动Receiver线程,从而执行Receiver.java的run()函数。查看Receiver.java的源码,我们知道run()调用了readMessageOnce()。
而readMessageOnce()就是调用in.read(buf)从“管道输入流in”中读取数据,并保存到buf中。
通过上面的分析,我们已经知道“管道输入流in”的缓冲中的数据是"thisisashortmessage";因此,buf的数据就是"thisisashortmessage"。
为了加深对管道的理解。我们接着进行下面两个小试验。
试验一:修改Sender.java
将
publicvoidrun(){ writeShortMessage(); //writeLongMessage(); }
修改为
publicvoidrun(){ //writeShortMessage(); writeLongMessage(); }
运行程序。运行结果为:
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
012345678901234567890123456789abcd
这些数据是通过writeLongMessage()写入到“管道输出流”,然后传送给“管道输入流”,进而存储在“管道输入流”的缓冲中;再被用户从缓冲读取出来的数据。
然后,观察writeLongMessage()的源码。我们可以发现,str的长度是1046个字节,然后运行结果只有1024个字节!为什么会这样呢?
道理很简单:管道输入流的缓冲区默认大小是1024个字节。所以,最多只能写入1024个字节。
观察PipedInputStream.java的源码,我们能了解的更透彻。
privatestaticfinalintDEFAULT_PIPE_SIZE=1024; publicPipedInputStream(){ initPipe(DEFAULT_PIPE_SIZE); }
默认构造函数调用initPipe(DEFAULT_PIPE_SIZE),它的源码如下:
privatevoidinitPipe(intpipeSize){ if(pipeSize<=0){ thrownewIllegalArgumentException("PipeSize<=0"); } buffer=newbyte[pipeSize]; }
从中,我们可以知道缓冲区buffer的默认大小就是1024个字节。
试验二:在“试验一”的基础上继续修改Receiver.java
将
publicvoidrun(){ readMessageOnce(); //readMessageContinued(); }
修改为
publicvoidrun(){ //readMessageOnce(); readMessageContinued(); }
运行程序。运行结果为:
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
012345678901234567890123456789abcd
efghijklmnopqrstuvwxyz
这个结果才是writeLongMessage()写入到“输入缓冲区”的完整数据。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。