详解PipedInputStream和PipedOutputStream_动力节点Java学院整理
java管道介绍
在java中,PipedOutputStream和PipedInputStream分别是管道输出流和管道输入流。
它们的作用是让多线程可以通过管道进行线程间的通讯。在使用管道通信时,必须将PipedOutputStream和PipedInputStream配套使用。
使用管道通信时,大致的流程是:我们在线程A中向PipedOutputStream中写入数据,这些数据会自动的发送到与PipedOutputStream对应的PipedInputStream中,进而存储在PipedInputStream的缓冲中;此时,线程B通过读取PipedInputStream中的数据。就可以实现,线程A和线程B的通信。
PipedOutputStream和PipedInputStream源码分析
下面介绍PipedOutputStream和PipedInputStream的源码。在阅读它们的源码之前,建议先看看源码后面的示例。待理解管道的作用和用法之后,再看源码,可能更容易理解。
1.PipedOutputStream源码分析(基于jdk1.7.40)
packagejava.io;
importjava.io.*;
publicclassPipedOutputStreamextendsOutputStream{
//与PipedOutputStream通信的PipedInputStream对象
privatePipedInputStreamsink;
//构造函数,指定配对的PipedInputStream
publicPipedOutputStream(PipedInputStreamsnk)throwsIOException{
connect(snk);
}
//构造函数
publicPipedOutputStream(){
}
//将“管道输出流”和“管道输入流”连接。
publicsynchronizedvoidconnect(PipedInputStreamsnk)throwsIOException{
if(snk==null){
thrownewNullPointerException();
}elseif(sink!=null||snk.connected){
thrownewIOException("Alreadyconnected");
}
//设置“管道输入流”
sink=snk;
//初始化“管道输入流”的读写位置
//int是PipedInputStream中定义的,代表“管道输入流”的读写位置
snk.in=-1;
//初始化“管道输出流”的读写位置。
//out是PipedInputStream中定义的,代表“管道输出流”的读写位置
snk.out=0;
//设置“管道输入流”和“管道输出流”为已连接状态
//connected是PipedInputStream中定义的,用于表示“管道输入流与管道输出流”是否已经连接
snk.connected=true;
}
//将int类型b写入“管道输出流”中。
//将b写入“管道输出流”之后,它会将b传输给“管道输入流”
publicvoidwrite(intb)throwsIOException{
if(sink==null){
thrownewIOException("Pipenotconnected");
}
sink.receive(b);
}
//将字节数组b写入“管道输出流”中。
//将数组b写入“管道输出流”之后,它会将其传输给“管道输入流”
publicvoidwrite(byteb[],intoff,intlen)throwsIOException{
if(sink==null){
thrownewIOException("Pipenotconnected");
}elseif(b==null){
thrownewNullPointerException();
}elseif((off<0)||(off>b.length)||(len<0)||
((off+len)>b.length)||((off+len)<0)){
thrownewIndexOutOfBoundsException();
}elseif(len==0){
return;
}
//“管道输入流”接收数据
sink.receive(b,off,len);
}
//清空“管道输出流”。
//这里会调用“管道输入流”的notifyAll();
//目的是让“管道输入流”放弃对当前资源的占有,让其它的等待线程(等待读取管道输出流的线程)读取“管道输出流”的值。
publicsynchronizedvoidflush()throwsIOException{
if(sink!=null){
synchronized(sink){
sink.notifyAll();
}
}
}
//关闭“管道输出流”。
//关闭之后,会调用receivedLast()通知“管道输入流”它已经关闭。
publicvoidclose()throwsIOException{
if(sink!=null){
sink.receivedLast();
}
}
}
2.PipedInputStream源码分析(基于jdk1.7.40)
packagejava.io;
publicclassPipedInputStreamextendsInputStream{
//“管道输出流”是否关闭的标记
booleanclosedByWriter=false;
//“管道输入流”是否关闭的标记
volatilebooleanclosedByReader=false;
//“管道输入流”与“管道输出流”是否连接的标记
//它在PipedOutputStream的connect()连接函数中被设置为true
booleanconnected=false;
ThreadreadSide;//读取“管道”数据的线程
ThreadwriteSide;//向“管道”写入数据的线程
//“管道”的默认大小
privatestaticfinalintDEFAULT_PIPE_SIZE=1024;
protectedstaticfinalintPIPE_SIZE=DEFAULT_PIPE_SIZE;
//缓冲区
protectedbytebuffer[];
//下一个写入字节的位置。in==out代表满,说明“写入的数据”全部被读取了。
protectedintin=-1;
//下一个读取字节的位置。in==out代表满,说明“写入的数据”全部被读取了。
protectedintout=0;
//构造函数:指定与“管道输入流”关联的“管道输出流”
publicPipedInputStream(PipedOutputStreamsrc)throwsIOException{
this(src,DEFAULT_PIPE_SIZE);
}
//构造函数:指定与“管道输入流”关联的“管道输出流”,以及“缓冲区大小”
publicPipedInputStream(PipedOutputStreamsrc,intpipeSize)
throwsIOException{
initPipe(pipeSize);
connect(src);
}
//构造函数:默认缓冲区大小是1024字节
publicPipedInputStream(){
initPipe(DEFAULT_PIPE_SIZE);
}
//构造函数:指定缓冲区大小是pipeSize
publicPipedInputStream(intpipeSize){
initPipe(pipeSize);
}
//初始化“管道”:新建缓冲区大小
privatevoidinitPipe(intpipeSize){
if(pipeSize<=0){
thrownewIllegalArgumentException("PipeSize<=0");
}
buffer=newbyte[pipeSize];
}
//将“管道输入流”和“管道输出流”绑定。
//实际上,这里调用的是PipedOutputStream的connect()函数
publicvoidconnect(PipedOutputStreamsrc)throwsIOException{
src.connect(this);
}
//接收int类型的数据b。
//它只会在PipedOutputStream的write(intb)中会被调用
protectedsynchronizedvoidreceive(intb)throwsIOException{
//检查管道状态
checkStateForReceive();
//获取“写入管道”的线程
writeSide=Thread.currentThread();
//若“写入管道”的数据正好全部被读取完,则等待。
if(in==out)
awaitSpace();
if(in<0){
in=0;
out=0;
}
//将b保存到缓冲区
buffer[in++]=(byte)(b&0xFF);
if(in>=buffer.length){
in=0;
}
}
//接收字节数组b。
synchronizedvoidreceive(byteb[],intoff,intlen)throwsIOException{
//检查管道状态
checkStateForReceive();
//获取“写入管道”的线程
writeSide=Thread.currentThread();
intbytesToTransfer=len;
while(bytesToTransfer>0){
//若“写入管道”的数据正好全部被读取完,则等待。
if(in==out)
awaitSpace();
intnextTransferAmount=0;
//如果“管道中被读取的数据,少于写入管道的数据”;
//则设置nextTransferAmount=“buffer.length-in”
if(outbytesToTransfer)
nextTransferAmount=bytesToTransfer;
//assert断言的作用是,若nextTransferAmount<=0,则终止程序。
assert(nextTransferAmount>0);
//将数据写入到缓冲中
System.arraycopy(b,off,buffer,in,nextTransferAmount);
bytesToTransfer-=nextTransferAmount;
off+=nextTransferAmount;
in+=nextTransferAmount;
if(in>=buffer.length){
in=0;
}
}
}
//检查管道状态
privatevoidcheckStateForReceive()throwsIOException{
if(!connected){
thrownewIOException("Pipenotconnected");
}elseif(closedByWriter||closedByReader){
thrownewIOException("Pipeclosed");
}elseif(readSide!=null&&!readSide.isAlive()){
thrownewIOException("Readenddead");
}
}
//等待。
//若“写入管道”的数据正好全部被读取完(例如,管道缓冲满),则执行awaitSpace()操作;
//它的目的是让“读取管道的线程”管道产生读取数据请求,从而才能继续的向“管道”中写入数据。
privatevoidawaitSpace()throwsIOException{
//如果“管道中被读取的数据,等于写入管道的数据”时,
//则每隔1000ms检查“管道状态”,并唤醒管道操作:若有“读取管道数据线程被阻塞”,则唤醒该线程。
while(in==out){
checkStateForReceive();
/*full:kickanywaitingreaders*/
notifyAll();
try{
wait(1000);
}catch(InterruptedExceptionex){
thrownewjava.io.InterruptedIOException();
}
}
}
//当PipedOutputStream被关闭时,被调用
synchronizedvoidreceivedLast(){
closedByWriter=true;
notifyAll();
}
//从管道(的缓冲)中读取一个字节,并将其转换成int类型
publicsynchronizedintread()throwsIOException{
if(!connected){
thrownewIOException("Pipenotconnected");
}elseif(closedByReader){
thrownewIOException("Pipeclosed");
}elseif(writeSide!=null&&!writeSide.isAlive()
&&!closedByWriter&&(in<0)){
thrownewIOException("Writeenddead");
}
readSide=Thread.currentThread();
inttrials=2;
while(in<0){
if(closedByWriter){
/*closedbywriter,returnEOF*/
return-1;
}
if((writeSide!=null)&&(!writeSide.isAlive())&&(--trials<0)){
thrownewIOException("Pipebroken");
}
/*mightbeawriterwaiting*/
notifyAll();
try{
wait(1000);
}catch(InterruptedExceptionex){
thrownewjava.io.InterruptedIOException();
}
}
intret=buffer[out++]&0xFF;
if(out>=buffer.length){
out=0;
}
if(in==out){
/*nowempty*/
in=-1;
}
returnret;
}
//从管道(的缓冲)中读取数据,并将其存入到数组b中
publicsynchronizedintread(byteb[],intoff,intlen)throwsIOException{
if(b==null){
thrownewNullPointerException();
}elseif(off<0||len<0||len>b.length-off){
thrownewIndexOutOfBoundsException();
}elseif(len==0){
return0;
}
/*possiblywaitonthefirstcharacter*/
intc=read();
if(c<0){
return-1;
}
b[off]=(byte)c;
intrlen=1;
while((in>=0)&&(len>1)){
intavailable;
if(in>out){
available=Math.min((buffer.length-out),(in-out));
}else{
available=buffer.length-out;
}
//Abyteisreadbeforehandoutsidetheloop
if(available>(len-1)){
available=len-1;
}
System.arraycopy(buffer,out,b,off+rlen,available);
out+=available;
rlen+=available;
len-=available;
if(out>=buffer.length){
out=0;
}
if(in==out){
/*nowempty*/
in=-1;
}
}
returnrlen;
}
//返回不受阻塞地从此输入流中读取的字节数。
publicsynchronizedintavailable()throwsIOException{
if(in<0)
return0;
elseif(in==out)
returnbuffer.length;
elseif(in>out)
returnin-out;
else
returnin+buffer.length-out;
}
//关闭管道输入流
publicvoidclose()throwsIOException{
closedByReader=true;
synchronized(this){
in=-1;
}
}
}
管道通信示例
下面,我们看看多线程中通过管道通信的例子。例子中包括3个类:Receiver.java,PipedStreamTest.java和Sender.java。
Receiver.java的代码如下:
importjava.io.IOException;
importjava.io.PipedInputStream;
@SuppressWarnings("all")
/**
*接收者线程
*/
publicclassReceiverextendsThread{
//管道输入流对象。
//它和“管道输出流(PipedOutputStream)”对象绑定,
//从而可以接收“管道输出流”的数据,再让用户读取。
privatePipedInputStreamin=newPipedInputStream();
//获得“管道输入流”对象
publicPipedInputStreamgetInputStream(){
returnin;
}
@Override
publicvoidrun(){
readMessageOnce();
//readMessageContinued();
}
//从“管道输入流”中读取1次数据
publicvoidreadMessageOnce(){
//虽然buf的大小是2048个字节,但最多只会从“管道输入流”中读取1024个字节。
//因为,“管道输入流”的缓冲区大小默认只有1024个字节。
byte[]buf=newbyte[2048];
try{
intlen=in.read(buf);
System.out.println(newString(buf,0,len));
in.close();
}catch(IOExceptione){
e.printStackTrace();
}
}
//从“管道输入流”读取>1024个字节时,就停止读取
publicvoidreadMessageContinued(){
inttotal=0;
while(true){
byte[]buf=newbyte[1024];
try{
intlen=in.read(buf);
total+=len;
System.out.println(newString(buf,0,len));
//若读取的字节总数>1024,则退出循环。
if(total>1024)
break;
}catch(IOExceptione){
e.printStackTrace();
}
}
try{
in.close();
}catch(IOExceptione){
e.printStackTrace();
}
}
}
Sender.java的代码如下:
importjava.io.IOException;
importjava.io.PipedOutputStream;
@SuppressWarnings("all")
/**
*发送者线程
*/
publicclassSenderextendsThread{
//管道输出流对象。
//它和“管道输入流(PipedInputStream)”对象绑定,
//从而可以将数据发送给“管道输入流”的数据,然后用户可以从“管道输入流”读取数据。
privatePipedOutputStreamout=newPipedOutputStream();
//获得“管道输出流”对象
publicPipedOutputStreamgetOutputStream(){
returnout;
}
@Override
publicvoidrun(){
writeShortMessage();
//writeLongMessage();
}
//向“管道输出流”中写入一则较简短的消息:"thisisashortmessage"
privatevoidwriteShortMessage(){
StringstrInfo="thisisashortmessage";
try{
out.write(strInfo.getBytes());
out.close();
}catch(IOExceptione){
e.printStackTrace();
}
}
//向“管道输出流”中写入一则较长的消息
privatevoidwriteLongMessage(){
StringBuildersb=newStringBuilder();
//通过for循环写入1020个字节
for(inti=0;i<102;i++)
sb.append("0123456789");
//再写入26个字节。
sb.append("abcdefghijklmnopqrstuvwxyz");
//str的总长度是1020+26=1046个字节
Stringstr=sb.toString();
try{
//将1046个字节写入到“管道输出流”中
out.write(str.getBytes());
out.close();
}catch(IOExceptione){
e.printStackTrace();
}
}
}
PipedStreamTest.java的代码如下:
importjava.io.PipedInputStream;
importjava.io.PipedOutputStream;
importjava.io.IOException;
@SuppressWarnings("all")
/**
*管道输入流和管道输出流的交互程序
*/
publicclassPipedStreamTest{
publicstaticvoidmain(String[]args){
Sendert1=newSender();
Receivert2=newReceiver();
PipedOutputStreamout=t1.getOutputStream();
PipedInputStreamin=t2.getInputStream();
try{
//管道连接。下面2句话的本质是一样。
//out.connect(in);
in.connect(out);
/**
*Thread类的START方法:
*使该线程开始执行;Java虚拟机调用该线程的run方法。
*结果是两个线程并发地运行;当前线程(从调用返回给start方法)和另一个线程(执行其run方法)。
*多次启动一个线程是非法的。特别是当线程已经结束执行后,不能再重新启动。
*/
t1.start();
t2.start();
}catch(IOExceptione){
e.printStackTrace();
}
}
}
运行结果:
thisisashortmessage
说明:
(01)
in.connect(out);
将“管道输入流”和“管道输出流”关联起来。查看PipedOutputStream.java和PipedInputStream.java中connect()的源码;我们知道out.connect(in);等价于in.connect(out);
(02)
t1.start();//启动“Sender”线程
t2.start();//启动“Receiver”线程
先查看Sender.java的源码,线程启动后执行run()函数;在Sender.java的run()中,调用writeShortMessage();
writeShortMessage();的作用就是向“管道输出流”中写入数据"thisisashortmessage" ;这条数据会被“管道输入流”接收到。下面看看这是如何实现的。
先看write(byteb[])的源码,在OutputStream.java中定义。PipedOutputStream.java继承于OutputStream.java;OutputStream.java中write(byteb[])的源码如下:
publicvoidwrite(byteb[])throwsIOException{
write(b,0,b.length);
}
实际上write(byteb[])是调用的PipedOutputStream.java中的write(byteb[],intoff,intlen)函数。查看write(byteb[],intoff,intlen)的源码,我们发现:它会调用sink.receive(b,off,len);进一步查看receive(byteb[],intoff,intlen)的定义,我们知道sink.receive(b,off,len)的作用就是:将“管道输出流”中的数据保存到“管道输入流”的缓冲中。而“管道输入流”的缓冲区buffer的默认大小是1024个字节。
至此,我们知道:t1.start()启动Sender线程,而Sender线程会将数据"thisisashortmessage"写入到“管道输出流”;而“管道输出流”又会将该数据传输给“管道输入流”,即而保存在“管道输入流”的缓冲中。
接下来,我们看看“用户如何从‘管道输入流'的缓冲中读取数据”。这实际上就是Receiver线程的动作。
t2.start()会启动Receiver线程,从而执行Receiver.java的run()函数。查看Receiver.java的源码,我们知道run()调用了readMessageOnce()。
而readMessageOnce()就是调用in.read(buf)从“管道输入流in”中读取数据,并保存到buf中。
通过上面的分析,我们已经知道“管道输入流in”的缓冲中的数据是"thisisashortmessage";因此,buf的数据就是"thisisashortmessage"。
为了加深对管道的理解。我们接着进行下面两个小试验。
试验一:修改Sender.java
将
publicvoidrun(){
writeShortMessage();
//writeLongMessage();
}
修改为
publicvoidrun(){
//writeShortMessage();
writeLongMessage();
}
运行程序。运行结果为:
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
012345678901234567890123456789abcd
这些数据是通过writeLongMessage()写入到“管道输出流”,然后传送给“管道输入流”,进而存储在“管道输入流”的缓冲中;再被用户从缓冲读取出来的数据。
然后,观察writeLongMessage()的源码。我们可以发现,str的长度是1046个字节,然后运行结果只有1024个字节!为什么会这样呢?
道理很简单:管道输入流的缓冲区默认大小是1024个字节。所以,最多只能写入1024个字节。
观察PipedInputStream.java的源码,我们能了解的更透彻。
privatestaticfinalintDEFAULT_PIPE_SIZE=1024;
publicPipedInputStream(){
initPipe(DEFAULT_PIPE_SIZE);
}
默认构造函数调用initPipe(DEFAULT_PIPE_SIZE),它的源码如下:
privatevoidinitPipe(intpipeSize){
if(pipeSize<=0){
thrownewIllegalArgumentException("PipeSize<=0");
}
buffer=newbyte[pipeSize];
}
从中,我们可以知道缓冲区buffer的默认大小就是1024个字节。
试验二:在“试验一”的基础上继续修改Receiver.java
将
publicvoidrun(){
readMessageOnce();
//readMessageContinued();
}
修改为
publicvoidrun(){
//readMessageOnce();
readMessageContinued();
}
运行程序。运行结果为:
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
012345678901234567890123456789abcd
efghijklmnopqrstuvwxyz
这个结果才是writeLongMessage()写入到“输入缓冲区”的完整数据。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。