最近在阅读《多处理器编程艺术》一书, 掌握了很多 Java 多线程的底层知识, 现在就做一下书中链表 - 锁的作用一章的总结.
为了节约你的时间, 本文主要内容如下:
带锁的链表队列
细粒度同步
乐观同步
惰性同步
非阻塞同步
粗粒度同步
所谓粗粒度同步其实很简单, 就是在 List 的 add,remove,contains 函数的开始就直接使用 Lock 加锁, 然后在函数结尾释放.
add 函数的代码如下所示, 函数的主体就是链表的遍历添加逻辑, 只不过在开始和结束进行了锁的获取和释放.
- private Node head;
- private Lock lock = new ReentrantLock();
- public boolean add(T item) {
- Node pred, curr;
- int key = item.hashCode();
- lock.lock();
- try {
- pred = head;
- curr = pred.next;
- while(curr.key <key) {
- pred = curr;
- curr = pred.next;
- }
- if (key == curr.key) {
- return false;
- } else {
- Node node = new Node(item);
- node.next = curr;
- pred.next = node;
- return true;
- }
- } finally {
- lock.unlock();
- }
- }
大家看到这里就会想到, 这不就是类似于 Hashtable 的实现方式吗? 把可能出现多线程问题的函数都用一个重入锁锁住. 但是这个方法的缺点很明显, 如果竞争激烈的话, 对链表的操作效率会很低, 因为 add,remove,contains 三个函数都需要获取锁, 也都需要等待锁的释放. 至于如何优化, 我们可以一步一步往下看
细粒度同步
我们可以通过锁定单个节点而不是整个链表来提高并发. 给每个节点增加一个 Lock 变量以及相关的 lock() 和 unlock() 函数, 当线程遍历链表的时候, 若它是第一个访问节点的线程, 则锁住被访问的节点, 在随后的某个时刻释放锁. 这种细粒度的锁机制允许并发线程以流水线的方式遍历链表.
使用这种方式来遍历链表, 必须同时获取两个相邻节点的锁, 通过 "交叉手" 的方式来获取锁: 除了初始的 head 哨兵节点外, 只有在已经获取 pred 的锁时, 才能获取 curr 的锁.
- // 每个 Node 对象中都有一个 Lock 对象, 可以进行 lock() 和 unlock() 操作
- public boolean add(T item) {
- int key = item.hashCode();
- head.lock();
- Node pred = head;
- try {
- Node curr = pred.next;
- curr.lock();
- try {
- while (curr.key < key) {
- pred.unlock();
- pred = curr;
- curr = pred.next;
- curr.lock();
- }
- if (curr.key == key) {
- return false;
- }
- Node newNode = new Node(item);
- newNode.next = curr;
- pred.next = newNode;
- return true;
- } finally {
- curr.unlock();
- }
- } finally {
- pred.unlock();
- }
- }
乐观同步
虽然细粒度锁是对单一粒度锁的一种改进, 但它仍然出现很长的获取锁和释放锁的序列. 而且, 访问链表中不同部分的线程仍然可能相互阻塞. 例如, 一个正在删除链表中第二个元素的线程将会阻塞所有试图查找后继节点的线程.
减少同步代价的一种方式就是乐观: 不需要获取锁就可以查找, 对找到的节点进行加锁, 然后确认锁住的节点是正确的; 如果一个同步冲突导致节点被错误的锁定, 则释放这些锁重新开始.
- public boolean add(T item) {
- int key = item.hashCode();
- while (true) { // 如果不成功, 就进行重试
- Node pred = head;
- Node curr = pred.next;
- while (curr.key < key) {
- pred = curr;
- curr = pred.next;
- }
- // 找到目标相关的 pred 和 curr 之后再将二者锁住
- pred.lock();
- curr.lock();
- try {
- // 锁住二者之后再进行判断, 是否存在并发冲突
- if (validate(pred, curr)) {
- // 如果不存在, 那么就直接进行正常操作
- if (curr.key == key) {
- return false;
- } else {
- Node node = new Node(item);
- node.next = curr;
- pred.next = node;
- }
- }
- } finally {
- pred.unlock();
- curr.unlock();
- }
- }
- }
- public boolean validate(Node pred, Node curr) {
- // 从队列头开始查找 pred 和 curr, 判断是否存在并发冲突
- Node node = head;
- while (node.key <= pred.key) {
- if (node == pred) {
- return pred.next == curr;
- }
- node = node.next;
- }
- return false;
- }
由于不再使用能保护并发修改的锁, 所以每个方法调用都可能遍历那些已经被删除的节点, 所以在进行添加, 删除获取判断是否存在的之前必须再次进行验证.
惰性同步
当不用锁遍历两次链表的代价比使用锁遍历一次链表的代价小很多时, 乐观同步的实现效果非常好. 但是这种算法的缺点之一就是 contains() 方法在遍历时需要锁, 这一点并不令人满意, 其原因在于对 contains() 的调用要比其他方法的调用频繁得多.
使用惰性同步的方法, 使得 contains() 调用是无等待的, 同时 add() 和 remove() 方法即使在被阻塞的情况下也只需要遍历一次链表.
对每个节点增加一个布尔类型的 marked 域, 用于说明该节点是否在节点集合中. 现在, 遍历不再需要锁定目标结点, 也没有必须通过重新遍历整个链表来验证结点是否可达. 所有未被标记的节点必然是可达的.
- //add 方法和乐观同步的方法一致, 只有检验方法做了修改.
- // 只需要检测节点的 marked 变量就可以, 并且查看 pred 的 next 是否还是指向 curr, 需要注意的是 marked 变量一定是 voliate 的.
- private boolean validate(Node pred, Node curr) {
- return !pred.marked && !curr.marked && pred.next == curr;
- }
惰性同步的优点之一就是能够将类似于设置一个 flag 这样的逻辑操作与类似于删除结点的链接这种对结构的物理改变分开. 通常情况下, 延迟操作可以是批量处理方式进行, 且在某个方便的时候再懒惰地进行处理, 从而降低了对结构进行物理修改的整体破裂性. 惰性同步的主要缺点是 add() 和 remove() 调用是阻塞的: 如果一个线程延迟, 那么其他线程也将延迟.
非阻塞同步
使用惰性同步的思维是非常有益处的. 我们可以进一步将 add(),remove() 和 contains() 这三个方法都变成非阻塞的. 前两个方法是无锁的, 最后一个方法是无等待的. 我们无法直接使用 compareAndSet() 来改变 next 域来实现, 因为这样会出现问题. 但是我们可以将结点的 next 域和 marked 域看作是单个的原子单位: 当 marked 域为 true 时, 对 next 域的任何修改都将失败.
我们可以使用 AtomicMarkableReference 对象将指向类型 T 的对象引用 next 和布尔值 marked 封装在一起. 这些域可以一起或单个地原子更新. 可以让每个结点的 next 域为一个 AtomicMarkableReference. 线程可以通过设置结点 next 域中的标记位来逻辑地删除 curr, 和其他正在执行 add() 和 remove() 的线程共享物理删除: 当每个线程遍历链表时, 通过物理删除所有被标记的节点来清理链表.
- public Windows find(Node head, int key) {
- Node pred = null, curr = null, succ = null;
- boolean[] marked = {false};
- boolean snip;
- retry: while(true) {
- pred = head;
- curr = curr.next.get(marked);
- while(true) {
- succ = curr.next.get(marked); // 获取 succ, 并且查看是被被标记
- while (marked[0]) {// 如果被标记了, 说明 curr 被逻辑删除了, 需要继续物理删除
- snip = pred.next.compareAndSet(curr, succ, false, false);//
- if (!snip) continue retry;
- curr = succ;
- succ = curr.next.get(marked);
- }
- // 当不需要删除后, 才继续遍历
- if (curr.key>= key) {
- return new Windows(pred, curr);
- }
- pred = curr;
- curr = succ;
- }
- }
- }
- public boolean add(T item) {
- int key = item.hashCode();
- while(true) {
- Windows Windows = find(head, key);
- Node pred = Windows.pred, curr = Windows.curr;
- if (curr.key == key) {
- return false;
- } else {
- Node node = new Node(item);
- node.next = new AtomicMarkableReference<>(curr, false);
- if (pred.next.compareAndSet(curr, node, false, false)) {
- return true;
- }
- }
- }
- }
- public boolean remove(T item) {
- int key = item.hashCode();
- boolean sinp;
- while(true) {
- Windows Windows = find(head, key);
- Node pred = Windows.pred, curr = Windows.curr;
- if (curr.key != key) {
- return false;
- } else {
- Node succ = curr.next.getReference();
- // 要进行删除了, 那么就直接将 curr.next 设置为 false, 然后在进行真正的物理删除.
- sinp = curr.next.compareAndSet(curr, succ, false, true);
- if (!sinp) {
- continue;
- }
- pred.next.compareAndSet(curr, succ, false, false);
- return true;
- }
- }
- }
- class Node {
- AtomicMarkableReference<Node> next;
- }
后记
文中的代码在我的 GitHub 的这个 repo 中都可以找到.
个人博客: remcarpediem.NET
个人微信公众号: remcarpediem
来源: https://yq.aliyun.com/articles/695006