浅谈java.util.concurrent包中的线程池和消息队列
1.java并发包介绍
JDK5.0(JDK1.5更名后)以后的版本引入高级并发特性,大多数的特性在java.util.concurrent包中,是专门用于多线程编程的,充分利用了现代多处理器和多核心系统的功能以编写大规模并发应用程序。主要包括原子量、并发集合、同步器、可重入锁,并对线程池的构造提供了强力的支持
2.线程池
java.util.concurrent.Executors提供了一个java.util.concurrent.Executor接口的实现用于创建线程池
多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。
假设服务器完成一项任务所需时间为:T1创建线程时间,T2在线程中执行任务的时间,T3销毁线程时间。如果T1+T3远大于T2,则可以采用线程池,以提高服务器性能,减少创建和销毁线程所需消耗的时间。
一个线程池由以下四个基本部分组成:
- 线程池管理器(ThreadPool):用于创建并管理线程池,包括创建线程池,销毁线程池,添加新任务;
- 工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
- 任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
- 任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。
线程池技术正是关心如何缩短或调整T1,T3时间从而提高服务器程序性能的技术。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,免去了线程创建和销毁的开销。
线程池不仅调整T1,T3产生的时间段,而且它还显著减少了创建线程的数目,看一个例子:
假设一个服务器一天要处理100000个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,
一般线程池大小是远小于100000。所以利用线程池的服务器程序不会为了创建100000而在处理请求时浪费时间,从而提高效率。
线程池的五种创建方式
- SingleThreadExecutor:只有一个线程的线程池,因此所提交的任务是顺序执行,Executors.newSingleThreadExecutor();
- CachedThreadPool:线程池里有很多线程需同时进行,旧的可用线程将被新的任务触发从而重新执行,如果线程超过60秒内没有执行,那么将被终止并从池中删除Executors.newCachedThreadPool();
- FixedThreadPool:拥有固定线程数的线程池,如果没有任务执行,那么线程会一直等待,Executors.newFixedThreadPool(10);在构造函数中的参数10是线程池的大小,你可以随意设置,也可以和cpu的数量保持一致,获取cpu的数量intcpuNums=Runtime.getRuntime().availableProcessors();
- ScheduledThreadPool:用来调度即将执行的任务的线程池Executors.newScheduledThreadPool();
- SingThreadScheduledPool:只有一个线程,用来调度任务在指定时间执行Executors.newSingleThreadScheduledExecutor();
3.线程池的使用
以下用FixedThreadPool作为示范,提供一个使用参考
LogNumVo
packagecom.ithzk.threadpool; /** *用作返回执行的数量的 *@authorhzk *@date2018/3/29 */ publicclassLogNumVo{ privatestaticfinallongserialVersionUID=-5541722936350755569L; privateIntegerdataNum; privateIntegersuccessNum; privateIntegerwaitNum; publicIntegergetDataNum(){ returndataNum; } publicvoidsetDataNum(IntegerdataNum){ this.dataNum=dataNum; } publicIntegergetSuccessNum(){ returnsuccessNum; } publicvoidsetSuccessNum(IntegersuccessNum){ this.successNum=successNum; } publicIntegergetWaitNum(){ returnwaitNum; } publicvoidsetWaitNum(IntegerwaitNum){ this.waitNum=waitNum; } }
DealObject
packagecom.ithzk.threadpool; /** *@authorhzk *@date2018/3/29 */ publicclassDealObject{ privateIntegeridentifyId; privateStringdata; publicDealObject(IntegeridentifyId,Stringdata){ this.identifyId=identifyId; this.data=data; } publicDealObject(){ } publicIntegergetIdentifyId(){ returnidentifyId; } publicvoidsetIdentifyId(IntegeridentifyId){ this.identifyId=identifyId; } publicStringgetData(){ returndata; } publicvoidsetData(Stringdata){ this.data=data; } }
AbstractCalculateThread
packagecom.ithzk.threadpool; importjava.util.Collection; importjava.util.concurrent.Callable; importjava.util.concurrent.CountDownLatch; /** *@authorhzk *@date2018/3/29 */ publicclassAbstractCalculateThreadimplementsCallable { protectedCollection insertList; protectedCountDownLatchcountd; protectedStringthreadCode; protectedStringbatchNumber; publicCollection getInsertList(){ returninsertList; } publicvoidsetInsertList(Collection insertList){ this.insertList=insertList; } publicCountDownLatchgetCountd(){ returncountd; } publicvoidsetCountd(CountDownLatchcountd){ this.countd=countd; } publicStringgetThreadCode(){ returnthreadCode; } publicvoidsetThreadCode(StringthreadCode){ this.threadCode=threadCode; } publicStringgetBatchNumber(){ returnbatchNumber; } publicvoidsetBatchNumber(StringbatchNumber){ this.batchNumber=batchNumber; } publicAbstractCalculateThread(){ super(); } publicAbstractCalculateThread(Collection insertList,CountDownLatchcountd,StringthreadCode,StringbatchNumber){ super(); this.insertList=insertList; this.countd=countd; this.threadCode=threadCode; this.batchNumber=batchNumber; } publicStringcall()throwsException{ returnnull; } }
CalculateDealThread
packagecom.ithzk.threadpool; importjava.util.Collection; importjava.util.concurrent.CountDownLatch; /** *@authorhzk *@date2018/3/29 */ publicclassCalculateDealThreadextendsAbstractCalculateThread{ privateExecutorPoolexecutorPool=SpringContextUtil.getBean(ExecutorPool.class); @Override publicStringcall()throwsException{ try{ System.out.println("========开始跑线程【"+threadCode+"】"); returnexecutorPool.syncBatchDealObject(insertList,batchNumber); }catch(Exceptione){ e.printStackTrace(); System.out.println("========开始跑线程【"+threadCode+"】:"+e.getMessage()); }finally{ countd.countDown(); } returnnull; } publicCalculateDealThread(){ super(); } publicCalculateDealThread(Collection insertList,CountDownLatchcountd,StringthreadCode,StringbatchNumber){ super(insertList,countd,threadCode,batchNumber); } }
ExecutorPool
packagecom.ithzk.threadpool; importjava.util.*; importjava.util.concurrent.*; /** *@authorhzk *@date2018/3/29 */ publicclassExecutorPool{ /** *模拟需要处理数据的大小 */ privatestaticfinalintARRAY_COUNT=50000; /** *开启多线程处理的条件 */ privatestaticfinalintMULTI_THREAD_STARTCOUNT=10000; /** *批量处理的大小 */ privatestaticfinalintBATCH_DEAL_SIZE=100; /** *每次开启线程数量 */ publicstaticfinalintTHREAD_POOL_NUM=10; publicstaticvoidmain(String[]args){ testExecutorPool(); } publicstaticvoidtestExecutorPool(){ ArrayListdealObjects=newArrayList (); for(inti=0;i MULTI_THREAD_STARTCOUNT){ try{ System.out.println("===================dataNum>1000|MultipleThreadRun======================="); //每次新增处理的条数 intbatchInsertSize=BATCH_DEAL_SIZE; //定义保存的线程池 ExecutorServiceexecutorInsert=Executors.newFixedThreadPool(THREAD_POOL_NUM); //定义保存过程中返回的线程执行返回参数 List >futureListIsert=newArrayList >(); //线程修改list List
4.BlockingQueue
BlockingQueue也是java.util.concurrent下的主要用来控制线程同步的工具。主要的方法是:put、take一对阻塞存取;add、poll一对非阻塞存取。
插入:
add(anObject)
把anObject加到BlockingQueue里,如果BlockingQueue可以容纳,则返回true,否则抛出异常
offer(anObject)
把anObject加到BlockingQueue里,如果BlockingQueue可以容纳,则返回true,否则返回false.
put(anObject)
把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻塞直到BlockingQueue里面有空间再继续.
读取:
poll(time)
取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null
take()
取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止
其他:
intremainingCapacity()
返回理想情况下(没有内存和资源约束)此队列可接受并且不会被阻塞的附加元素数量。
该数量总是等于此队列的初始容量,小于队列的当前size(返回队列剩余的容量)。
注意,不能总是通过检查remainingcapacity来断定试图插入一个元素是否成功,因为可能是另一个线程将插入或移除某个元
素。
booleanremove(Objecto)
从队列移除元素,如果存在,即移除一个或者更多,队列改变了返回true
publicbooleancontains(Objecto)
查看队列是否存在这个元素,存在返回true
intdrainTo(Collectionc)
传入的集合中的元素,如果在队列中存在,那么将队列中的元素移动到集合中
intdrainTo(Collectionc,intmaxElements)
和上面方法的区别在于,制定了移动的数量
以下是一个BlockQueue的基本使用参考:
Producer
packagecom.ithzk.BlockingQueueTest; importjava.util.concurrent.BlockingQueue; /** *@authorhzk *@date2018/3/31 */ publicclassProducerimplementsRunnable{ BlockingQueueblockingQueue; publicProducer(BlockingQueue blockingQueue){ this.blockingQueue=blockingQueue; } @Override publicvoidrun(){ try{ StringthreadIdentify="AProducer,生产线程"+Thread.currentThread().getName(); blockingQueue.put(threadIdentify); System.out.println("Producesuccess!Thread:"+Thread.currentThread().getName()); }catch(InterruptedExceptione){ e.printStackTrace(); } } }
Consumer
packagecom.ithzk.BlockingQueueTest; importjava.util.concurrent.BlockingQueue; /** *@authorhzk *@date2018/3/31 */ publicclassConsumerimplementsRunnable{ BlockingQueueblockingQueue; publicConsumer(BlockingQueue blockingQueue){ this.blockingQueue=blockingQueue; } @Override publicvoidrun(){ try{ Stringconsumer=Thread.currentThread().getName(); System.out.println("CurrentConsumerThread:"+consumer); //如果队列为空会阻塞当前线程 Stringtake=blockingQueue.take(); System.out.println(consumer+"consumergetaproduct:"+take); }catch(InterruptedExceptione){ e.printStackTrace(); } } }
BlockTest
packagecom.ithzk.BlockingQueueTest; importjava.util.concurrent.BlockingQueue; importjava.util.concurrent.LinkedBlockingQueue; /** *@authorhzk *@date2018/3/31 */ publicclassBlockTest{ publicstaticvoidmain(String[]args)throwsInterruptedException{ //不设置的话,LinkedBlockingQueue默认大小为Integer.MAX_VALUE //BlockingQueueblockingQueue=newLinkedBlockingQueue (); //BlockingQueue blockingQueue=newArrayBlockingQueue (2); BlockingQueue blockingQueue=newLinkedBlockingQueue (2); Consumerconsumer=newConsumer(blockingQueue); Producerproducer=newProducer(blockingQueue); for(inti=0;i<3;i++){ newThread(producer,"Producer"+(i+1)).start(); } for(inti=0;i<5;i++){ newThread(consumer,"Consumer"+(i+1)).start(); } Thread.sleep(5000); newThread(producer,"Producer"+(5)).start(); } }
BlockingQueue有四个具体的实现类,常用的两种实现类为:
- ArrayBlockingQueue:一个由数组支持的有界阻塞队列,规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的。
- LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制。
若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的。
LinkedBlockingQueue可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。
LinkedBlockingQueue和ArrayBlockingQueue区别
LinkedBlockingQueue和ArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue.
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。