java线程池实现批量下载文件
本文实例为大家分享了java线程池实现批量下载文件的具体代码,供大家参考,具体内容如下
1创建线程池
packagecom.cheng.webb.thread;
importjava.util.concurrent.ArrayBlockingQueue;
importjava.util.concurrent.ExecutorService;
importjava.util.concurrent.ThreadFactory;
importjava.util.concurrent.ThreadPoolExecutor;
importjava.util.concurrent.TimeUnit;
importjava.util.concurrent.atomic.AtomicInteger;
publicclassThreadUtil{
/**
*创建批量下载线程池
*
*@paramthreadSize下载线程数
*@returnExecutorService
*/
publicstaticExecutorServicebuildDownloadBatchThreadPool(intthreadSize){
intkeepAlive=0;
Stringprefix="download-batch";
ThreadFactoryfactory=ThreadUtil.buildThreadFactory(prefix);
returnnewThreadPoolExecutor(threadSize,
threadSize,
keepAlive,
TimeUnit.SECONDS,
newArrayBlockingQueue<>(threadSize),
factory);
}
/**
*创建自定义线程工厂
*
*@paramprefix名称前缀
*@returnThreadFactory
*/
publicstaticThreadFactorybuildThreadFactory(Stringprefix){
returnnewCustomThreadFactory(prefix);
}
/**
*自定义线程工厂
*/
publicstaticclassCustomThreadFactoryimplementsThreadFactory{
privateStringthreadNamePrefix;
privateAtomicIntegercounter=newAtomicInteger(1);
/**
*自定义线程工厂
*
*@paramthreadNamePrefix工厂名称前缀
*/
CustomThreadFactory(StringthreadNamePrefix){
this.threadNamePrefix=threadNamePrefix;
}
@Override
publicThreadnewThread(Runnabler){
StringthreadName=threadNamePrefix+"-t"+counter.getAndIncrement();
returnnewThread(r,threadName);
}
}
}
2批量下载文件
packagecom.cheng.webb.thread;
importorg.slf4j.Logger;
importorg.slf4j.LoggerFactory;
importjava.io.File;
importjava.io.FileOutputStream;
importjava.io.InputStream;
importjava.net.HttpURLConnection;
importjava.net.URL;
importjava.util.ArrayList;
importjava.util.List;
importjava.util.Map;
importjava.util.concurrent.*;
/**
*文件下载类
*
*@authorshucheng
*@creation2019年1月30日下午4:41:32
*/
publicclassDownloadUtil{
privatestaticLoggerlogger=LoggerFactory.getLogger(DownloadUtil.class);
/**
*下载线程数
*/
privatestaticfinalintDOWNLOAD_THREAD_NUM=14;
/**
*下载线程池
*/
privatestaticExecutorServicedownloadExecutorService=ThreadUtil
.buildDownloadBatchThreadPool(DOWNLOAD_THREAD_NUM);
/**
*文件下载
*
*@paramfileUrl
*文件url,如:https://img3.doubanio.com//view//photo//s_ratio_poster//public//p2369390663.webp
*@parampath
*存放路径,如:/opt/img/douban/my.webp
*/
publicstaticvoiddownload(StringfileUrl,Stringpath){
//判断存储文件夹是否已经存在或者创建成功
if(!createFolderIfNotExists(path)){
logger.error("Wecan'tcreatefolder:{}",getFolder(path));
return;
}
InputStreamin=null;
FileOutputStreamout=null;
try{
URLurl=newURL(fileUrl);
HttpURLConnectionconn=(HttpURLConnection)url.openConnection();
conn.setRequestMethod("GET");
//2s
conn.setConnectTimeout(10000);
in=conn.getInputStream();
out=newFileOutputStream(path);
intlen;
byte[]arr=newbyte[1024*1000];
while(-1!=(len=in.read(arr))){
out.write(arr,0,len);
}
out.flush();
conn.disconnect();
}catch(Exceptione){
logger.error("Failtodownload:{}by{}",fileUrl,e.getMessage());
}finally{
try{
if(null!=out){
out.close();
}
if(null!=in){
in.close();
}
}catch(Exceptione){
//donothing
}
}
}
/**
*创建文件夹,如果文件夹已经存在或者创建成功返回true
*
*@parampath
*路径
*@returnboolean
*/
privatestaticbooleancreateFolderIfNotExists(Stringpath){
StringfolderName=getFolder(path);
if(folderName.equals(path)){
returntrue;
}
Filefolder=newFile(getFolder(path));
if(!folder.exists()){
synchronized(DownloadUtil.class){
if(!folder.exists()){
returnfolder.mkdirs();
}
}
}
returntrue;
}
/**
*获取文件夹
*
*@parampath
*文件路径
*@returnString
*/
privatestaticStringgetFolder(Stringpath){
intindex=path.lastIndexOf("/");
return-1!=index?path.substring(0,index):path;
}
/**
*下载资源
*
*issue:线程池创建过多
*
*最大批量下载为5,请知悉
*
*@paramresourceMap
*资源map,key为资源下载url,value为资源存储位置
*/
publicstaticvoidbatch(MapresourceMap){
if(resourceMap==null||resourceMap.isEmpty()){
return;
}
try{
Listkeys=newArrayList<>(resourceMap.keySet());
intsize=keys.size();
intpageNum=getPageNum(size);
for(intindex=0;indexurlList=keys.subList(start,last);
for(Stringurl:urlList){
//提交任务
Runnabletask=newDownloadWorker(latch,url,resourceMap.get(url));
downloadExecutorService.submit(task);
}
latch.await();
}
}catch(Exceptione){
logger.error("{}",e);
}
logger.info("Downloadresourcemapisalldone");
}
/**
*获取最后一个元素
*
*@paramsize
*列表长度
*@paramindex
*下标
*@returnint
*/
privatestaticintgetLastNum(intsize,intindex){
returnindex>size?size:index;
}
/**
*获取划分页面数量
*
*@paramsize
*列表长度
*@returnint
*/
privatestaticintgetPageNum(intsize){
inttmp=size/DOWNLOAD_THREAD_NUM;
returnsize%DOWNLOAD_THREAD_NUM==0?tmp:tmp+1;
}
/**
*下载线程
*/
staticclassDownloadWorkerimplementsRunnable{
privateCountDownLatchlatch;
privateStringurl;
privateStringpath;
DownloadWorker(CountDownLatchlatch,Stringurl,Stringpath){
this.latch=latch;
this.url=url;
this.path=path;
}
@Override
publicvoidrun(){
logger.debug("Startbatch:[{}]into:[{}]",url,path);
DownloadUtil.download(url,path);
logger.debug("Download:[{}]into:[{}]isdone",url,path);
latch.countDown();
}
}
}
3测试批量下载文件
packagecom.cheng.webb.thread;
importjava.util.HashMap;
importjava.util.Map;
importorg.junit.Test;
importcom.alibaba.fastjson.JSON;
publicclassDownLoadTest{
Stringjson="{\r\n"
+"\"http://www.xxx.com/111/123.mp4\":\"myFile/111/123.mp4\",\r\n"
+"\"http://www.xxx.com/111/124.mp4\":\"myFile/111/124.mp4\",\r\n"
+"\"http://www.xxx.com/111/125.mp4\":\"myFile/111/125.mp4\"\r\n"
+"}";
@SuppressWarnings("unchecked")
@Test
publicvoidtest(){
Mapmap=newHashMap<>();
MapresMap=JSON.parseObject(json,map.getClass());
inttimes=1;
for(intindex=0;index
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。