阻塞队列是一个支持两个附加操作的队列. 这两个附加的操作支持阻塞的插入和移除方法:
支持阻塞的插入方法: 队列满时, 队列会阻塞插入元素的线程, 直到队列不满
支持阻塞的移除方法: 队列空时, 获取元素的线程会等待队列变为非空
阻塞队列常用于生产者消费者的场景. 其中生产者是向队列添加元素的线程, 消费者是从队列取出元素的线程, 阻塞队列是存放和获取元素的容器.
阻塞队列的 4 种处理方式:
抛出异常:
add(e) 当队列满, 再插入元素, 抛出异常
remove() 当队列空, 再删除元素, 抛出异常
element() 获取元素
返回特殊值:
offer(e) 插入元素时, 插入成功返回 true
poll() 移除元素, 成功返回该值, 否则返回 null
peek()
一直阻塞:
put(e) 当阻塞队列满时, 再插入时会阻塞生产者线程, 直到队列可用或中断退出
take() 当队列空, 再移除元素会阻塞消费者线程, 直到队列不空
超时退出
offer(e, time, unit)
队列满时再插入元素, 阻塞, 超时退出
poll(time, unit) 队列空时移除元素, 阻塞, 超时退出
Java 中几种阻塞队列
ArrayBlockingQueue: 数组结构构成的有界 FIFO 阻塞队列
LinkedBolckingQueue: 链表结构构成的有界 FIFO 阻塞队列
PriorityBlockingQueue: 支持优先级排序的无界阻塞队列
DelayQueue: 支持延时获取元素, 使用优先级队列实现的无界阻塞队列
SynchronousQueue: 不存储元素的阻塞队列, 不为队列元素维护存储空间
LinkedTransferQueue: 链表结构构成的无界阻塞队列
LinkedBlockingDeque: 链表构成的双向阻塞队列
ArrayBlockingQueue
ArrayBlockingQueue 是一个用数组实现的有界的, 按照 FIFO 原则对元素排序的阻塞队列. 它还支持对等待的生产者和消费者线程进行排序时的可选公平策略, 默认情况下不保证线程公平的访问, 在构造时可以选择公平策略. 公平性会降低吞吐量, 但是减少了可变性和避免了 "不平衡性".
LinkedBlockingQueue
这是一个用链表实现的有界阻塞队列, 默认长度和最大长度都是 Integer.MAX_VALUE . 该队列也是按照 FIFO 原则对元素排序, 确定线程执行的先后顺序.
PriorityBlockingQueue
这是一个支持优先级的无界祖苏队列, 默认情况下采取自然顺序升序排序, 也可以通过构造函数指定 Comparator 来对元素进行排序. 但是它不能保证相同优先级元素的顺序.
底层是采用二叉最大堆来实现优先级排序的.
DelayQueue
这是一个支持延时获取元素的无界阻塞队列, 其队列使用优先队列 PriorityQueue 实现. 队列中的元素必须实现 Delayed 接口, 创建元素时可以指定多久之后才能从队列中获取该元素, 只有在元素到期时才能获取.
主要用于缓存, 如清除缓冲中超时的数据. 还用于定时任务的调度.
元素创建时, 要实现 Delayed 接口, 首先进行初始化; 然后实现 getDelay(Timeunit unit) 方法, 返回的值是当前元素还需要延时多长时间; 最后实现 compareTo(Delayed other) 方法, 用来指定元素的顺序.
当消费者从队列中获取元素时, 如果元素还没有到延时时间, 就阻塞当前线程. 此外, 设置了 leader 变量表示等待获取队列头部元素的线程. 如果 leader 不为空, 表示有现成等待获取队列头部元素, 使用 await() 方法让当前线程等待信号. 如果 leader 为空, 则把当前线程设置为 leader, 使用 awaitNanos() 方法让当前线程等待接收信号或等待 delay 时间.
SynchronousQueue
与其他阻塞队列不同, 这是一个不存储元素的阻塞队列, 每一个 put 操作必须要等待一个 take 操作, 否则不能继续添加元素, 反之亦然. 分为公平和不公平访问队列, 默认情况采用非公平性策略访问队列.
该种队列本身不存储任何元素, 适合传递性场景, 把生产者线程处理的数据直接传递给消费者线程, 其吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue.
LinkedTransferQueue
这是一个由链表结构组成的 FIFO 的无界阻塞 TransferQueue 队列. 它采取一种预占模式, 也就是有就直接拿走, 没有就占着这个位置直到拿到, 超时或中断. 相对于其他阻塞队列, 多了 tryTransfer 方法和 transfer 方法.
transfer(e,[timeout,unit])
方法: 如果当前有消费者正等待接收元素, 该方法可以把生产者传入的元素立刻传输给消费者. 如果没有消费者等待, 该方法将元素存放在队列的 tail 节点, 等到该元素被消费者消费了才返回.
tryTransfer(e,[timeout,unit])
方法: 试探生产者传入的元素是否能直接传给消费者. 如果没有消费者等待接收元素, 返回 false. 该方法无论消费者是否接收都立即返回, 而 transfer 方法必须等消费了才返回.
LinkedBlockingDeque
是一个由链表组成的双向阻塞队列. 可以从队列两端插入和移除元素.
Fork/Join 框架
该框架主要应用在并行计算中, 把一个大人物分割成若干个小任务, 最终汇总每个小任务结果后得到大结果的框架. Fork 就是把一个大任务切分成若干子任务并行的执行, Join 就是合并这些子任务的执行结果, 最终得到这个大任务的结果.
工作窃取算法
工作窃取是指某个线程从其他队列里窃取任务来执行. 通常使用双端队列, 被窃取任务线程永远从双端队列头部拿任务执行, 窃取任务的线程永远从双端队列尾部拿任务执行.
优点是充分利用线程进行并行计算, 减少了线程间的竞争. 缺点是在某些情况下存在竞争, 比如队列只有一个任务时, 会消耗更多的资源.
框架设计思路
首先, 分割任务, 将一个大任务分割成子任务, 不停分割直到分割出的子任务足够小.
然后, 执行任务并合并结果. 分割的子任务分别放在双端队列, 然后几个启动线程分别从双端队列获取任务执行. 执行结果放在一个队列里, 启动一个线程从队列拿数据, 然后合并这些线程.
示例
- public class ForkJoinCase extends RecursiveTask<Integer> {
- private final int threshold=5;
- private int first;
- private int last;
- public ForkJoinCase(int first,int last){
- this.first=first;
- this.last=last;
- }
- @Override
- protected Integer compute() {
- int ret=0;
- if(last-first<=threshold){// 任务足够小, 执行
- for(int i=first;i<=last;i++){
- ret+=i;
- }
- }else{// 分解任务
- int mid=first+(last-first)/2;
- ForkJoinCase leftTask=new ForkJoinCase(first,mid);
- ForkJoinCase rightTask=new ForkJoinCase(mid+1,last);
- // 执行子任务
- leftTask.fork();
- rightTask.fork();
- // 合并子任务结果
- ret=leftTask.join()+rightTask.join();
- }
- return ret;
- }
- }
参考资料
Java 并发编程的艺术
http://cmsblogs.com/?p=2611
来源: https://juejin.im/post/5c2c52d9e51d455405559378