Java如何关闭或关闭BlockingQueue?
在此示例中,您将学习如何BlockingQueue在队列中没有其他可用元素时关闭或关闭。我们将通过使生产者在“生产者-消费者”场景中发送标记对象来使用通用策略。此标记对象也称为有毒对象,将被视为队列中不再包含需要处理的对象的标志。这将使我们能够中断使用者线程的操作。
package org.nhooo.example.util.concurrent; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class BlockingQueueShutdown { public static void main(String[] args) { BlockingQueue<String> queue = new ArrayBlockingQueue<>(32); MyDataProducer producer = new MyDataProducer(queue); MyDataConsumer consumer = new MyDataConsumer(queue); new Thread(producer).start(); new Thread(consumer).start(); } }
下面是Producer将数据放入队列的对象。字符串DONE是我们的标记对象。这是最后一个数据将被放置在队列中,以供消费者提取。
package org.nhooo.example.util.concurrent; import java.util.concurrent.BlockingQueue; public class MyDataProducer implements Runnable { BlockingQueue<String> queue; public MyDataProducer(BlockingQueue<String> queue) { this.queue = queue; } @Override public void run() { System.out.println("MyDataProducer.run"); String[] data = {"D001", "D002", "D003", "D004", "D005", "DONE"}; try { for (String element : data) { queue.put(element); Thread.sleep(1000); } } catch (InterruptedException e) { e.printStackTrace(); } } }
所述Consumer对象循环以从队列中检索的元素。当它从队列中检索标记对象时,它将中断循环并结束线程。
package org.nhooo.example.util.concurrent; import java.util.concurrent.BlockingQueue; public class MyDataConsumer implements Runnable { BlockingQueue<String> queue; public MyDataConsumer(BlockingQueue<String> queue) { this.queue = queue; } @Override public void run() { System.out.println("MyDataConsumer.run"); while (true) { try { String element = queue.take(); if ("DONE".equals(element)) { System.out.println("Exiting consumer thread, " + "end of data reached."); break; } System.out.println("Element = " + element); } catch (InterruptedException e) { e.printStackTrace(); } } } }