Java多线程批量数据导入的方法详解
前言:
当遇到大量数据导入时,为了提高处理的速度,可以选择使用多线程来批量处理这些处理。常见的场景有:
- 大文件导入数据库(这个文件不一定是标准的CSV可导入文件或者需要在内存中经过一定的处理)
- 数据同步(从第三方接口拉取数据处理后写入自己的数据库)
以上的场景有一个共性,这类数据导入的场景简单来说就是将数据从一个数据源移动到另外一个数据源,而其中必定可以分为两步
- 数据读取:从数据源读取数据到内存
- 数据写入:将内存中的数据写入到另外一个数据源,可能存在数据处理
而且根据读取的速度一般会比数据写入的速度快很多,即读取快,写入慢。
设计思路
由于场景的特点是读取快,写入慢,如果是使用多线程处理,建议是数据写入部分改造为多线程。而数据读取可以改造成批量读取数据。简单来说就是两个要点:
- 批量读取数据
- 多线程写入数据
示例
多线程批量处理最简单的方案是使用线程池来进行处理,下面会通过一个模拟批量读取和写入的服务,以及对这个服务的多线程写入调用作为示例,展示如何多线程批量数据导入。
模拟服务
importjava.util.concurrent.atomic.AtomicLong;
/**
*数据批量写入用的模拟服务
*
*@authorRJH
*createat2019-04-01
*/
publicclassMockService{
/**
*可读取总数
*/
privatelongcanReadTotal;
/**
*写入总数
*/
privateAtomicLongwriteTotal=newAtomicLong(0);
/**
*写入休眠时间(单位:毫秒)
*/
privatefinallongsleepTime;
/**
*构造方法
*
*@paramcanReadTotal
*@paramsleepTime
*/
publicMockService(longcanReadTotal,longsleepTime){
this.canReadTotal=canReadTotal;
this.sleepTime=sleepTime;
}
/**
*批量读取数据接口
*
*@paramnum
*@return
*/
publicsynchronizedlongreadData(intnum){
longreadNum;
if(canReadTotal>=num){
canReadTotal-=num;
readNum=num;
}else{
readNum=canReadTotal;
canReadTotal=0;
}
//System.out.println("readdatasize:"+readNum);
returnreadNum;
}
/**
*写入数据接口
*/
publicvoidwriteData(){
try{
//休眠一定时间模拟写入速度慢
Thread.sleep(sleepTime);
}catch(InterruptedExceptione){
e.printStackTrace();
}
//写入总数自增
System.out.println("thread:"+Thread.currentThread()+"writedata:"+writeTotal.incrementAndGet());
}
/**
*获取写入的总数
*
*@return
*/
publiclonggetWriteTotal(){
returnwriteTotal.get();
}
}
批量数据处理器
importjava.util.concurrent.ExecutorService;
importjava.util.concurrent.Executors;
/**
*基于线程池的多线程批量写入处理器
*@authorRJH
*createat2019-04-01
*/
publicclassSimpleBatchHandler{
privateExecutorServiceexecutorService;
privateMockServiceservice;
/**
*每次批量读取的数据量
*/
privateintbatch;
/**
*线程个数
*/
privateintthreadNum;
publicSimpleBatchHandler(MockServiceservice,intbatch,intthreadNum){
this.service=service;
this.batch=batch;
//使用固定数目的线程池
this.executorService=Executors.newFixedThreadPool(threadNum);
}
/**
*开始处理
*/
publicvoidstartHandle(){
//开始处理的时间
longstartTime=System.currentTimeMillis();
System.out.println("starthandletime:"+startTime);
longreadData;
while((readData=service.readData(batch))!=0){//批量读取数据,知道读取不到数据才停止
for(longi=0;iservice.writeData());
}
}
//关闭线程池
executorService.shutdown();
while(!executorService.isTerminated()){//等待线程池中的线程执行完
}
//结束时间
longendTime=System.currentTimeMillis();
System.out.println("endhandletime:"+endTime);
//总耗时
System.out.println("totalhandletime:"+(endTime-startTime)+"ms");
//写入总数
System.out.println("totalwritenum:"+service.getWriteTotal());
}
}
测试类
/**
*SimpleBatchHandler的测试类
*@authorRJH
*createat2019-04-01
*/
publicclassSimpleBatchHandlerTest{
publicstaticvoidmain(String[]args){
//总数
longtotal=100000;
//休眠时间
longsleepTime=100;
//每次拉取的数量
intbatch=100;
//线程个数
intthreadNum=16;
MockServicemockService=newMockService(total,sleepTime);
SimpleBatchHandlerhandler=newSimpleBatchHandler(mockService,batch,threadNum);
handler.startHandle();
}
}
运行结果
starthandletime:1554298681755 thread:Thread[pool-1-thread-2,5,main]writedata:1 thread:Thread[pool-1-thread-1,5,main]writedata:2 ...省略部分输出 thread:Thread[pool-1-thread-4,5,main]writedata:100000 endhandletime:1554299330202 totalhandletime:648447ms totalwritenum:100000
分析
在单线程情况下的执行时间应该为total*sleepTime,即10000000ms,而改造为多线程后执行时间为648447ms。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。