Java线程池的几种实现方法和区别介绍实例详解
下面通过实例代码为大家介绍Java线程池的几种实现方法和区别:
importjava.text.DateFormat; importjava.text.SimpleDateFormat; importjava.util.ArrayList; importjava.util.Date; importjava.util.List; importjava.util.Random; importjava.util.concurrent.Callable; importjava.util.concurrent.ExecutorService; importjava.util.concurrent.Executors; importjava.util.concurrent.Future; publicclassTestThreadPool{ //-newFixedThreadPool与cacheThreadPool差不多,也是能reuse就用,但不能随时建新的线程 //-其独特之处:任意时间点,最多只能有固定数目的活动线程存在,此时如果有新的线程要建立,只能放在另外的队列中等待,直到当前的线程中某个线程终止直接被移出池子 //-和cacheThreadPool不同,FixedThreadPool没有IDLE机制(可能也有,但既然文档没提,肯定非常长,类似依赖上层的TCP或UDP //IDLE机制之类的),所以FixedThreadPool多数针对一些很稳定很固定的正规并发线程,多用于服务器 //-从方法的源代码看,cache池和fixed池调用的是同一个底层池,只不过参数不同: //fixed池线程数固定,并且是0秒IDLE(无IDLE) //cache池线程数支持0-Integer.MAX_VALUE(显然完全没考虑主机的资源承受能力),60秒IDLE privatestaticExecutorServicefixedService=Executors.newFixedThreadPool(6); //-缓存型池子,先查看池中有没有以前建立的线程,如果有,就reuse.如果没有,就建一个新的线程加入池中 //-缓存型池子通常用于执行一些生存期很短的异步型任务 //因此在一些面向连接的daemon型SERVER中用得不多。 //-能reuse的线程,必须是timeoutIDLE内的池中线程,缺省timeout是60s,超过这个IDLE时长,线程实例将被终止及移出池。 //注意,放入CachedThreadPool的线程不必担心其结束,超过TIMEOUT不活动,其会自动被终止。 privatestaticExecutorServicecacheService=Executors.newCachedThreadPool(); //-单例线程,任意时间池中只能有一个线程 //-用的是和cache池和fixed池相同的底层池,但线程数目是1-1,0秒IDLE(无IDLE) privatestaticExecutorServicesingleService=Executors.newSingleThreadExecutor(); //-调度型线程池 //-这个池子里的线程可以按schedule依次delay执行,或周期执行 privatestaticExecutorServicescheduledService=Executors.newScheduledThreadPool(10); publicstaticvoidmain(String[]args){ DateFormatformat=newSimpleDateFormat("yyyy-MM-ddHH:mm:ss"); ListcustomerList=newArrayList (); System.out.println(format.format(newDate())); testFixedThreadPool(fixedService,customerList); System.out.println("--------------------------"); testFixedThreadPool(fixedService,customerList); fixedService.shutdown(); System.out.println(fixedService.isShutdown()); System.out.println("----------------------------------------------------"); testCacheThreadPool(cacheService,customerList); System.out.println("----------------------------------------------------"); testCacheThreadPool(cacheService,customerList); cacheService.shutdownNow(); System.out.println("----------------------------------------------------"); testSingleServiceThreadPool(singleService,customerList); testSingleServiceThreadPool(singleService,customerList); singleService.shutdown(); System.out.println("----------------------------------------------------"); testScheduledServiceThreadPool(scheduledService,customerList); testScheduledServiceThreadPool(scheduledService,customerList); scheduledService.shutdown(); } publicstaticvoidtestScheduledServiceThreadPool(ExecutorServiceservice,List customerList){ List >listCallable=newArrayList >(); for(inti=0;i<10;i++){ Callable callable=newCallable (){ @Override publicIntegercall()throwsException{ returnnewRandom().nextInt(10); } }; listCallable.add(callable); } try{ List >listFuture=service.invokeAll(listCallable); for(Future future:listFuture){ Integerid=future.get(); customerList.add(id); } }catch(Exceptione){ e.printStackTrace(); } System.out.println(customerList.toString()); } publicstaticvoidtestSingleServiceThreadPool(ExecutorServiceservice,List customerList){ List >>listCallable=newArrayList >>(); for(inti=0;i<10;i++){ Callable >callable=newCallable
>(){ @Override publicList
call()throwsException{ List list=getList(newRandom().nextInt(10)); booleanisStop=false; while(list.size()>0&&!isStop){ System.out.println(Thread.currentThread().getId()+"--sleep:1000"); isStop=true; } returnlist; } }; listCallable.add(callable); } try{ List >>listFuture=service.invokeAll(listCallable); for(Future >future:listFuture){ List
list=future.get(); customerList.addAll(list); } }catch(Exceptione){ e.printStackTrace(); } System.out.println(customerList.toString()); } publicstaticvoidtestCacheThreadPool(ExecutorServiceservice,List customerList){ List >>listCallable=newArrayList >>(); for(inti=0;i<10;i++){ Callable >callable=newCallable
>(){ @Override publicList
call()throwsException{ List list=getList(newRandom().nextInt(10)); booleanisStop=false; while(list.size()>0&&!isStop){ System.out.println(Thread.currentThread().getId()+"--sleep:1000"); isStop=true; } returnlist; } }; listCallable.add(callable); } try{ List >>listFuture=service.invokeAll(listCallable); for(Future >future:listFuture){ List
list=future.get(); customerList.addAll(list); } }catch(Exceptione){ e.printStackTrace(); } System.out.println(customerList.toString()); } publicstaticvoidtestFixedThreadPool(ExecutorServiceservice,List customerList){ List >>listCallable=newArrayList >>(); for(inti=0;i<10;i++){ Callable >callable=newCallable
>(){ @Override publicList
call()throwsException{ List list=getList(newRandom().nextInt(10)); booleanisStop=false; while(list.size()>0&&!isStop){ System.out.println(Thread.currentThread().getId()+"--sleep:1000"); isStop=true; } returnlist; } }; listCallable.add(callable); } try{ List >>listFuture=service.invokeAll(listCallable); for(Future >future:listFuture){ List
list=future.get(); customerList.addAll(list); } }catch(Exceptione){ e.printStackTrace(); } System.out.println(customerList.toString()); } publicstaticList getList(intx){ List list=newArrayList (); list.add(x); list.add(x*x); returnlist; } }
使用:LinkedBlockingQueue实现线程池讲解
//例如:corePoolSize=3,maximumPoolSize=6,LinkedBlockingQueue(10) //RejectedExecutionHandler默认处理方式是:ThreadPoolExecutor.AbortPolicy //ThreadPoolExecutorexecutorService=newThreadPoolExecutor(corePoolSize,maximumPoolSize,1L,TimeUnit.SECONDS,newLinkedBlockingQueue(10)); //1.如果线程池中(也就是调用executorService.execute)运行的线程未达到LinkedBlockingQueue.init(10)的话,当前执行的线程数是:corePoolSize(3) //2.如果超过了LinkedBlockingQueue.init(10)并且超过的数>=init(10)+corePoolSize(3)的话,并且小于init(10)+maximumPoolSize.当前启动的线程数是:(当前线程数-init(10)) //3.如果调用的线程数超过了init(10)+maximumPoolSize则根据RejectedExecutionHandler的规则处理。
关于:RejectedExecutionHandler几种默认实现讲解
//默认使用:ThreadPoolExecutor.AbortPolicy,处理程序遭到拒绝将抛出运行时RejectedExecutionException。 RejectedExecutionHandlerpolicy=newThreadPoolExecutor.AbortPolicy(); ////在ThreadPoolExecutor.CallerRunsPolicy中,线程调用运行该任务的execute本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度。 //policy=newThreadPoolExecutor.CallerRunsPolicy(); ////在ThreadPoolExecutor.DiscardPolicy中,不能执行的任务将被删除。 //policy=newThreadPoolExecutor.DiscardPolicy(); ////在ThreadPoolExecutor.DiscardOldestPolicy中,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。 //policy=newThreadPoolExecutor.DiscardOldestPolicy();
希望本篇文章对您有所帮助