基于Java实现多线程下载并允许断点续传
完整代码:https://github.com/iyuanyb/Downloader
多线程下载及断点续传的实现是使用HTTP/1.1引入的Range请求参数,可以访问Web资源的指定区间的内容。虽然实现了多线程及断点续传,但还有很多不完善的地方。
包含四个类:
Downloader:主类,负责分配任务给各个子线程,及检测进度DownloadFile:表示要下载的哪个文件,为了能写输入到文件的指定位置,使用RandomAccessFile类操作文件,多个线程写同一个文件需要保证线程安全,这里直接调用getChannel方法,获取一个文件通道,FileChannel是线程安全的。DownloadTask:实际执行下载的线程,获取[lowerBound,upperBound]区间的数据,当下载过程中出现异常时要通知其他线程(使用AtomicBoolean),结束下载Logger:实时记录下载进度,以便续传时知道从哪开始。感觉这里做的比较差,为了能实时写出日志及方便地使用Properties类的load/store方法格式化输入输出,每次都是打开后再关闭。
演示:
随便找一个文件下载:
强行结束程序并重新运行:
日志文件:
断点续传的关键是记录各个线程的下载进度,这里细节比较多,花了很久。只需要记录每个线程请求的Range区间极客,每次成功写数据到文件时,就更新一次下载区间。下面是下载完成后的日志内容。
代码:
Downloader.java
packagedownloader; importjava.io.*; importjava.net.*; importjava.nio.file.Files; importjava.nio.file.Path; importjava.util.concurrent.atomic.AtomicBoolean; publicclassDownloader{ privatestaticfinalintDEFAULT_THREAD_COUNT=4;//默认线程数量 privateAtomicBooleancanceled;//取消状态,如果有一个子线程出现异常,则取消整个下载任务 privateDownloadFilefile;//下载的文件对象 privateStringstorageLocation; privatefinalintthreadCount;//线程数量 privatelongfileSize;//文件大小 privatefinalStringurl; privatelongbeginTime;//开始时间 privateLoggerlogger; publicDownloader(Stringurl){ this(url,DEFAULT_THREAD_COUNT); } publicDownloader(Stringurl,intthreadCount){ this.url=url; this.threadCount=threadCount; this.canceled=newAtomicBoolean(false); this.storageLocation=url.substring(url.lastIndexOf('/')+1); this.logger=newLogger(storageLocation+".log",url,threadCount); } publicvoidstart(){ booleanreStart=Files.exists(Path.of(storageLocation+".log")); if(reStart){ logger=newLogger(storageLocation+".log"); System.out.printf("*继续上次下载进度[已下载:%.2fMB]:%s\n",logger.getWroteSize()/1014.0/1024,url); }else{ System.out.println("*开始下载:"+url); } if(-1==(this.fileSize=getFileSize())) return; System.out.printf("*文件大小:%.2fMB\n",fileSize/1024.0/1024); this.beginTime=System.currentTimeMillis(); try{ this.file=newDownloadFile(storageLocation,fileSize,logger); if(reStart){ file.setWroteSize(logger.getWroteSize()); } //分配线程下载 dispatcher(reStart); //循环打印进度 printDownloadProgress(); }catch(IOExceptione){ System.err.println("x创建文件失败["+e.getMessage()+"]"); } } /** *分配器,决定每个线程下载哪个区间的数据 */ privatevoiddispatcher(booleanreStart){ longblockSize=fileSize/threadCount;//每个线程要下载的数据量 longlowerBound=0,upperBound=0; long[][]bounds=null; intthreadID=0; if(reStart){ bounds=logger.getBounds(); } for(inti=0;iDownloadTask.java
packagedownloader; importjava.io.*; importjava.net.HttpURLConnection; importjava.net.URL; importjava.nio.ByteBuffer; importjava.nio.channels.Channels; importjava.nio.channels.ReadableByteChannel; importjava.util.concurrent.atomic.AtomicBoolean; classDownloadTaskextendsThread{ privatefinalStringurl; privatelonglowerBound;//下载的文件区间 privatelongupperBound; privateAtomicBooleancanceled; privateDownloadFiledownloadFile; privateintthreadId; DownloadTask(Stringurl,longlowerBound,longupperBound,DownloadFiledownloadFile, AtomicBooleancanceled,intthreadID){ this.url=url; this.lowerBound=lowerBound; this.upperBound=upperBound; this.canceled=canceled; this.downloadFile=downloadFile; this.threadId=threadID; } @Override publicvoidrun(){ ReadableByteChannelinput=null; try{ ByteBufferbuffer=ByteBuffer.allocate(1024*1024*2);//2MB input=connect(); System.out.println("*[线程"+threadId+"]连接成功,开始下载..."); intlen; while(!canceled.get()&&lowerBound<=upperBound){ buffer.clear(); len=input.read(buffer); downloadFile.write(lowerBound,buffer,threadId,upperBound); lowerBound+=len; } if(!canceled.get()){ System.out.println("*[线程"+threadId+"]下载完成"+":"+lowerBound+"-"+upperBound); } }catch(IOExceptione){ canceled.set(true); System.err.println("x[线程"+threadId+"]遇到错误["+e.getMessage()+"],结束下载"); }finally{ if(input!=null){ try{ input.close(); }catch(IOExceptione){ e.printStackTrace(); } } } } /** *连接WEB服务器,并返回一个数据通道 *@return返回通道 *@throwsIOException网络连接错误 */ privateReadableByteChannelconnect()throwsIOException{ HttpURLConnectionconn=(HttpURLConnection)newURL(url).openConnection(); conn.setConnectTimeout(3000); conn.setRequestMethod("GET"); conn.setRequestProperty("Range","bytes="+lowerBound+"-"+upperBound); //System.out.println("thread_"+threadId+":"+lowerBound+"-"+upperBound); conn.connect(); intstatusCode=conn.getResponseCode(); if(HttpURLConnection.HTTP_PARTIAL!=statusCode){ conn.disconnect(); thrownewIOException("状态码错误:"+statusCode); } returnChannels.newChannel(conn.getInputStream()); } }DownloadFile.java
packagedownloader; importjava.io.IOException; importjava.io.RandomAccessFile; importjava.nio.ByteBuffer; importjava.nio.channels.FileChannel; importjava.util.concurrent.atomic.AtomicLong; classDownloadFile{ privatefinalRandomAccessFilefile; privatefinalFileChannelchannel;//线程安全类 privateAtomicLongwroteSize;//已写入的长度 privateLoggerlogger; DownloadFile(StringfileName,longfileSize,Loggerlogger)throwsIOException{ this.wroteSize=newAtomicLong(0); this.logger=logger; this.file=newRandomAccessFile(fileName,"rw"); file.setLength(fileSize); channel=file.getChannel(); } /** *写数据 *@paramoffset写偏移量 *@parambuffer数据 *@throwsIOException写数据出现异常 */ voidwrite(longoffset,ByteBufferbuffer,intthreadID,longupperBound)throwsIOException{ buffer.flip(); intlength=buffer.limit(); while(buffer.hasRemaining()){ channel.write(buffer,offset); } wroteSize.addAndGet(length); logger.updateLog(threadID,length,offset+length,upperBound);//更新日志 } /** *@return已经下载的数据量,为了知道何时结束整个任务,以及统计信息 */ longgetWroteSize(){ returnwroteSize.get(); } //继续下载时调用 voidsetWroteSize(longwroteSize){ this.wroteSize.set(wroteSize); } voidclose(){ try{ file.close(); }catch(IOExceptione){ e.printStackTrace(); } } }Logger.java
packagedownloader; importjava.io.*; importjava.util.Properties; classLogger{ privateStringlogFileName;//下载的文件的名字 privatePropertieslog; /** *重新开始下载时,使用该构造函数 *@paramlogFileName */ Logger(StringlogFileName){ this.logFileName=logFileName; log=newProperties(); FileInputStreamfin=null; try{ log.load(newFileInputStream(logFileName)); }catch(IOExceptionignore){ }finally{ try{ fin.close(); }catch(Exceptionignore){} } } Logger(StringlogFileName,Stringurl,intthreadCount){ this.logFileName=logFileName; this.log=newProperties(); log.put("url",url); log.put("wroteSize","0"); log.put("threadCount",String.valueOf(threadCount)); for(inti=0;i{ Stringkey=k.toString(); if(key.startsWith("thread_")){ String[]interval=v.toString().split("-"); bounds[index[0]][0]=Long.parseLong(key.substring(key.indexOf("_")+1)); bounds[index[0]][1]=Long.parseLong(interval[0]); bounds[index[0]++][2]=Long.parseLong(interval[1]); } }); returnbounds; } longgetWroteSize(){ returnLong.parseLong(log.getProperty("wroteSize")); } } 以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。