Java多线程实现快速切分文件的程序
前段时间需要进行大批量数据导入,DBA提供的是CSV文件,但每个CSV文件都有好几个GB大小,直接进行load不仅数据库很慢,还会产生内存不足的问题。为了解决这个问题,写了个快速切分文件的程序。
importorg.apache.log4j.LogManager; importorg.apache.log4j.Logger; importjava.io.*; importjava.util.*; importjava.util.concurrent.*; publicclassFileSplitUtil{ privatefinalstaticLoggerlog=LogManager.getLogger(FileSplitUtil.class); privatestaticfinallongoriginFileSize=1024*1024*100;//100M privatestaticfinalintblockFileSize=1024*1024*64;//防止中文乱码,必须取2的N次方 /** *CVS文件分隔符 */ privatestaticfinalcharcvsSeparator='^'; publicstaticvoidmain(Stringargs[]){ longstart=System.currentTimeMillis(); try{ StringfileName="D:\\csvtest\\aa.csv"; FilesourceFile=newFile(fileName); if(sourceFile.length()>=originFileSize){ StringcvsFileName=fileName.replaceAll("\\\\","/"); FileSplitUtilfileSplitUtil=newFileSplitUtil(); List<String>parts=fileSplitUtil.splitBySize(cvsFileName,blockFileSize); for(Stringpart:parts){ System.out.println("partNameis:"+part); } } System.out.println("总文件长度"+sourceFile.length()+",拆分文件耗时:"+(System.currentTimeMillis()-start)+"ms."); }catch(Exceptione){ log.info(e.getStackTrace()); } } /** *拆分文件 * *@paramfileName待拆分的完整文件名 *@parambyteSize按多少字节大小拆分 *@return拆分后的文件名列表 */ publicList<String>splitBySize(StringfileName,intbyteSize) throwsIOException,InterruptedException{ List<String>parts=newArrayList<String>(); Filefile=newFile(fileName); intcount=(int)Math.ceil(file.length()/(double)byteSize); intcountLen=(count+"").length(); RandomAccessFileraf=newRandomAccessFile(fileName,"r"); longtotalLen=raf.length(); CountDownLatchlatch=newCountDownLatch(count); for(inti=0;i<count;i++){ StringpartFileName=file.getPath()+"." 
+leftPad((i+1)+"",countLen,'0')+".cvs"; intreadSize=byteSize; longstartPos=(long)i*byteSize; longnextPos=(long)(i+1)*byteSize; if(nextPos>totalLen){ readSize=(int)(totalLen-startPos); } newSplitRunnable(readSize,startPos,partFileName,file,latch).run(); parts.add(partFileName); } latch.await();//等待所有文件写完 //由于切割时可能会导致行被切断,加工所有的的分割文件,合并行 mergeRow(parts); returnparts; } /** *分割处理Runnable * *@authorsupeidong */ privateclassSplitRunnableimplementsRunnable{ intbyteSize; StringpartFileName; FileoriginFile; longstartPos; CountDownLatchlatch; publicSplitRunnable(intbyteSize,longstartPos,StringpartFileName, FileoriginFile,CountDownLatchlatch){ this.startPos=startPos; this.byteSize=byteSize; this.partFileName=partFileName; this.originFile=originFile; this.latch=latch; } publicvoidrun(){ RandomAccessFilerFile; OutputStreamos; try{ rFile=newRandomAccessFile(originFile,"r"); byte[]b=newbyte[byteSize]; rFile.seek(startPos);//移动指针到每“段”开头 ints=rFile.read(b); os=newFileOutputStream(partFileName); os.write(b,0,s); os.flush(); os.close(); latch.countDown(); }catch(IOExceptione){ log.error(e.getMessage()); latch.countDown(); } } } /** *合并被切断的行 * *@paramparts */ privatevoidmergeRow(List<String>parts){ List<PartFile>partFiles=newArrayList<PartFile>(); try{ //组装被切分表对象 for(inti=0;i<parts.size();i++){ StringpartFileName=parts.get(i); FilesplitFileTemp=newFile(partFileName); if(splitFileTemp.exists()){ PartFilepartFile=newPartFile(); BufferedReaderreader=newBufferedReader(newInputStreamReader(newFileInputStream(splitFileTemp),"gbk")); StringfirstRow=reader.readLine(); StringsecondRow=reader.readLine(); StringendRow=readLastLine(partFileName); partFile.setPartFileName(partFileName); partFile.setFirstRow(firstRow); partFile.setEndRow(endRow); if(i>=1){ StringprePartFile=parts.get(i-1); StringpreEndRow=readLastLine(prePartFile); partFile.setFirstIsFull(getCharCount(firstRow+preEndRow)>getCharCount(secondRow)); } partFiles.add(partFile); reader.close(); } } //进行需要合并的行的写入 
for(inti=0;i<partFiles.size()-1;i++){ PartFilepartFile=partFiles.get(i); PartFilepartFileNext=partFiles.get(i+1); StringBuildersb=newStringBuilder(); if(partFileNext.getFirstIsFull()){ sb.append("\r\n"); sb.append(partFileNext.getFirstRow()); }else{ sb.append(partFileNext.getFirstRow()); } writeLastLine(partFile.getPartFileName(),sb.toString()); } }catch(Exceptione){ log.error(e.getMessage()); } } /** *得到某个字符出现的次数 *@params *@return */ privateintgetCharCount(Strings){ intcount=0; for(inti=0;i<s.length();i++){ if(s.charAt(i)==cvsSeparator){ count++; } } returncount; } /** *采用BufferedInputStream方式读取文件行数 * *@paramfilename *@return */ publicintgetFileRow(Stringfilename)throwsIOException{ InputStreamis=newBufferedInputStream(newFileInputStream(filename)); byte[]c=newbyte[1024]; intcount=0; intreadChars=0; while((readChars=is.read(c))!=-1){ for(inti=0;i<readChars;++i){ if(c[i]=='\n') ++count; } } is.close(); returncount; } /** *读取最后一行数据 *@paramfilename *@return *@throwsIOException */ privateStringreadLastLine(Stringfilename)throwsIOException{ //使用RandomAccessFile,从后找最后一行数据 RandomAccessFileraf=newRandomAccessFile(filename,"r"); longlen=raf.length(); StringlastLine=""; if(len!=0L){ longpos=len-1; while(pos>0){ pos--; raf.seek(pos); if(raf.readByte()=='\n'){ lastLine=raf.readLine(); lastLine=newString(lastLine.getBytes("8859_1"),"gbk"); break; } } } raf.close(); returnlastLine; } /** *修改最后一行数据 *@paramfileName *@paramlastString *@return *@throwsIOException */ privatevoidwriteLastLine(StringfileName,StringlastString){ try{ //打开一个随机访问文件流,按读写方式 RandomAccessFilerandomFile=newRandomAccessFile(fileName,"rw"); //文件长度,字节数 longfileLength=randomFile.length(); //将写文件指针移到文件尾。 randomFile.seek(fileLength); //此处必须加gbk,否则会出现写入乱码 randomFile.write(lastString.getBytes("gbk")); randomFile.close(); }catch(IOExceptione){ log.error(e.getMessage()); } } /** *左填充 * *@paramstr *@paramlength *@paramch *@return */ publicstaticStringleftPad(Stringstr,intlength,charch){ if(str.length()>=length){ 
returnstr; } char[]chs=newchar[length]; Arrays.fill(chs,ch); char[]src=str.toCharArray(); System.arraycopy(src,0,chs,length-src.length,src.length); returnnewString(chs); } /** *合并文件行内部类 */ classPartFile{ privateStringpartFileName; privateStringfirstRow; privateStringendRow; privatebooleanfirstIsFull; publicStringgetPartFileName(){ returnpartFileName; } publicvoidsetPartFileName(StringpartFileName){ this.partFileName=partFileName; } publicStringgetFirstRow(){ returnfirstRow; } publicvoidsetFirstRow(StringfirstRow){ this.firstRow=firstRow; } publicStringgetEndRow(){ returnendRow; } publicvoidsetEndRow(StringendRow){ this.endRow=endRow; } publicbooleangetFirstIsFull(){ returnfirstIsFull; } publicvoidsetFirstIsFull(booleanfirstIsFull){ this.firstIsFull=firstIsFull; } } }
以上就是本文的全部内容,希望对大家学习Java程序设计有所帮助。