java使用多线程读取超大文件
接上次写的“JAVA读取超大文件”。在读取超过10G的文件时会发现一次读一行的速度实在是不能接受,想到使用多线程+FileChannel来做一个使用多线程版本。
基本思路如下:
1.计算出文件总大小
2.分段处理,计算出每个线程读取文件的开始与结束位置
(文件大小/线程数)*N,N是指第几个线程,这样能得到每个线程在读该文件的大概起始位置
使用"大概起始位置",作为读文件的开始偏移量(fileChannel.position("大概起始位置")),来读取该文件,直到读到第一个换行符,记录下这个换行符的位置,作为该线程的准确起始位置.同时它也是上一个线程的结束位置.最后一个线程的结束位置也直接设置为-1
3.启动线程,每个线程从开始位置读取到结束位置为止
代码如下:
读文件工具类
importjava.io.*; importjava.nio.ByteBuffer; importjava.nio.channels.FileChannel; importjava.util.Observable; /** *CreatedwithIntelliJIDEA. *User:okey *Date:14-4-2 *Time:下午3:12 *读取文件 */ publicclassReadFileextendsObservable{ privateintbufSize=1024; //换行符 privatebytekey="\n".getBytes()[0]; //当前行数 privatelonglineNum=0; //文件编码,默认为gb2312 privateStringencode="gb2312"; //具体业务逻辑监听器 privateReaderFileListenerreaderListener; publicvoidsetEncode(Stringencode){ this.encode=encode; } publicvoidsetReaderListener(ReaderFileListenerreaderListener){ this.readerListener=readerListener; } /** *获取准确开始位置 *@paramfile *@paramposition *@return *@throwsException */ publiclonggetStartNum(Filefile,longposition)throwsException{ longstartNum=position; FileChannelfcin=newRandomAccessFile(file,"r").getChannel(); fcin.position(position); try{ intcache=1024; ByteBufferrBuffer=ByteBuffer.allocate(cache); //每次读取的内容 byte[]bs=newbyte[cache]; //缓存 byte[]tempBs=newbyte[0]; Stringline=""; while(fcin.read(rBuffer)!=-1){ intrSize=rBuffer.position(); rBuffer.rewind(); rBuffer.get(bs); rBuffer.clear(); byte[]newStrByte=bs; //如果发现有上次未读完的缓存,则将它加到当前读取的内容前面 if(null!=tempBs){ inttL=tempBs.length; newStrByte=newbyte[rSize+tL]; System.arraycopy(tempBs,0,newStrByte,0,tL); System.arraycopy(bs,0,newStrByte,tL,rSize); } //获取开始位置之后的第一个换行符 intendIndex=indexOf(newStrByte,0); if(endIndex!=-1){ returnstartNum+endIndex; } tempBs=substring(newStrByte,0,newStrByte.length); startNum+=1024; } }catch(Exceptione){ e.printStackTrace(); }finally{ fcin.close(); } returnposition; } /** *从设置的开始位置读取文件,一直到结束为止。如果end设置为负数,刚读取到文件末尾 *@paramfullPath *@paramstart *@paramend *@throwsException */ publicvoidreadFileByLine(StringfullPath,longstart,longend)throwsException{ Filefin=newFile(fullPath); if(fin.exists()){ FileChannelfcin=newRandomAccessFile(fin,"r").getChannel(); fcin.position(start); try{ ByteBufferrBuffer=ByteBuffer.allocate(bufSize); //每次读取的内容 byte[]bs=newbyte[bufSize]; //缓存 byte[]tempBs=newbyte[0]; Stringline=""; //当前读取文件位置 longnowCur=start; while(fcin.read(rBuffer)!=-1){ nowCur+=bufSize; intrSize=rBuffer.position(); rBuffer.rewind(); rBuffer.get(bs); rBuffer.clear(); byte[]newStrByte=bs; //如果发现有上次未读完的缓存,则将它加到当前读取的内容前面 if(null!=tempBs){ inttL=tempBs.length; newStrByte=newbyte[rSize+tL]; System.arraycopy(tempBs,0,newStrByte,0,tL); System.arraycopy(bs,0,newStrByte,tL,rSize); } //是否已经读到最后一位 booleanisEnd=false; //如果当前读取的位数已经比设置的结束位置大的时候,将读取的内容截取到设置的结束位置 if(end>0&&nowCur>end){ //缓存长度-当前已经读取位数-最后位数 intl=newStrByte.length-(int)(nowCur-end); newStrByte=substring(newStrByte,0,l); isEnd=true; } intfromIndex=0; intendIndex=0; //每次读一行内容,以key(默认为\n)作为结束符 while((endIndex=indexOf(newStrByte,fromIndex))!=-1){ byte[]bLine=substring(newStrByte,fromIndex,endIndex); line=newString(bLine,0,bLine.length,encode); lineNum++; //输出一行内容,处理方式由调用方提供 readerListener.outLine(line.trim(),lineNum,false); fromIndex=endIndex+1; } //将未读取完成的内容放到缓存中 tempBs=substring(newStrByte,fromIndex,newStrByte.length); if(isEnd){ break; } } //将剩下的最后内容作为一行,输出,并指明这是最后一行 StringlineStr=newString(tempBs,0,tempBs.length,encode); readerListener.outLine(lineStr.trim(),lineNum,true); }catch(Exceptione){ e.printStackTrace(); }finally{ fcin.close(); } }else{ thrownewFileNotFoundException("没有找到文件:"+fullPath); } //通知观察者,当前工作已经完成 setChanged(); notifyObservers(start+"-"+end); } /** *查找一个byte[]从指定位置之后的一个换行符位置 * *@paramsrc *@paramfromIndex *@return *@throwsException */ privateintindexOf(byte[]src,intfromIndex)throwsException{ for(inti=fromIndex;i读文件线程
/** *CreatedwithIntelliJIDEA. *User:okey *Date:14-4-2 *Time:下午4:50 *TochangethistemplateuseFile|Settings|FileTemplates. */ publicclassReadFileThreadextendsThread{ privateReaderFileListenerprocessPoiDataListeners; privateStringfilePath; privatelongstart; privatelongend; publicReadFileThread(ReaderFileListenerprocessPoiDataListeners,longstart,longend,Stringfile){ this.setName(this.getName()+"-ReadFileThread"); this.start=start; this.end=end; this.filePath=file; this.processPoiDataListeners=processPoiDataListeners; } @Override publicvoidrun(){ ReadFilereadFile=newReadFile(); readFile.setReaderListener(processPoiDataListeners); readFile.setEncode(processPoiDataListeners.getEncode()); //readFile.addObserver(); try{ readFile.readFileByLine(filePath,start,end+1); }catch(Exceptione){ e.printStackTrace(); } } }具体业务逻辑监听
/** *CreatedwithOkey *User:Okey *Date:13-3-14 *Time:下午3:19 *NIO逐行读数据回调方法 */ publicabstractclassReaderFileListener{ //一次读取行数,默认为500 privateintreadColNum=500; privateStringencode; privateListlist=newArrayList (); /** *设置一次读取行数 *@paramreadColNum */ protectedvoidsetReadColNum(intreadColNum){ this.readColNum=readColNum; } publicStringgetEncode(){ returnencode; } publicvoidsetEncode(Stringencode){ this.encode=encode; } /** *每读取到一行数据,添加到缓存中 *@paramlineStr读取到的数据 *@paramlineNum行号 *@paramover是否读取完成 *@throwsException */ publicvoidoutLine(StringlineStr,longlineNum,booleanover)throwsException{ if(null!=lineStr) list.add(lineStr); if(!over&&(lineNum%readColNum==0)){ output(list); list.clear(); }elseif(over){ output(list); list.clear(); } } /** *批量输出 * *@paramstringList *@throwsException */ publicabstractvoidoutput(List stringList)throwsException; } 线程调度
importjava.io.File; importjava.io.FileInputStream; importjava.io.IOException; /** *CreatedwithIntelliJIDEA. *User:okey *Date:14-4-1 *Time:下午6:03 *TochangethistemplateuseFile|Settings|FileTemplates. */ publicclassBuildData{ publicstaticvoidmain(String[]args)throwsException{ Filefile=newFile("E:\\1396341974289.csv"); FileInputStreamfis=null; try{ ReadFilereadFile=newReadFile(); fis=newFileInputStream(file); intavailable=fis.available(); intmaxThreadNum=50; //线程粗略开始位置 inti=available/maxThreadNum; for(intj=0;j现在就可以尽情的调整 maxThreadNum来享受风一般的速度吧!
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。