Java多线程实现快速切分文件的程序
前段时间需要进行大批量数据导入,DBA提供的是CSV文件,但是每个CSV文件都有好几个GB大小,直接进行load时数据库很慢,还会产生内存不足的问题。为了解决这个问题,写了个快速切分文件的程序。
import java.io.*;
import java.util.*;
import java.util.concurrent.*;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
publicclassFileSplitUtil{
privatefinalstaticLoggerlog=LogManager.getLogger(FileSplitUtil.class);
privatestaticfinallongoriginFileSize=1024*1024*100;//100M
privatestaticfinalintblockFileSize=1024*1024*64;//防止中文乱码,必须取2的N次方
/**
*CVS文件分隔符
*/
privatestaticfinalcharcvsSeparator='^';
publicstaticvoidmain(Stringargs[]){
longstart=System.currentTimeMillis();
try{
StringfileName="D:\\csvtest\\aa.csv";
FilesourceFile=newFile(fileName);
if(sourceFile.length()>=originFileSize){
StringcvsFileName=fileName.replaceAll("\\\\","/");
FileSplitUtilfileSplitUtil=newFileSplitUtil();
List<String>parts=fileSplitUtil.splitBySize(cvsFileName,blockFileSize);
for(Stringpart:parts){
System.out.println("partNameis:"+part);
}
}
System.out.println("总文件长度"+sourceFile.length()+",拆分文件耗时:"+(System.currentTimeMillis()-start)+"ms.");
}catch(Exceptione){
log.info(e.getStackTrace());
}
}
/**
*拆分文件
*
*@paramfileName待拆分的完整文件名
*@parambyteSize按多少字节大小拆分
*@return拆分后的文件名列表
*/
publicList<String>splitBySize(StringfileName,intbyteSize)
throwsIOException,InterruptedException{
List<String>parts=newArrayList<String>();
Filefile=newFile(fileName);
intcount=(int)Math.ceil(file.length()/(double)byteSize);
intcountLen=(count+"").length();
RandomAccessFileraf=newRandomAccessFile(fileName,"r");
longtotalLen=raf.length();
CountDownLatchlatch=newCountDownLatch(count);
for(inti=0;i<count;i++){
StringpartFileName=file.getPath()+"."
+leftPad((i+1)+"",countLen,'0')+".cvs";
intreadSize=byteSize;
longstartPos=(long)i*byteSize;
longnextPos=(long)(i+1)*byteSize;
if(nextPos>totalLen){
readSize=(int)(totalLen-startPos);
}
newSplitRunnable(readSize,startPos,partFileName,file,latch).run();
parts.add(partFileName);
}
latch.await();//等待所有文件写完
//由于切割时可能会导致行被切断,加工所有的的分割文件,合并行
mergeRow(parts);
returnparts;
}
/**
*分割处理Runnable
*
*@authorsupeidong
*/
privateclassSplitRunnableimplementsRunnable{
intbyteSize;
StringpartFileName;
FileoriginFile;
longstartPos;
CountDownLatchlatch;
publicSplitRunnable(intbyteSize,longstartPos,StringpartFileName,
FileoriginFile,CountDownLatchlatch){
this.startPos=startPos;
this.byteSize=byteSize;
this.partFileName=partFileName;
this.originFile=originFile;
this.latch=latch;
}
publicvoidrun(){
RandomAccessFilerFile;
OutputStreamos;
try{
rFile=newRandomAccessFile(originFile,"r");
byte[]b=newbyte[byteSize];
rFile.seek(startPos);//移动指针到每“段”开头
ints=rFile.read(b);
os=newFileOutputStream(partFileName);
os.write(b,0,s);
os.flush();
os.close();
latch.countDown();
}catch(IOExceptione){
log.error(e.getMessage());
latch.countDown();
}
}
}
/**
*合并被切断的行
*
*@paramparts
*/
privatevoidmergeRow(List<String>parts){
List<PartFile>partFiles=newArrayList<PartFile>();
try{
//组装被切分表对象
for(inti=0;i<parts.size();i++){
StringpartFileName=parts.get(i);
FilesplitFileTemp=newFile(partFileName);
if(splitFileTemp.exists()){
PartFilepartFile=newPartFile();
BufferedReaderreader=newBufferedReader(newInputStreamReader(newFileInputStream(splitFileTemp),"gbk"));
StringfirstRow=reader.readLine();
StringsecondRow=reader.readLine();
StringendRow=readLastLine(partFileName);
partFile.setPartFileName(partFileName);
partFile.setFirstRow(firstRow);
partFile.setEndRow(endRow);
if(i>=1){
StringprePartFile=parts.get(i-1);
StringpreEndRow=readLastLine(prePartFile);
partFile.setFirstIsFull(getCharCount(firstRow+preEndRow)>getCharCount(secondRow));
}
partFiles.add(partFile);
reader.close();
}
}
//进行需要合并的行的写入
for(inti=0;i<partFiles.size()-1;i++){
PartFilepartFile=partFiles.get(i);
PartFilepartFileNext=partFiles.get(i+1);
StringBuildersb=newStringBuilder();
if(partFileNext.getFirstIsFull()){
sb.append("\r\n");
sb.append(partFileNext.getFirstRow());
}else{
sb.append(partFileNext.getFirstRow());
}
writeLastLine(partFile.getPartFileName(),sb.toString());
}
}catch(Exceptione){
log.error(e.getMessage());
}
}
/**
*得到某个字符出现的次数
*@params
*@return
*/
privateintgetCharCount(Strings){
intcount=0;
for(inti=0;i<s.length();i++){
if(s.charAt(i)==cvsSeparator){
count++;
}
}
returncount;
}
/**
*采用BufferedInputStream方式读取文件行数
*
*@paramfilename
*@return
*/
publicintgetFileRow(Stringfilename)throwsIOException{
InputStreamis=newBufferedInputStream(newFileInputStream(filename));
byte[]c=newbyte[1024];
intcount=0;
intreadChars=0;
while((readChars=is.read(c))!=-1){
for(inti=0;i<readChars;++i){
if(c[i]=='\n')
++count;
}
}
is.close();
returncount;
}
/**
*读取最后一行数据
*@paramfilename
*@return
*@throwsIOException
*/
privateStringreadLastLine(Stringfilename)throwsIOException{
//使用RandomAccessFile,从后找最后一行数据
RandomAccessFileraf=newRandomAccessFile(filename,"r");
longlen=raf.length();
StringlastLine="";
if(len!=0L){
longpos=len-1;
while(pos>0){
pos--;
raf.seek(pos);
if(raf.readByte()=='\n'){
lastLine=raf.readLine();
lastLine=newString(lastLine.getBytes("8859_1"),"gbk");
break;
}
}
}
raf.close();
returnlastLine;
}
/**
*修改最后一行数据
*@paramfileName
*@paramlastString
*@return
*@throwsIOException
*/
privatevoidwriteLastLine(StringfileName,StringlastString){
try{
//打开一个随机访问文件流,按读写方式
RandomAccessFilerandomFile=newRandomAccessFile(fileName,"rw");
//文件长度,字节数
longfileLength=randomFile.length();
//将写文件指针移到文件尾。
randomFile.seek(fileLength);
//此处必须加gbk,否则会出现写入乱码
randomFile.write(lastString.getBytes("gbk"));
randomFile.close();
}catch(IOExceptione){
log.error(e.getMessage());
}
}
/**
*左填充
*
*@paramstr
*@paramlength
*@paramch
*@return
*/
publicstaticStringleftPad(Stringstr,intlength,charch){
if(str.length()>=length){
returnstr;
}
char[]chs=newchar[length];
Arrays.fill(chs,ch);
char[]src=str.toCharArray();
System.arraycopy(src,0,chs,length-src.length,src.length);
returnnewString(chs);
}
/**
*合并文件行内部类
*/
classPartFile{
privateStringpartFileName;
privateStringfirstRow;
privateStringendRow;
privatebooleanfirstIsFull;
publicStringgetPartFileName(){
returnpartFileName;
}
publicvoidsetPartFileName(StringpartFileName){
this.partFileName=partFileName;
}
publicStringgetFirstRow(){
returnfirstRow;
}
publicvoidsetFirstRow(StringfirstRow){
this.firstRow=firstRow;
}
publicStringgetEndRow(){
returnendRow;
}
publicvoidsetEndRow(StringendRow){
this.endRow=endRow;
}
publicbooleangetFirstIsFull(){
returnfirstIsFull;
}
publicvoidsetFirstIsFull(booleanfirstIsFull){
this.firstIsFull=firstIsFull;
}
}
}
以上就是本文的全部内容,希望对大家学习java程序设计有所帮助。