java多线程实现下载图片并压缩
最近在做一个需求:从其他系统的ftp目录下载存储图片url的文件,然后读取文件中的url地址,根据地址下载图片后按天压缩成一个包,平均一个地址文件中包含4000个地址左右,也就是说一个文件扫描后需要下载4000个左右的图片,然后压缩,下面把我的实现方式和优化过程记录下来,如果大家有什么更好的方式可以分享。
使用框架:SpringMVC
定时任务实现:继承org.springframework.scheduling.quartz.QuartzJobBean;
ftp环境搭建就不说了,在其他博客记录过,使用虚拟机中的CentOS搭建的FTP服务,创建FTP账号及对应目录,事先上传需要下载的图片地址文件。文件内容格式“图片ID||图片地址”。
方法一、最简单的实现方法就是先下载存储图片url地址的文件,然后读取文件遍历图片地址,调下载图片的方法将图片存储到本地,最后压缩下载的图片,完成后删除下载的图片,只保留压缩包。
publicclassPictureTransferJobextendsQuartzJobBean
{
protectedvoidexecuteInternal(JobExecutionContextarg0)throwsJobExecutionException
{
//实际的FTP配置是读取配置文件获取的
//FTP地址
StringhostName="192.168.1.112";
//FTP端口
intport=2001;
/FTP账号
StringuserName="test1";
//ftp密码
Stringpassword="test1";
//ftp文件存储目录
StringftpDowload="/";
//文件本地存储路径
Stringpath=this.getClass().getResource("/").getPath();
//图片地址文件存储目录
StringaddrPath=path.substring(1,path.indexOf("WEB-INF/classes"))+"picAddr";
//实际下载的图片存储目录
StringpicPath=path.substring(1,path.indexOf("WEB-INF/classes"))+"pic";
addrPath=addrPath.replace("%20","");
picPath=picPath.replace("%20","");
try
{
//创建存储图片地址的文件
creatFile(addrPath);
//创建存储实际图片的文件
creatFile(picPath);
StringoldAddrPath=addrPath;
StringoldPicPath=picPath;
//创建FTP连接
FtpUtil2ftpUtil2=newFtpUtil2(hostName,port,userName,password,ftpDowload,true);
//遍历FTP目录下的文件
String[]files=ftpUtil2.ListAllFiles();
//本地数据库会有一个表记录下载过的文件,这里会查询数据库和ftp列出的文件名比较,如果已经下载过的文件就不会下载,避免重复下载。
//下面省略比较的过程,循环files数组,在本地创建文件
for(inti=0;iaddrMap=newHashMap();
while((row=br.readLine())!=null)
{
try
{
count++;
if(count==1)
{
continue;
}
String[]column=row.split("\\|\\|",-1);
addrMap.put(column[0].trim(),column[1].trim());
}
catch(Exceptione)
{
e.printStackTrace();
}
}
System.out.println(newDate());
//这里调用压缩方法,压缩方法中会调用执行下载图片的方法
zipPic(picPath,synDate,addrMap);
System.out.println(newDate());
System.out.println("----------完成--------------");
returntrue;
}
catch(Exceptione)
{
e.printStackTrace();
//调用记录错误日志的业务类
returnfalse;
}finally{
try{
if(null!=br)
br.close();
}catch(IOExceptione){
e.printStackTrace();
}
}
}
/**
*根据url地址下载图片
*@throwsIOException
*/
privatebooleandownPic(StringpicPath,List>addrList,ListpicList)throwsIOException{
InputStreamis=null;
FileOutputStreamfos=null;
URLurl=null;
StringfileName=null;
StringpicAddr=null;
Filepic=null;
try
{
for(Map.EntryaddrEntry:addrList)
{
fileName=addrEntry.getKey();
picAddr=addrEntry.getValue();
//创建Url对象
url=newURL(picAddr);
is=url.openStream();
//URLConnection获取到的流通过InputStream直接写入字节数组会缺失数据,导致下载的图片不完整,使用org.apache.commons.io.IOUtils.toByteArray(urlconnection.openstream())可以解决
byte[]bytes=IOUtils.toByteArray(is);//newbyte[is.available()];获取的字节
//流中数据读入字节数组,读入后,流中数据清空
pic=newFile(picPath+fileName+".jpg");
fos=newFileOutputStream(pic);
fos.write(bytes);
//将下载的图片存入List,待图片全部下载完成后传入zip方法进行压缩
picList.add(pic);
fos.flush();
fos.close();
is.close();
}
returntrue;
}
catch(Exceptione)
{
e.printStackTrace();
returnfalse;
}
finally{
if(null!=fos)
{
fos.close();
}
if(null!=is)
{
is.close();
}
}
}
//这里是压缩文件的伪代码
privatevoidzipPic(picPath,synDate,addrMap);{
//传入需要压缩的文件列表和压缩文件名
ZipUtil.zipByStream(picList,newFile(picPath+synDate+".zip"));
}
/**
*创建文件
*@parampath
*/
privatevoidcreatFile(Stringpath)
{
Filefile=newFile(path);
if(!file.exists())
{
file.mkdirs();
}
}
}
方法二、多线程下载、直接压缩流
方法一虽然实现了基本功能,但是由于需要下载的图片太多,以及压缩本地图片文件和删除图片也比较耗时,所以可以优化速度的地方有两个。一个就是提高下载图片的效率,一个就是提高压缩的效率。
提高下载效率的方法可以使用多线程下载,提高压缩效率的方法是可以不将图片保存到本地而直接压缩文件流。
多线程实现方式:首先我们保存了需要下载的文件的地址列表,我们要使用多线程下载就要保证不同线程下载的图片不会重复,因此需要一个标志来区分,这时就可以使用一个索引计数器,按每个线程下载一定量图片分割,从0开始,每隔比如400个图片就用一个线程下载,这样就可以确定需要的线程个数,并且每个线程下载的图片不会重复。
压缩文件实现方式:因为生成压缩文件的本质也是读取需要压缩的文件流,然后生成压缩包,因此我们可以不创建下载的图片文件,而直接使用容器存储所有线程下载的图片流数据,然后将流数据传给压缩工具类直接压缩,这样就省略了读取图片文件创建流,然后生成压缩包,再删除本地图片文件的繁琐过程。
下面列出改造的主要实现:
/** *将下载的图片按天压缩 *@throwsIOException */ privatebooleanzipPic(StringpicPath,StringsynDate,MapaddrMap)throwsIOException{ //这里由于是多线程存储图片流,所以需要使用线程安全的map,因此使用ConcurrentHashMap Map pictureList=newConcurrentHashMap (); //这里定义每个线程下载的图片个数 intcount=400; //存储需要下载的图片地址 List >addrList=newArrayList >(addrMap.entrySet()); //线程数,加一是因为要创建一个线程下载最后不足400个的图片 intnThreads=(addrList.size()/count)+1; //CountDownLatchcountDownLatch=newCountDownLatch(nThreads); try { booleandownPic=false; //执行多线程下载图片 downPic=downPic(picPath,addrList,picList,pictureList,nThreads,count); if(downPic) { ZipUtil.zipByArray(picList,newFile(picPath+synDate+".zip")); } returntrue; } catch(Exceptione) { e.printStackTrace(); returnfalse; } }
下面是创建线程池
/** *根据url地址下载图片 *@throwsInterruptedException */ privatebooleandownPic(StringpicPath,List>addrList,Map picList,Map pictureList,intnThreads,intcount)throwsIOException,InterruptedException{ ExecutorServicethreadPool=Executors.newFixedThreadPool(nThreads); //创建两个个计数器 CountDownLatchbegin=newCountDownLatch(0); CountDownLatchend=newCountDownLatch(nThreads); //循环创建线程 for(inti=0;i >subAddrList=null; //计算每个线程执行的数据 if((i+1)==nThreads){ intstartIndex=(i*count); intendIndex=addrList.size(); subAddrList=addrList.subList(startIndex,endIndex); }else{ intstartIndex=(i*count); intendIndex=(i+1)*count; subAddrList=addrList.subList(startIndex,endIndex); } //线程类 PicDownloadmythead=newPicDownload(picPath,subAddrList,picList,pictureList); //这里执行线程的方式是调用线程池里的threadPool.execute(mythead)方法。 try { threadPool.execute(mythead); } catch(Exceptione) { //记录错误日志 returnfalse; } } begin.countDown(); end.await(); //执行完关闭线程池 threadPool.shutdown(); //这里一定要循环直到线程池中所有线程都结束才能往下走,测试时由于没有这一步导致子线程下载图片还没完成,而主线程已经往下走了,导致压缩包内没有图片 //也可以使用CountDownLatch实现 /*while(true) { if(threadPool.isTerminated()) { System.out.println("所有子线程已结束!"); break; } }*/ returntrue; }
下面是线程实现
classPicDownloadimplementsRunnable{
//下载图片的地址列表
List>addrList;
//装载下载成功的图片列表
MappicList;
MappictureList;
//图片本地存储路径
StringpicPath;
CountDownLatchbegin,end;
publicPicDownload(StringpicPath,List>addrList,MappicList,CountDownLatchbegin,CountDownLatchend){
this.addrList=addrList;
this.picList=picList;
this.picPath=picPath;
this.begin=begin;
this.end=end;
}
@Override
publicvoidrun()
{
try
{
System.out.println(Thread.currentThread().getName()+"------"+Thread.currentThread().getId());
downPicture(addrList);
//System.out.println(countDownLatch.getCount());
begin.await();
}
catch(Exceptione)
{
e.printStackTrace();
}finally{
end.countDown();
//countDownLatch.countDown();
}
}
publicbooleandownPicture(List>addrList)throwsException{
InputStreamis=null;
FileOutputStreamfos=null;
URLurl=null;
StringfileName=null;
StringpicAddr=null;
Filepic=null;
try
{
for(Map.EntryaddrEntry:addrList)
{
fileName=addrEntry.getKey();
picAddr=addrEntry.getValue();
//创建Url对象
url=newURL(picAddr);
is=url.openStream();
//URLConnection获取到的流通过InputStream直接写入字节数组会缺失数据,导致下载的图片不完整,使用org.apache.commons.io.IOUtils.toByteArray(urlconnection.openstream())可以解决
//byte[]bytes=IOUtils.toByteArray(is);//newbyte[is.available()];获取的字节
//流中数据读入字节数组,读入后,流中数据清空
picList.put(fileName+".jpg",is);
//这时候由于没有把流写入文件,一定不能关闭流,否则流中的数据就会丢失
//is.close();
}
returntrue;
}
catch(Exceptione)
{
e.printStackTrace();
returnfalse;
}
finally{
//不能关闭流
/*if(null!=is)
{
is.close();
}*/
}
}
}
上面使用流来压缩遇到了另一个问题,在压缩文件时会出现java.net.SocketException:Connectionreset
分析了一下原因,应该是由于流InputStream和UrlConnection是连接状态的,UrlConnection超时重置导致了获取输入流失败。
尝试设置URLConnection的超时时间,但是测试时发现图片下载收到网速影响较大,这种方式很不稳定,不可取,最后只有放弃使用流,而改用字节数组传给压缩工具类,然后将字节数组转为流压缩。
/** *使用容器存储下载的图片字节数组 */ publicbooleandownPicture(List>addrList)throwsException{ InputStreamis=null; FileOutputStreamfos=null; URLurl=null; StringfileName=null; StringpicAddr=null; Filepic=null; try { for(Map.Entry addrEntry:addrList) { fileName=addrEntry.getKey(); picAddr=addrEntry.getValue(); //创建Url对象 url=newURL(picAddr); //打开连接,创建java.net.URLConnection对象,该对象没有关闭连接的方法,可以转为它的子类HttpURLConnection调用disconnect方法关闭连接。 //java.net.URLConnection和java.net.HttpURLConnection都有设置超时时间的方法关闭连接 //HttpURLConnectionuc=(HttpURLConnection)url.openConnection(); is=uc.getInputStream(); //URLConnection获取到的流通过InputStream直接写入字节数组会缺失数据,导致下载的图片不完整,使用org.apache.commons.io.IOUtils.toByteArray(urlconnection.openstream())可以解决 byte[]bytes=IOUtils.toByteArray(is);//newbyte[is.available()];获取的字节 //流中数据读入字节数组,读入后,流中数据清空 //is.read(bytes); picList.put(fileName+".jpg",bytes); is.close(); } returntrue; } catch(Exceptione) { e.printStackTrace(); returnfalse; } finally{ if(null!=is) { is.close(); } } }
总结:
实现过程中遇到的问题:
1、使用线程池时对于共享状态,比如这里的存储下载的图片字节数据容器是所有线程共享的,因此需要使用同步的容器,否则会导致存储的数据出问题,因此使用了ConcurrentHashMap
2、这里存在一个主线程和子线程的执行顺序问题,因为主线程需要等待线程池中所有线程下载图片结束后才能往下走去压缩图片,如果主线程不等待子线程结束就向下执行压缩方法就会导致压缩图片缺少或者没有压缩图片。因此可以使用CountDownLatch实现,或者在关闭线程池语句下面使用死循环检查threadPool.isTerminated()才能继续执行主线程去压缩图片。
3、由于直接将UrlConnection获取到的输入流直接传给压缩类进行压缩,存在连接超时重置的情况,因此改用将下载的流存入字节数组,再传给压缩类压缩,避免使用流出现意外情况。
4、在使用urlconnection.openStream()获取输入流后,转换为字节数组下载的图片是不完整的。。使用org.apache.commons.io.IOUtils.toByteArray(urlconnection.openstream())可以解决,具体可以阅读其源码看实现。
下面是FTP工具类的实现:
importjava.io.BufferedInputStream;
importjava.io.BufferedOutputStream;
importjava.io.FileInputStream;
importjava.io.FileOutputStream;
importjava.io.IOException;
importorg.apache.commons.net.ftp.FTPClient;
importorg.apache.commons.net.ftp.FTPClientConfig;
importorg.apache.commons.net.ftp.FTPConnectionClosedException;
importorg.apache.commons.net.ftp.FTPReply;
publicclassFtpUtil2{
privateFTPClientftpClient=null;
//ftp服务器地址
privateStringhostName;
//ftp服务器默认端口
publicstaticintdefaultport=21;
//登录名
privateStringuserName;
//登录密码
privateStringpassword;
//需要访问的远程目录
privateStringremoteDir;
/**
*@paramhostName
*主机地址
*@paramport
*端口号
*@paramuserName
*用户名
*@parampassword
*密码
*@paramremoteDir
*默认工作目录
*@paramis_zhTimeZone
*是否是中文FTPServer端
*@return
*@return
*/
/**
*新增方法
*/
publicFtpUtil2()
{
PropConfigconfig=PropConfig.loadConfig("system.properties");
StringhostName=config.getConfig("ftpAddress");
Stringport=config.getConfig("ftpPort");
StringuserName=config.getConfig("ftpUserName");
Stringpassword=config.getConfig("ftpPassword");
StringremoteDir=config.getConfig("remoteFilePath");
booleanis_zhTimeZone=true;
this.hostName=hostName;
this.userName=userName;
this.password=password;
this.remoteDir=remoteDir==null?"":remoteDir;
this.ftpClient=newFTPClient();
if(is_zhTimeZone){
this.ftpClient.configure(FtpUtil2.Config());
this.ftpClient.setControlEncoding("GBK");
}
//登录
this.login();
//切换目录
this.changeDir(this.remoteDir);
this.setFileType(FTPClient.BINARY_FILE_TYPE);
ftpClient.setDefaultPort(Integer.parseInt(port));
}
publicFtpUtil2(StringhostName,intport,StringuserName,
Stringpassword,StringremoteDir,booleanis_zhTimeZone){
this.hostName=hostName;
this.userName=userName;
this.password=password;
defaultport=port;
this.remoteDir=remoteDir==null?"":remoteDir;
this.ftpClient=newFTPClient();
if(is_zhTimeZone){
this.ftpClient.configure(FtpUtil2.Config());
this.ftpClient.setControlEncoding("GBK");
}
//登录
this.login();
//切换目录
this.changeDir(this.remoteDir);
this.setFileType(FTPClient.ASCII_FILE_TYPE);
ftpClient.setDefaultPort(port);
}
/**
*登录FTP服务器
*/
publicbooleanlogin(){
booleansuccess=false;
try{
ftpClient.connect(this.hostName,defaultport);
ftpClient.login(this.userName,this.password);
intreply;
reply=ftpClient.getReplyCode();
if(!FTPReply.isPositiveCompletion(reply)){
ftpClient.disconnect();
returnsuccess;
}
}catch(FTPConnectionClosedExceptione){
//TODOAuto-generatedcatchblock
e.printStackTrace();
}catch(IOExceptione){
//TODOAuto-generatedcatchblock
e.printStackTrace();
}
success=true;
System.out.println("连接到ftp服务器:"+this.hostName+"成功..开始登录");
returnsuccess;
}
privatestaticFTPClientConfigConfig(){
FTPClientConfigconf=newFTPClientConfig(FTPClientConfig.SYST_UNIX);
conf.setRecentDateFormatStr("MM月dd日HH:mm");
//conf.setRecentDateFormatStr("(YYYY年)?MM月dd日(HH:mm)?");
returnconf;
}
/**
*变更工作目录
*
*@paramremoteDir
*
*/
publicvoidchangeDir(StringremoteDir){
try{
this.remoteDir=remoteDir;
ftpClient.changeWorkingDirectory(remoteDir);
}catch(IOExceptione){
//TODOAuto-generatedcatchblock
e.printStackTrace();
}
System.out.println("变更工作目录为:"+remoteDir);
}
/**
*返回上一级目录(父目录)
*/
publicvoidtoParentDir(){
try{
ftpClient.changeToParentDirectory();
}catch(IOExceptione){
//TODOAuto-generatedcatchblock
e.printStackTrace();
}
}
/**
*列出当前工作目录下所有文件
*/
publicString[]ListAllFiles(){
String[]names=this.ListFiles("*");
returnthis.sort(names);
}
/**
*列出指定工作目录下的匹配文件
*
*@paramdir
*exp:/cim/
*@paramfile_regEx
*通配符为*
*/
publicString[]ListAllFiles(Stringdir,Stringfile_regEx){
String[]names=this.ListFiles(dir+file_regEx);
returnthis.sort(names);
}
/**
*列出匹配文件
*
*@paramfile_regEx
*匹配字符,通配符为*
*/
publicString[]ListFiles(Stringfile_regEx){
try{
/**
*FTPFile[]remoteFiles=ftpClient.listFiles(file_regEx);
*//System.out.println(remoteFiles.length);String[]name=new
*String[remoteFiles.length];if(remoteFiles!=null){for(int
*i=0;i
下面是ZIP工具类:
importjava.io.File;
importjava.io.FileInputStream;
importjava.io.FileOutputStream;
importjava.io.IOException;
importjava.io.InputStream;
importjava.io.OutputStream;
importjava.util.ArrayList;
importjava.util.Enumeration;
importjava.util.List;
importjava.util.Map;
importjava.util.zip.ZipEntry;
importjava.util.zip.ZipFile;
importjava.util.zip.ZipOutputStream;
importorg.apache.commons.io.IOUtils;
importcom.ibatis.common.logging.Log;
importcom.ibatis.common.logging.LogFactory;
publicclassZipUtil{
privatestaticfinalLoglog=LogFactory.getLog(ZipUtil.class);
/**
*压缩文件
*
*@paramsrcfileFile[]需要压缩的文件列表
*@paramzipfileFile压缩后的文件
*/
publicstaticOutputStreamzipFiles(Listsrcfile,OutputStreamoutputStream){
byte[]buf=newbyte[1024];
try{
//CreatetheZIPfile
ZipOutputStreamout=newZipOutputStream(outputStream);
//Compressthefiles
for(inti=0;i0){
//System.out.println(len+"==============");
out.write(buf,0,len);
}
//Completetheentry
out.closeEntry();
in.close();
}
//CompletetheZIPfile
out.close();
}catch(IOExceptione){
log.error("ZipUtilzipFilesexception:"+e);
}
returnoutputStream;
}
/**
*压缩文件
*
*@paramsrcfileFile[]需要压缩的文件列表
*@paramzipfileFile压缩后的文件
*/
publicstaticvoidzipFiles(Listsrcfile,Filezipfile){
byte[]buf=newbyte[1024];
try{
//CreatetheZIPfile
ZipOutputStreamout=newZipOutputStream(newFileOutputStream(zipfile));
//Compressthefiles
for(inti=0;i0){
out.write(buf,0,len);
}
//Completetheentry
out.closeEntry();
in.close();
}
//CompletetheZIPfile
out.close();
}catch(IOExceptione){
log.error("ZipUtilzipFilesexception:"+e);
}
}
/**
*压缩文件
*srcfile:key:文件名,value:文件对应的输入流
*@paramsrcfile
*@paramzipfile
*@see
*/
publicstaticvoidzipByStream(Mapsrcfile,Filezipfile){
try{
//CreatetheZIPfile
ZipOutputStreamout=newZipOutputStream(newFileOutputStream(zipfile));
//Compressthefiles
System.out.println(srcfile.entrySet().size());
for(Map.EntryfileEntry:srcfile.entrySet()){
InputStreamin=fileEntry.getValue();
//AddZIPentrytooutputstream.
System.out.println(in.available());
out.putNextEntry(newZipEntry(fileEntry.getKey()));
//TransferbytesfromthefiletotheZIPfile
byte[]bytes=IOUtils.toByteArray(in);
out.write(bytes);
out.closeEntry();
in.close();
}
//CompletetheZIPfile
out.close();
}catch(IOExceptione){
log.error("ZipUtilzipFilesexception:"+e);
System.out.println(e.getMessage());
}
}
/**
*压缩文件
*srcfile:key:文件名,value:文件对应的字节数组
*@paramsrcfile
*@paramzipfile
*@see
*/
publicstaticvoidzipByArray(Mapsrcfile,Filezipfile){
byte[]buf=newbyte[1024];
try{
//CreatetheZIPfile
ZipOutputStreamout=newZipOutputStream(newFileOutputStream(zipfile));
//Compressthefiles
System.out.println(srcfile.entrySet().size());
for(Map.EntryfileEntry:srcfile.entrySet()){
//InputStreamin=fileEntry.getValue();
//AddZIPentrytooutputstream.
out.putNextEntry(newZipEntry(fileEntry.getKey()));
//TransferbytesfromthefiletotheZIPfile
byte[]bytes=fileEntry.getValue();//IOUtils.toByteArray(in);
out.write(bytes);
out.closeEntry();
//in.close();
}
//CompletetheZIPfile
out.close();
}catch(IOExceptione){
log.error("ZipUtilzipFilesexception:"+e);
System.out.println(e.getMessage());
}
}
/**
*解压缩
*
*@paramzipfileFile需要解压缩的文件
*@paramdescDirString解压后的目标目录
*/
publicstaticvoidunZipFiles(Filezipfile,StringdescDir){
try{
//OpentheZIPfile
ZipFilezf=newZipFile(zipfile);
for(Enumerationentries=zf.entries();entries.hasMoreElements();){
//Gettheentryname
ZipEntryentry=((ZipEntry)entries.nextElement());
StringzipEntryName=entry.getName();
InputStreamin=zf.getInputStream(entry);
//System.out.println(zipEntryName);
OutputStreamout=newFileOutputStream(descDir+zipEntryName);
byte[]buf1=newbyte[1024];
intlen;
while((len=in.read(buf1))>0){
out.write(buf1,0,len);
}
//Closethefileandstream
in.close();
out.close();
}
}catch(IOExceptione){
log.error("ZipUtilunZipFilesexception:"+e);
}
}
/**
*Main
*
*@paramargs
*/
publicstaticvoidmain(String[]args){
Listsrcfile=newArrayList();
srcfile.add(newFile("d:\\1.jpg"));
srcfile.add(newFile("d:\\2.jpg"));
srcfile.add(newFile("d:\\3.jpg"));
srcfile.add(newFile("d:\\4.jpg"));
Filezipfile=newFile("d:\\pic.zip");
ZipUtil.zipFiles(srcfile,zipfile);
}
}
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。