基于Java实现多线程下载并允许断点续传
完整代码:https://github.com/iyuanyb/Downloader
多线程下载及断点续传的实现是使用HTTP/1.1引入的Range请求参数,可以访问Web资源的指定区间的内容。虽然实现了多线程及断点续传,但还有很多不完善的地方。
包含四个类:
Downloader:主类,负责分配任务给各个子线程,及检测进度DownloadFile:表示要下载的哪个文件,为了能写输入到文件的指定位置,使用RandomAccessFile类操作文件,多个线程写同一个文件需要保证线程安全,这里直接调用getChannel方法,获取一个文件通道,FileChannel是线程安全的。DownloadTask:实际执行下载的线程,获取[lowerBound,upperBound]区间的数据,当下载过程中出现异常时要通知其他线程(使用AtomicBoolean),结束下载Logger:实时记录下载进度,以便续传时知道从哪开始。感觉这里做的比较差,为了能实时写出日志及方便地使用Properties类的load/store方法格式化输入输出,每次都是打开后再关闭。
演示:
随便找一个文件下载:
强行结束程序并重新运行:
日志文件:
断点续传的关键是记录各个线程的下载进度,这里细节比较多,花了很久。只需要记录每个线程请求的Range区间极客,每次成功写数据到文件时,就更新一次下载区间。下面是下载完成后的日志内容。
代码:
Downloader.java
packagedownloader;
importjava.io.*;
importjava.net.*;
importjava.nio.file.Files;
importjava.nio.file.Path;
importjava.util.concurrent.atomic.AtomicBoolean;
publicclassDownloader{
privatestaticfinalintDEFAULT_THREAD_COUNT=4;//默认线程数量
privateAtomicBooleancanceled;//取消状态,如果有一个子线程出现异常,则取消整个下载任务
privateDownloadFilefile;//下载的文件对象
privateStringstorageLocation;
privatefinalintthreadCount;//线程数量
privatelongfileSize;//文件大小
privatefinalStringurl;
privatelongbeginTime;//开始时间
privateLoggerlogger;
publicDownloader(Stringurl){
this(url,DEFAULT_THREAD_COUNT);
}
publicDownloader(Stringurl,intthreadCount){
this.url=url;
this.threadCount=threadCount;
this.canceled=newAtomicBoolean(false);
this.storageLocation=url.substring(url.lastIndexOf('/')+1);
this.logger=newLogger(storageLocation+".log",url,threadCount);
}
publicvoidstart(){
booleanreStart=Files.exists(Path.of(storageLocation+".log"));
if(reStart){
logger=newLogger(storageLocation+".log");
System.out.printf("*继续上次下载进度[已下载:%.2fMB]:%s\n",logger.getWroteSize()/1014.0/1024,url);
}else{
System.out.println("*开始下载:"+url);
}
if(-1==(this.fileSize=getFileSize()))
return;
System.out.printf("*文件大小:%.2fMB\n",fileSize/1024.0/1024);
this.beginTime=System.currentTimeMillis();
try{
this.file=newDownloadFile(storageLocation,fileSize,logger);
if(reStart){
file.setWroteSize(logger.getWroteSize());
}
//分配线程下载
dispatcher(reStart);
//循环打印进度
printDownloadProgress();
}catch(IOExceptione){
System.err.println("x创建文件失败["+e.getMessage()+"]");
}
}
/**
*分配器,决定每个线程下载哪个区间的数据
*/
privatevoiddispatcher(booleanreStart){
longblockSize=fileSize/threadCount;//每个线程要下载的数据量
longlowerBound=0,upperBound=0;
long[][]bounds=null;
intthreadID=0;
if(reStart){
bounds=logger.getBounds();
}
for(inti=0;i
DownloadTask.java
packagedownloader;
importjava.io.*;
importjava.net.HttpURLConnection;
importjava.net.URL;
importjava.nio.ByteBuffer;
importjava.nio.channels.Channels;
importjava.nio.channels.ReadableByteChannel;
importjava.util.concurrent.atomic.AtomicBoolean;
classDownloadTaskextendsThread{
privatefinalStringurl;
privatelonglowerBound;//下载的文件区间
privatelongupperBound;
privateAtomicBooleancanceled;
privateDownloadFiledownloadFile;
privateintthreadId;
DownloadTask(Stringurl,longlowerBound,longupperBound,DownloadFiledownloadFile,
AtomicBooleancanceled,intthreadID){
this.url=url;
this.lowerBound=lowerBound;
this.upperBound=upperBound;
this.canceled=canceled;
this.downloadFile=downloadFile;
this.threadId=threadID;
}
@Override
publicvoidrun(){
ReadableByteChannelinput=null;
try{
ByteBufferbuffer=ByteBuffer.allocate(1024*1024*2);//2MB
input=connect();
System.out.println("*[线程"+threadId+"]连接成功,开始下载...");
intlen;
while(!canceled.get()&&lowerBound<=upperBound){
buffer.clear();
len=input.read(buffer);
downloadFile.write(lowerBound,buffer,threadId,upperBound);
lowerBound+=len;
}
if(!canceled.get()){
System.out.println("*[线程"+threadId+"]下载完成"+":"+lowerBound+"-"+upperBound);
}
}catch(IOExceptione){
canceled.set(true);
System.err.println("x[线程"+threadId+"]遇到错误["+e.getMessage()+"],结束下载");
}finally{
if(input!=null){
try{
input.close();
}catch(IOExceptione){
e.printStackTrace();
}
}
}
}
/**
*连接WEB服务器,并返回一个数据通道
*@return返回通道
*@throwsIOException网络连接错误
*/
privateReadableByteChannelconnect()throwsIOException{
HttpURLConnectionconn=(HttpURLConnection)newURL(url).openConnection();
conn.setConnectTimeout(3000);
conn.setRequestMethod("GET");
conn.setRequestProperty("Range","bytes="+lowerBound+"-"+upperBound);
//System.out.println("thread_"+threadId+":"+lowerBound+"-"+upperBound);
conn.connect();
intstatusCode=conn.getResponseCode();
if(HttpURLConnection.HTTP_PARTIAL!=statusCode){
conn.disconnect();
thrownewIOException("状态码错误:"+statusCode);
}
returnChannels.newChannel(conn.getInputStream());
}
}
DownloadFile.java
packagedownloader;
importjava.io.IOException;
importjava.io.RandomAccessFile;
importjava.nio.ByteBuffer;
importjava.nio.channels.FileChannel;
importjava.util.concurrent.atomic.AtomicLong;
classDownloadFile{
privatefinalRandomAccessFilefile;
privatefinalFileChannelchannel;//线程安全类
privateAtomicLongwroteSize;//已写入的长度
privateLoggerlogger;
DownloadFile(StringfileName,longfileSize,Loggerlogger)throwsIOException{
this.wroteSize=newAtomicLong(0);
this.logger=logger;
this.file=newRandomAccessFile(fileName,"rw");
file.setLength(fileSize);
channel=file.getChannel();
}
/**
*写数据
*@paramoffset写偏移量
*@parambuffer数据
*@throwsIOException写数据出现异常
*/
voidwrite(longoffset,ByteBufferbuffer,intthreadID,longupperBound)throwsIOException{
buffer.flip();
intlength=buffer.limit();
while(buffer.hasRemaining()){
channel.write(buffer,offset);
}
wroteSize.addAndGet(length);
logger.updateLog(threadID,length,offset+length,upperBound);//更新日志
}
/**
*@return已经下载的数据量,为了知道何时结束整个任务,以及统计信息
*/
longgetWroteSize(){
returnwroteSize.get();
}
//继续下载时调用
voidsetWroteSize(longwroteSize){
this.wroteSize.set(wroteSize);
}
voidclose(){
try{
file.close();
}catch(IOExceptione){
e.printStackTrace();
}
}
}
Logger.java
packagedownloader;
importjava.io.*;
importjava.util.Properties;
classLogger{
privateStringlogFileName;//下载的文件的名字
privatePropertieslog;
/**
*重新开始下载时,使用该构造函数
*@paramlogFileName
*/
Logger(StringlogFileName){
this.logFileName=logFileName;
log=newProperties();
FileInputStreamfin=null;
try{
log.load(newFileInputStream(logFileName));
}catch(IOExceptionignore){
}finally{
try{
fin.close();
}catch(Exceptionignore){}
}
}
Logger(StringlogFileName,Stringurl,intthreadCount){
this.logFileName=logFileName;
this.log=newProperties();
log.put("url",url);
log.put("wroteSize","0");
log.put("threadCount",String.valueOf(threadCount));
for(inti=0;i{
Stringkey=k.toString();
if(key.startsWith("thread_")){
String[]interval=v.toString().split("-");
bounds[index[0]][0]=Long.parseLong(key.substring(key.indexOf("_")+1));
bounds[index[0]][1]=Long.parseLong(interval[0]);
bounds[index[0]++][2]=Long.parseLong(interval[1]);
}
});
returnbounds;
}
longgetWroteSize(){
returnLong.parseLong(log.getProperty("wroteSize"));
}
}
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。