Java中使用阻塞队列控制线程集实例
队列以一种先进先出的方式管理数据。如果你试图向一个已经满了的阻塞队列中添加一个元素,或是从一个空的阻塞队列中移除一个元素,将导致线程阻塞。在多线程进行合作时,阻塞队列是很有用的工具。工作者线程可以定期的把中间结果存到阻塞队列中。而其他工作者线程把中间结果取出并在将来修改它们。队列会自动平衡负载。如果第一个线程集运行的比第二个慢,则第二个线程集在等待结果时就会阻塞。如果第一个线程集运行的快,那么它将等待第二个线程集赶上来。
下面的程序展示了如何使用阻塞队列来控制线程集。程序在一个目录及它的所有子目录下搜索所有文件,打印出包含指定关键字的文件列表。
java.util.concurrent包提供了阻塞队列的4个变种:LinkedBlockingQueue、ArrayBlockingQueue、PriorityBlockingQueue和DelayQueue。我们用的是ArrayBlockingQueue。ArrayBlockingQueue在构造时需要给定容量,并可以选择是否需要公平性。如果公平参数被设置了,等待时间最长的线程会优先得到处理。通常,公平性会使你在性能上付出代价,只有在的确非常需要的时候再使用它。
生产者线程枚举在所有子目录下的所有文件并把它们放到一个阻塞队列中。这个操作很快,如果队列没有设上限的话,很快它就包含了没有找到的文件。
我们同时还启动了大量的搜索线程。每个搜索线程从队列中取出一个文件,打开它,打印出包含关键字的所有行,然后取出下一个文件。我们使用了一个小技巧来在工作结束后终止线程。为了发出完成信号,枚举线程把一个虚拟对象放入队列。(这类似于在行李输送带上放一个写着“最后一个包”的虚拟包。)当搜索线程取到这个虚拟对象时,就将其放回并终止。
注意,这里不需要人任何显示的线程同步。在这个程序中,我们使用队列数据结构作为一种同步机制。
importjava.io.*; importjava.util.*; importjava.util.concurrent.*;
publicclassBlockingQueueTest { publicstaticvoidmain(String[]args) { Scannerin=newScanner(System.in); System.out.print("Enterbasedirectory(e.g./usr/local/jdk1.6.0/src):"); Stringdirectory=in.nextLine(); System.out.print("Enterkeyword(e.g.volatile):"); Stringkeyword=in.nextLine();
finalintFILE_QUEUE_SIZE=10; finalintSEARCH_THREADS=100;
BlockingQueue<File>queue=newArrayBlockingQueue<File>(FILE_QUEUE_SIZE);
FileEnumerationTaskenumerator=newFileEnumerationTask(queue,newFile(directory)); newThread(enumerator).start(); for(inti=1;i<=SEARCH_THREADS;i++) newThread(newSearchTask(queue,keyword)).start(); } }
/** *Thistaskenumeratesallfilesinadirectoryanditssubdirectories. */ classFileEnumerationTaskimplementsRunnable { /** *ConstructsaFileEnumerationTask. *@paramqueuetheblockingqueuetowhichtheenumeratedfilesareadded *@paramstartingDirectorythedirectoryinwhichtostarttheenumeration */ publicFileEnumerationTask(BlockingQueue<File>queue,FilestartingDirectory) { this.queue=queue; this.startingDirectory=startingDirectory; }
publicvoidrun() { try { enumerate(startingDirectory); queue.put(DUMMY); } catch(InterruptedExceptione) { } }
/** *Recursivelyenumeratesallfilesinagivendirectoryanditssubdirectories *@paramdirectorythedirectoryinwhichtostart */ publicvoidenumerate(Filedirectory)throwsInterruptedException { File[]files=directory.listFiles(); for(Filefile:files) { if(file.isDirectory())enumerate(file); elsequeue.put(file); } }
publicstaticFileDUMMY=newFile("");
privateBlockingQueue<File>queue; privateFilestartingDirectory; }
/** *Thistasksearchesfilesforagivenkeyword. */ classSearchTaskimplementsRunnable { /** *ConstructsaSearchTask. *@paramqueuethequeuefromwhichtotakefiles *@paramkeywordthekeywordtolookfor */ publicSearchTask(BlockingQueue<File>queue,Stringkeyword) { this.queue=queue; this.keyword=keyword; }
publicvoidrun() { try { booleandone=false; while(!done) { Filefile=queue.take(); if(file==FileEnumerationTask.DUMMY) { queue.put(file); done=true; } elsesearch(file); } } catch(IOExceptione) { e.printStackTrace(); } catch(InterruptedExceptione) { } }
/** *Searchesafileforagivenkeywordandprintsallmatchinglines. *@paramfilethefiletosearch */ publicvoidsearch(Filefile)throwsIOException { Scannerin=newScanner(newFileInputStream(file)); intlineNumber=0; while(in.hasNextLine()) { lineNumber++; Stringline=in.nextLine().trim(); if(line.contains(keyword))System.out.printf("%s:%d %s%n",file.getPath(),lineNumber,line); } in.close(); }
privateBlockingQueue<File>queue; privateStringkeyword; }