以 add 方法作为入口, 在 add 方法中会调用父类的 add 方法, 也就是 AbstractQueue. 如果看源码看得比较多的话,
一般这种写法都是调用父类的模版方法来解决通用性问题
- public boolean add(E e) {
- return super.add(e);
- }
从父类的 add 方法可以看到, 这里做了一个队列是否满了的判断, 如果队列满了直接抛出一个异常
- public boolean add(E e) {
- if (offer(e))
- return true;
- else
- throw new IllegalStateException("Queue
- full");
- }
offer 方法
add 方法最终还是调用 offer 方法来添加数据, 返回一个添加成功或者失败的布尔值反馈
这段代码做了几个事情
1. 判断添加的数据是否为空
2. 添加重入锁
3. 判断队列长度, 如果队列长度等于数组长度, 表示满了直接返回 false
4. 否则, 直接调用 enqueue 将元素添加到队列中
- public boolean offer(E e) {
- checkNotNull(e); // 对请求数据做判断
- final ReentrantLock lock = this.lock;
- lock.lock();
- try {
- if (count == items.length)
- return false;
- else {
- enqueue(e);
- return true;
- }
- } finally {
- lock.unlock();
- } }
- enqueue
这个是最核心的逻辑, 方法内部通过 putIndex 索引直接将元素添加到数组 items
- private void enqueue(E x) {
- // assert lock.getHoldCount() == 1;
- // assert items[putIndex] == null;
- final Object[] items = this.items;
- items[putIndex] = x; // 通过 putIndex 对数据赋值
- if (++putIndex == items.length) // 当 putIndex 等于数组长度时, 将 putIndex 重置为 0
- putIndex = 0;
- count++;// 记录队列元素的个数
- notEmpty.signal();// 唤醒处于等待状态下的线程, 表示当前队列中的元素不为空, 如果存在消费者线程阻塞, 就可以开始取出元素
- }
这里大家肯定会有一个疑问, putIndex 为什么会在等于数组长度的时候重新设置为 0. 因为 ArrayBlockingQueue 是一个 FIFO 的队列, 队列添加元素时, 是从队尾获取 putIndex 来存储元素, 当 putIndex 等于数组长度时, 下次就需要从数组头部开始添加了.
下面这个图模拟了添加到不同长度的元素时, putIndex 的变化, 当 putIndex 等于数组长度时, 不可能让 putIndex 继续累加, 否则会超出数组初始化的容量大小. 同时大家还需要思考两个问题
1. 当元素满了以后是无法继续添加的, 因为会报错
2. 其次, 队列中的元素肯定会有一个消费者线程通过 take 或者其他方法来获取数据, 而获取数据的同时元素也会从队列中移除
put 方法
put 方法和 add 方法功能一样, 差异是 put 方法如果队列满了, 会阻塞. 这个在最开始的时候说过. 接下来看一下它的实现逻辑
- public void put(E e) throws
- InterruptedException {
- checkNotNull(e);
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly(); // 这个也是获得锁, 但是和 lock 的区别是, 这个方法优先允许在等待时由其他线程调用等待线程的 interrupt 方法来中断等待直接返回. 而 lock 方法是尝试获得锁成功后才响应中断
- try {
- while (count == items.length)
- notFull.await();// 队列满了的情况下, 当前线程将会被 notFull 条件对象挂起加到等待队列中
- enqueue(e);
- } finally {
- lock.unlock();
- } }
take 方法
take 方法是一种阻塞获取队列中元素的方法它的实现原理很简单, 有就删除没有就阻塞, 注意这个阻塞是可以中断的, 如果队列没有数据那么就加入 notEmpty 条件队列等待 (有数据就直接取走, 方法结束), 如果有新的 put 线程添加了数据, 那么 put 操作将会唤醒 take 线程, 执行 take 操作
- public E take() throws InterruptedException {
- final ReentrantLock lock = this.lock;
- lock.lockInterruptibly();
- try {
- while (count == 0)
- notEmpty.await(); // 如果队列为空的情况下, 直接通过 await 方法阻塞
- return dequeue();
- } finally {
- lock.unlock();
- } }
如果队列中添加了元素, 那么这个时候, 会在 enqueue 中调用 notempty.signal 唤醒 take 线程来获得元素
dequeue 方法
这个是出队列的方法, 主要是删除队列头部的元素并发返回给客户端 takeIndex, 是用来记录拿数据的索引值
- private E dequeue() {
- // assert lock.getHoldCount() == 1;
- // assert items[takeIndex] != null;
- final Object[] items = this.items;
- @SuppressWarnings("unchecked")
- E x = (E) items[takeIndex]; // 默认获取 0 位置的元素
- items[takeIndex] = null;// 将该位置的元素设置为空
- if (++takeIndex == items.length)// 这里的作用也是一样, 如果拿到数组的最大值, 那么重置为 0, 继续从头部位置开始获取数据
- takeIndex = 0;
- count--;// 记录 元素个数递减
- if (itrs != null)
- itrs.elementDequeued();// 同时更新迭代器中的元素数据
- notFull.signal();// 触发 因为队列满了以后导致的被阻塞的线程
- return x;
- }
- itrs.elementDequeued();
ArrayBlockingQueue 中, 实现了迭代器的功能, 也就是可以通过迭代器来遍历阻塞队列中的元素
所以 itrs.elementDequeued() 是用来更新迭代器中的元素数据的 takeIndex 的索引变化图如下, 同时随着数据的移除, 会唤醒处于 put 阻塞状态下的线程来继续添加数据
remove 方法
remove 方法是移除一个指定元素. 看看它的实现代码
- public boolean remove(Object o) {
- if (o == null) return false;
- final Object[] items = this.items; // 获取数组元素
- final ReentrantLock lock = this.lock;
- lock.lock(); // 获得锁
- try {
- if (count> 0) { // 如果队列不为空
- final int putIndex = this.putIndex; // 获取下一个要添加元素时的索引
- int i = takeIndex;// 获取当前要被移除的元素的索引
- do {
- if (o.equals(items[i])) {// 从 takeIndex 下标开始, 找到要被删除的元素
- removeAt(i);// 移除指定元素
- return true;// 返回执行结果
- }
- // 当前删除索引执行加 1 后判断是否与数组长度相等
- // 若为 true, 说明索引已到数组尽头, 将 i 设置为 0
- if (++i == items.length)
- i = 0;
- } while (i != putIndex);// 继续查找, 直到找到最后一个元素
- }
- return false;
- } finally {
- lock.unlock();
- } }
原子操作类
原子性这个概念, 在多线程编程里是一个老生常谈的问题.
所谓的原子性表示一个或者多个操作, 要么全部执行完, 要么一个也不执行. 不能出现成功一部分失败一部分的情况.
在多线程中, 如果多个线程同时更新一个共享变量, 可能会得到一个意料之外的值. 比如 i=1 .A 线程更新 i+1 ,B 线程也更新 i+1.
通过两个线程并行操作之后可能 i 的值不等于 3. 而可能等于 2. 因为 A 和 B 在更新变量 i 的时候拿到的 i 可能都是 1
这就是一个典型的原子性问题
前面几节课我们讲过, 多线程里面, 要实现原子性, 有几种方法, 其中一种就是加 Synchronized 同步锁.
而从 JDK1.5 开始, 在 J.U.C 包中提供了 Atomic 包, 提供了对于常用数据结构的原子操作. 它提供了简单, 高效, 以及线程安全的更新一个变量的方式
J.U.C 中的原子操作类
由于变量类型的关系, 在 J.U.C 中提供了 12 个原子操作的类. 这 12 个类可以分为四大类
1. 原子更新基本类型
AtomicBoolean,AtomicInteger,AtomicLong
2. 原子更新数组
AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray
3. 原子更新引用
AtomicReference , AtomicReferenceFieldUpdater ,AtomicMarkableReference(更新带有标记位的引用类型)
4. 原子更新字段
AtomicIntegerFieldUpdater,AtomicLongFieldUpdater,AtomicStampedReference
AtomicInteger 原理分析
接下来, 我们来剖析一下 AtomicInteger 的实现原理, 仍然是基于我们刚刚在前面的案例中使用到的方法作为突破口
getAndIncrement
getAndIncrement 实际上是调用 unsafe 这个类里面提供的方法,
Unsafe 类我们前面在分析 AQS 的时候讲过, 这个类相当于是一个后门, 使得 Java 可以像 C 语言的指针一样直接操作内存空间. 当然也会带来一些弊端, 就是指针的问题.
实际上这个类在很多方面都有使用, 除了 J.U.C 这个包以外, 还有 Netty,kafka 等等这个类提供了很多功能, 包括多线程同步 (monitorEnter),CAS 操 作 (compareAndSwap) , 线程的挂起和恢复 (park/unpark), 内存屏障 (loadFence/storeFence)
内存管理 (内存分配, 释放内存, 获取内存地址等.)
- public final int getAndIncrement() {
- return unsafe.getAndAddInt(this,
- valueOffset, 1);
- }
valueOffset, 也比较熟了. 通过 unsafe.objectFieldOffset() 获取当前 Value 这个变量在内存中的偏移量, 后续会基于这个偏移量从内存中得到 value 的值来和当前的值做比较, 实现乐观锁
- private static final long valueOffset;
- static {
- try {
- valueOffset = unsafe.objectFieldOffset
- (AtomicInteger.class.getDeclaredField("value")
- );
- } catch (Exception ex) { throw new
- Error(ex); }
- }
- getAndAddInt
通过 do/while 循环, 基于 CAS 乐观锁来做原子递增. 实际上前面的 valueOffset 的作用就是从主内存中获得当前 value 的值和预期值做一个比较, 如果相等, 对 value 做递增并结束循环
- public final int getAndAddInt(Object var1, long
- var2, int var4) {
- int var5;
- do {
- var5 = this.getIntVolatile(var1, var2);
- } while(!this.compareAndSwapInt(var1, var2,
- var5, var5 + var4));
- return var5;
- }
get 方法
get 方法只需要直接返回 value 的值就行, 这里的 value 是通过 Volatile 修饰的, 用来保证可见性
- public final int get() {
- return value; }
其他方法
AtomicInteger 的实现非常简单, 所以我们可以很快就分析完它的实现原理, 当然除了刚刚分析的这两个方法之外,
还有其他的一些比 如 它 提 供 了 compareAndSet , 允 许 客 户 端 基 于 AtomicInteger 来实现乐观锁的操作
- public final boolean compareAndSet(int expect,
- int update) {
- return unsafe.compareAndSwapInt(this,
- valueOffset, expect, update);
- }
来源: https://www.cnblogs.com/lvqiansheng/p/13409019.html