介绍
阻塞队列 (BlockingQueue) 是指当队列满时, 队列会阻塞插入元素的线程, 直到队列不满; 当队列空时, 队列会阻塞获得元素的线程, 直到队列变非空. 阻塞队列就是生产者用来存放元素, 消费者用来获取元素的容器.
当线程 插入 / 获取 动作由于队列 满 / 空 阻塞后, 队列也提供了一些机制去处理, 或抛出异常, 或返回特殊值, 或者线程一直等待...
方法 / 处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e, timeout, unit) |
移除方法 | remove(o) | poll() | take() | poll(timeout, unit) |
检查方法 | element() | peek() — 不移除元素 | 不可用 | 不可用 |
tips: 如果是无界阻塞队列, 则 put 方法永远不会被阻塞; offer 方法始终返回 true.
Java 中的阻塞队列:
ArrayBlockingQueue
ArrayBlockingQueue 是一个用数组实现的有界阻塞队列. 此队列按照先进先出 (FIFO) 的原则对元素进行排序, 默认情况下不保证线程公平的访问.
通过可重入的独占锁 ReentrantLock 来控制并发, Condition 来实现阻塞.
- public class ArrayBlockingQueueTest {
- /**
- * 1. 由于是有界阻塞队列, 需要设置初始大小
- * 2. 默认不保证阻塞线程的公平访问, 可设置公平性
- */
- private static ArrayBlockingQueue<String> QUEUE = new ArrayBlockingQueue<>(2, true);
- public static void main(String[] args) throws InterruptedException {
- Thread put = new Thread(() -> {
- // 3. 尝试插入元素
- try {
- QUEUE.put("java");
- QUEUE.put("javaScript");
- // 4. 元素已满, 会阻塞线程
- QUEUE.put("c++");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- put.start();
- Thread take = new Thread(() -> {
- try {
- // 5. 获取一个元素
- System.out.println(QUEUE.take());
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- });
- take.start();
- // 6 JavaScript,c++
- System.out.println(QUEUE.take());
- System.out.println(QUEUE.take());
- }
- }
- LinkedBlockingQueue
LinkedBlockingQueue 是一个用单向链表实现的有界阻塞队列. 此队列的默认和最大长度为 Integer.MAX_VALUE. 此队列按照先进先出的原则对元素进行排序.
和 ArrayBlockingQueue 一样, 采用 ReentrantLock 来控制并发, 不同的是它使用了两个独占锁来控制消费和生产, 通过 takeLock 和 putLock 两个锁来控制生产和消费, 互不干扰, 只要队列未满, 生产线程可以一直生产; 只要队列不空, 消费线程可以一直消费, 不会相互因为独占锁而阻塞.
tips: 因为使用了双锁, 避免并发计算不准确, 使用了一个 AtomicInteger 变量统计元素总量.
LinkedBlockingDeque
LinkedBlockingDeque 是一个由双向链表结构组成的有界阻塞队列, 可以从队列的两端插入和移出元素. 它实现了 BlockingDeque 接口, 多了 addFirst,addLast,offerFirst,offerLast,peekFirst 和 peekLast 等方法, 以 First 单词结尾的方法, 表示插入, 获取或移除双端队列的第一个元素. 以 Last 单词结尾的方法, 表示插入, 获取或移除双端队列的最后一个元素.
LinkedBlockingDeque 的 Node 实现多了指向前一个节点的变量 prev, 以此实现双向队列. 并发控制上和 ArrayBlockingQueue 类似, 采用单个 ReentrantLock 来控制并发. 因为双端队列头尾都可以消费和生产, 所以使用了一个共享锁.
双向阻塞队列可以运用在 "工作窃取" 模式中.
- public class LinkedBlockingDequeTest {
- private static LinkedBlockingDeque<String> DEQUE = new LinkedBlockingDeque<>(2);
- public static void main(String[] args) {
- DEQUE.addFirst("java");
- DEQUE.addFirst("c++");
- // java
- System.out.println(DEQUE.peekLast());
- // java
- System.out.println(DEQUE.pollLast());
- DEQUE.addLast("php");
- // c++
- System.out.println(DEQUE.pollFirst());
- }
- }
tips: take() 方法调用的是 takeFirst(), 使用时候需注意.
PriorityBlockingQueue
PriorityBlockingQueue 是一个底层由数组实现的无界阻塞队列, 并带有排序功能. 由于是无界队列, 所以插入永远不会被阻塞. 默认情况下元素采取自然顺序升序排列. 也可以自定义类实现 compareTo()方法来指定元素排序规则, 或者初始化 PriorityBlockingQueue 时, 指定构造参数 Comparator 来对元素进行排序.
底层同样采用 ReentrantLock 来控制并发, 由于只有获取会阻塞, 所以只采用一个 Condition(只通知消费)来实现.
- public class PriorityBlockingQueueTest {
- private static PriorityBlockingQueue<String> QUEUE = new PriorityBlockingQueue<>();
- public static void main(String[] args) {
- QUEUE.add("java");
- QUEUE.add("javaScript");
- QUEUE.add("c++");
- QUEUE.add("python");
- QUEUE.add("php");
- Iterator<String> it = QUEUE.iterator();
- while (it.hasNext()) {
- // c++ JavaScript java python PHP
- // 同优先级不保证排序顺序
- System.out.print(it.next() + " ");
- }
- }
- }
- DelayQueue
DelayQueue 是一个支持延时获取元素的无界阻塞队列. 队列使用 PriorityQueue 来实现. 队列中的元素必须实现 Delayed 接口, 元素按延迟优先级排序, 延迟时间短的排在前面, 只有在延迟期满时才能从队列中提取元素.
和 PriorityBlockingQueue 相似, 底层也是数组, 采用一个 ReentrantLock 来控制并发.
应用场景:
缓存系统的设计: 可以用 DelayQueue 保存缓存元素的有效期, 使用一个线程循环查询 DelayQueue, 一旦能从 DelayQueue 中获取元素时, 表示缓存有效期到了.
定时任务调度: 使用 DelayQueue 保存当天将会执行的任务和执行时间, 一旦从 DelayQueue 中获取到任务就开始执行, 比如 TimerQueue 就是使用 DelayQueue 实现的.
- public class DelayElement implements Delayed, Runnable {
- private static final AtomicLong SEQUENCER = new AtomicLong();
- /**
- * 标识元素先后顺序
- */
- private final long sequenceNumber;
- /**
- * 延迟时间, 单位纳秒
- */
- private long time;
- public DelayElement(long time) {
- this.time = System.nanoTime() + time;
- this.sequenceNumber = SEQUENCER.getAndIncrement();
- }
- @Override
- public long getDelay(TimeUnit unit) {
- return unit.convert(time - System.nanoTime(), NANOSECONDS);
- }
- @Override
- public int compareTo(Delayed other) {
- // compare zero if same object
- if (other == this) {
- return 0;
- }
- if (other instanceof DelayElement) {
- DelayElement x = (DelayElement) other;
- long diff = time - x.time;
- if (diff <0) {
- return -1;
- } else if (diff> 0) {
- return 1;
- } else if (sequenceNumber <x.sequenceNumber) {
- return -1;
- } else {
- return 1;
- }
- }
- long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
- return (diff < 0) ? -1 : (diff> 0) ? 1 : 0;
- }
- @Override
- public void run() {
- System.out.println("sequenceNumber" + sequenceNumber);
- }
- @Override
- public String toString() {
- return "DelayElement{" + "sequenceNumber=" + sequenceNumber + ", time=" + time + '}';
- }
- }
- public class DelayQueueTest {
- private static DelayQueue<DelayElement> QUEUE = new DelayQueue<>();
- public static void main(String[] args) {
- // 1. 添加 10 个参数
- for (int i = 1; i <10; i++) {
- // 2. 5 秒内随机延迟
- int nextInt = new Random().nextInt(5);
- long convert = TimeUnit.NANOSECONDS.convert(nextInt, TimeUnit.SECONDS);
- QUEUE.offer(new DelayElement(convert));
- }
- // 3. 查询元素排序 -- 延迟短的排在前面
- Iterator<DelayElement> iterator = QUEUE.iterator();
- while (iterator.hasNext()) {
- System.out.println(iterator.next());
- }
- // 4. 可观察到元素延迟输出
- while (!QUEUE.isEmpty()) {
- Thread thread = new Thread(QUEUE.poll());
- thread.start();
- }
- }
- }
- LinkedTransferQueue
LinkedTransferQueue 是一个由链表结构组成的无界阻塞 TransferQueue 队列.
并发控制上采用了大量的 CAS 操作, 没有使用锁.
相对于其他阻塞队列, LinkedTransferQueue 多了 tryTransfer 和 transfer 方法.
transfer : Transfers the element to a consumer, waiting if necessary to do so. 存入的元素必须等到有消费者消费才返回.
tryTransfer:Transfers the element to a waiting consumer immediately, if possible. 如果有消费者正在等待消费元素, 则把传入的元素传给消费者. 否则立即返回 false, 不用等到消费.
SynchronousQueue
SynchronousQueue 是一个不存储元素的阻塞队列. 每一个 put 操作必须等待一个 take 操作, 否则继续 put 操作会被阻塞.
SynchronousQueue 默认情况下线程采用非公平性策略访问队列, 未使用锁, 全部通过 CAS 操作来实现并发, 吞吐量非常高, 高于 LinkedBlockingQueue 和 ArrayBlockingQueue, 非常适合用来处理一些高效的传递性场景. Executors.newCachedThreadPool() 就使用了 SynchronousQueue 进行任务传递.
- public class SynchronousQueueTest {
- private static class SynchronousQueueProducer implements Runnable {
- private BlockingQueue<String> blockingQueue;
- private SynchronousQueueProducer(BlockingQueue<String> queue) {
- this.blockingQueue = queue;
- }
- @Override
- public void run() {
- while (true) {
- try {
- String data = UUID.randomUUID().toString();
- System.out.println(Thread.currentThread().getName() + "Put:" + data);
- blockingQueue.put(data);
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- private static class SynchronousQueueConsumer implements Runnable {
- private BlockingQueue<String> blockingQueue;
- private SynchronousQueueConsumer(BlockingQueue<String> queue) {
- this.blockingQueue = queue;
- }
- @Override
- public void run() {
- while (true) {
- try {
- System.out.println(Thread.currentThread().getName() + "take():" + blockingQueue.take());
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- public static void main(String[] args) {
- final BlockingQueue<String> synchronousQueue = new SynchronousQueue<>();
- SynchronousQueueProducer queueProducer = new SynchronousQueueProducer(synchronousQueue);
- new Thread(queueProducer, "producer - 1").start();
- SynchronousQueueConsumer queueConsumer1 = new SynchronousQueueConsumer(synchronousQueue);
- new Thread(queueConsumer1, "consumer - 1").start();
- SynchronousQueueConsumer queueConsumer2 = new SynchronousQueueConsumer(synchronousQueue);
- new Thread(queueConsumer2, "consumer - 2").start();
- }
- }
参考书籍:《Java 并发编程的艺术》
参考博文: https://www.cnblogs.com/konck/p/9473677.html
来源: https://www.cnblogs.com/jmcui/p/11442616.html