Condition 因素出 Object 监视器方法 ( wait , notify 和 notifyAll ) 成不同的对象, 以得到具有多个等待集的每个对象, 通过将它们与使用任意的组合的效果 Lock 个实现. Lock 替换 synchronized 方法和语句的使用, Condition 取代了对象监视器方法的使用.
条件 (也称为条件队列或条件变量 ) 为一个线程暂停执行 ("等待") 提供了一种方法, 直到另一个线程通知某些状态现在可能为真. 因为访问此共享状态信息发生在不同的线程中, 所以它必须被保护, 因此某种形式的锁与该条件相关联. 等待条件的关键属性是它原子地释放相关的锁并挂起当前线程, 就像 Object.wait .
一个 Condition 实例本质上绑定到一个锁. 要获得特定 Condition 实例的 Condition 实例, 请使用其 newCondition()方法.
例如, 假设我们有一个有限的缓冲区, 它支持 put 和 take 方法. 如果在一个空的缓冲区尝试一个 take , 则线程将阻塞直到一个项目可用; 如果 put 试图在一个完整的缓冲区, 那么线程将阻塞, 直到空间变得可用. 我们希望在单独的等待集中等待 put 线程和 take 线程, 以便我们可以在缓冲区中的项目或空间可用的时候使用仅通知单个线程的优化. 这可以使用两个 Condition 实例来实现.
- class BoundedBuffer {
- final Lock lock = new ReentrantLock();
- final Condition notFull = lock.newCondition();
- final Condition notEmpty = lock.newCondition();
- final Object[] items = new Object[100];
- int putptr, takeptr, count;
- public void put(Object x) throws InterruptedException {
- lock.lock(); try {
- while (count == items.length)
- notFull.await();
- items[putptr] = x;
- if (++putptr == items.length) putptr = 0;
- ++count;
- notEmpty.signal();
- } finally { lock.unlock(); }
- }
- public Object take() throws InterruptedException {
- lock.lock(); try {
- while (count == 0)
- notEmpty.await();
- Object x = items[takeptr];
- if (++takeptr == items.length) takeptr = 0;
- --count;
- notFull.signal();
- return x;
- } finally { lock.unlock(); }
- }
- }
下面是一个更明晰的例子
- public static void main(String[] args) {
- ReentrantLock reentrantLock = new ReentrantLock();
- Condition condition = reentrantLock.newCondition();// 创建 condition
- // 线程 1
- new Thread(() -> {
- try {
- reentrantLock.lock();
- log.info("wait signal"); // 1
- condition.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- log.info("get signal"); // 4
- reentrantLock.unlock();
- }).start();
- // 线程 2
- new Thread(() -> {
- reentrantLock.lock();
- log.info("get lock"); // 2
- try {
- Thread.sleep(3000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- condition.signalAll();// 发送信号
- log.info("send signal"); // 3
- reentrantLock.unlock();
- }).start();
- }
输出过程讲解:
1, 线程 1 调用了 reentrantLock.lock(), 线程进入 AQS 等待队列, 输出 1 号 log
2, 接着调用了 awiat 方法, 线程从 AQS 队列中移除, 锁释放, 直接加入 condition 的等待队列中
3, 线程 2 因为线程 1 释放了锁, 拿到了锁, 输出 2 号 log
4, 线程 2 执行 condition.signalAll()发送信号, 输出 3 号 log
5,condition 队列中线程 1 的节点接收到信号, 从 condition 队列中拿出来放入到了 AQS 的等待队列, 这时线程 1 并没有被唤醒.
6, 线程 2 调用 unlock 释放锁, 因为 AQS 队列中只有线程 1, 因此 AQS 释放锁按照从头到尾的顺序, 唤醒线程 1 7, 线程 1 继续执行, 输出 4 号 log, 并进行 unlock 操作.
ReentrantReadWriteLock 的使用
在没有任何读写锁的时候才可以取得写入锁(悲观读取, 容易写线程饥饿), 也就是说如果一直存在读操作, 那么写锁一直在等待没有读的情况出现, 这样我的写锁就永远也获取不到, 就会造成等待获取写锁的线程饥饿. 平时使用的场景并不多.
- private final Map<String, Data> map = new TreeMap<>();
- private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- private final Lock readLock = lock.readLock();// 读锁
- private final Lock writeLock = lock.writeLock();// 写锁
- // 加读锁
- public Data get(String key) {
- readLock.lock();
- try {
- return map.get(key);
- } finally {
- readLock.unlock();
- }
- }
- // 加写锁
- public Data put(String key, Data value) {
- writeLock.lock();
- try {
- return map.put(key, value);
- } finally {
- writeLock.unlock();
- }
- }
- StampedLock
好文章: StampedLock 将是解决同步问题的新宠 http://www.importnew.com/14941.HTML
介绍: 一种基于能力的锁, 具有三种模式用于控制读 / 写访问. StampedLock 的状态由版本和模式组成. 锁定采集方法返回一个表示和控制相对于锁定状态的访问的印记; 这些方法的 "尝试" 版本可能会返回特殊值为零以表示获取访问失败. 锁定释放和转换方法要求邮票作为参数, 如果它们与锁的状态不匹配则失败. 这三种模式是:
写作. 方法 writeLock()可能阻止等待独占访问, 返回可以在方法 unlockWrite(long)中使用的邮票来释放锁定. 不定时的和定时版本 tryWriteLock , 还提供. 当锁保持写入模式时, 不能获得读取锁定, 并且所有乐观读取验证都将失败.
读. 方法 readLock()可能阻止等待非独占访问, 返回可用于方法 unlockRead(long)释放锁的戳记 . 不定时的和定时版本 tryReadLock , 还提供.
乐观阅读 方法 tryOptimisticRead()只有当锁当前未保持在写入模式时才返回非零标记. 方法 validate(long)返回 true, 如果在获取给定的邮票时尚未在写入模式中获取锁定. 这种模式可以被认为是一个非常弱的版本的读锁, 可以随时由作家打破. 对简单的只读代码段使用乐观模式通常会减少争用并提高吞吐量. 然而, 其使用本质上是脆弱的. 乐观阅读部分只能读取字段并将其保存在局部变量中, 以供后验证使用. 以乐观模式读取的字段可能会非常不一致, 因此只有在熟悉数据表示以检查一致性和 / 或重复调用方法 validate()时, 使用情况才适用. 例如, 当首次读取对象或数组引用, 然后访问其字段, 元素或方法之一时, 通常需要这样的步骤.
JDK 上提供的例子
- class Point {
- private double x, y;
- private final StampedLock sl = new StampedLock();
- void move(double deltaX, double deltaY) { // an exclusively locked method
- long stamp = sl.writeLock();
- try {
- x += deltaX;
- y += deltaY;
- } finally {
- sl.unlockWrite(stamp);
- }
- }
- // 下面看看乐观读锁案例
- double distanceFromOrigin() { // A read-only method
- long stamp = sl.tryOptimisticRead(); // 获得一个乐观读锁
- double currentX = x, currentY = y; // 将两个字段读入本地局部变量
- if (!sl.validate(stamp)) { // 检查发出乐观读锁后同时是否有其他写锁发生?
- stamp = sl.readLock(); // 如果没有, 我们再次获得一个读悲观锁
- try {
- currentX = x; // 将两个字段读入本地局部变量
- currentY = y; // 将两个字段读入本地局部变量
- } finally {
- sl.unlockRead(stamp);
- }
- }
- return Math.sqrt(currentX * currentX + currentY * currentY);
- }
- // 下面是悲观读锁案例
- void moveIfAtOrigin(double newX, double newY) { // upgrade
- // Could instead start with optimistic, not read mode
- long stamp = sl.readLock();
- try {
- while (x == 0.0 && y == 0.0) { // 循环, 检查当前状态是否符合
- long ws = sl.tryConvertToWriteLock(stamp); // 将读锁转为写锁
- if (ws != 0L) { // 这是确认转为写锁是否成功
- stamp = ws; // 如果成功 替换票据
- x = newX; // 进行状态改变
- y = newY; // 进行状态改变
- break;
- }
- else { // 如果不能成功转换为写锁
- sl.unlockRead(stamp); // 我们显式释放读锁
- stamp = sl.writeLock(); // 显式直接进行写锁 然后再通过循环再试
- }
- }
- } finally {
- sl.unlock(stamp); // 释放读锁或写锁
- }
- }
- }
synchronized 是在 JVM 层面上实现的, 不但可以通过一些监控工具监控 synchronized 的锁定, 而且在代码执行时出现异常, JVM 会自动释放锁定;
ReentrantLock,ReentrantReadWriteLock,,StampedLock 都是对象层面的锁定, 要保证锁定一定会被释放, 就必须将 unLock()放到 finally{}中;
StampedLock 对吞吐量有巨大的改进, 特别是在读线程越来越多的场景下;
StampedLock 有一个复杂的 API, 对于加锁操作, 很容易误用其他方法;
当只有少量竞争者的时候, synchronized 是一个很好的通用的锁实现;
当线程增长能够预估, ReentrantLock 是一个很好的通用的锁实现;
StampedLock 可以说是 Lock 的一个很好的补充, 吞吐量以及性能上的提升足以打动很多人了, 但并不是说要替代之前 Lock 的东西, 毕竟他还是有些应用场景的, 起码 API 比 StampedLock 容易入手.
锁的选择
1, 当只有少量竞争者, 使用 synchronized
2, 竞争者不少但是线程增长的趋势是能预估的, 使用 ReetrantLock
3,synchronized 不会造成死锁, jvm 会自动释放死锁.
- package com.rong.juc;
- import java.util.Random;
- import java.util.concurrent.BrokenBarrierException;
- import java.util.concurrent.CyclicBarrier;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.atomic.AtomicInteger;
- import java.util.concurrent.atomic.AtomicLong;
- import java.util.concurrent.locks.ReentrantLock;
- /**
- *
- * @ClassName: ReentrantLockDemo
- * @Description:TODO(这里用一句话描述这个类的作用)
- * @author: rongbo
- * @date: 2018 年 9 月 23 日 下午 10:57:27
- *
- *
- */
- public class ReentrantLockTest {
- public static void test(int round, int threadNum, CyclicBarrier cyclicBarrier) {
- new SyncTest("Sync", round, threadNum, cyclicBarrier).testTime();
- new LockTest("Lock", round, threadNum, cyclicBarrier).testTime();
- new AtomicTest("Atom", round, threadNum, cyclicBarrier).testTime();
- }
- public static void main(String args[]) {
- for (int i = 0; i < 5; i++) {
- int round = 10000 * (i + 1);
- int threadNum = 5 * (i + 1);
- CyclicBarrier cb = new CyclicBarrier(threadNum * 2 + 1);
- System.out.println("==========================");
- System.out.println("round:" + round + "thread:" + threadNum);
- test(round, threadNum, cb);
- }
- }
- }
- class SyncTest extends TestTemplate {
- public SyncTest(String _id, int _round, int _threadNum, CyclicBarrier _cb) {
- super(_id, _round, _threadNum, _cb);
- }
- @Override
- /**
- * synchronized 关键字不在方法签名里面, 所以不涉及重载问题
- */
- synchronized long getValue() {
- return super.countValue;
- }
- @Override
- synchronized void sumValue() {
- super.countValue += preInit[index++ % round];
- }
- }
- class LockTest extends TestTemplate {
- ReentrantLock lock = new ReentrantLock();
- public LockTest(String _id, int _round, int _threadNum, CyclicBarrier _cb) {
- super(_id, _round, _threadNum, _cb);
- }
- /**
- * synchronized 关键字不在方法签名里面, 所以不涉及重载问题
- */
- @Override
- long getValue() {
- try {
- lock.lock();
- return super.countValue;
- } finally {
- lock.unlock();
- }
- }
- @Override
- void sumValue() {
- try {
- lock.lock();
- super.countValue += preInit[index++ % round];
- } finally {
- lock.unlock();
- }
- }
- }
- class AtomicTest extends TestTemplate {
- public AtomicTest(String _id, int _round, int _threadNum, CyclicBarrier _cb) {
- super(_id, _round, _threadNum, _cb);
- }
- @Override
- /**
- * synchronized 关键字不在方法签名里面, 所以不涉及重载问题
- */
- long getValue() {
- return super.countValueAtmoic.get();
- }
- @Override
- void sumValue() {
- super.countValueAtmoic.addAndGet(super.preInit[indexAtomic.get() % round]);
- }
- }
- abstract class TestTemplate {
- private String id;
- protected int round;
- private int threadNum;
- protected long countValue;
- protected AtomicLong countValueAtmoic = new AtomicLong(0);
- protected int[] preInit;
- protected int index;
- protected AtomicInteger indexAtomic = new AtomicInteger(0);
- Random r = new Random(47);
- // 任务栅栏, 同批任务, 先到达 wait 的任务挂起, 一直等到全部任务到达制定的 wait 地点后, 才能全部唤醒, 继续执行
- private CyclicBarrier cb;
- public TestTemplate(String _id, int _round, int _threadNum, CyclicBarrier _cb) {
- this.id = _id;
- this.round = _round;
- this.threadNum = _threadNum;
- cb = _cb;
- preInit = new int[round];
- for (int i = 0; i < preInit.length; i++) {
- preInit[i] = r.nextInt(100);
- }
- }
- abstract void sumValue();
- /*
- * 对 long 的操作是非原子的, 原子操作只针对 32 位 long 是 64 位, 底层操作的时候分 2 个 32 位读写, 因此不是线程安全
- */
- abstract long getValue();
- public void testTime() {
- ExecutorService se = Executors.newCachedThreadPool();
- long start = System.nanoTime();
- // 同时开启 2*ThreadNum 个数的读写线程
- for (int i = 0; i < threadNum; i++) {
- se.execute(new Runnable() {
- public void run() {
- for (int i = 0; i < round; i++) {
- sumValue();
- }
- // 每个线程执行完同步方法后就等待
- try {
- cb.await();
- } catch (InterruptedException | BrokenBarrierException e) {
- e.printStackTrace();
- }
- }
- });
- se.execute(new Runnable() {
- public void run() {
- getValue();
- try {
- // 每个线程执行完同步方法后就等待
- cb.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (BrokenBarrierException e) {
- e.printStackTrace();
- }
- }
- });
- }
- try {
- // 当前统计线程也 wait, 所以 CyclicBarrier 的初始值是 threadNum*2+1
- cb.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (BrokenBarrierException e) {
- e.printStackTrace();
- }
- // 所有线程执行完成之后, 才会跑到这一步
- long duration = System.nanoTime() - start;
- System.out.println(id + "=" + duration);
- }
- }
- View Code
- ==========================
- round:10000 thread:5
- Sync = 4578771
- Lock = 6079408
- Atom = 2358938
- ==========================
- round:20000 thread:10
- Sync = 10253723
- Lock = 6668266
- Atom = 3977932
- ==========================
- round:30000 thread:15
- Sync = 19530498
- Lock = 13254122
- Atom = 11142416
- ==========================
- round:40000 thread:20
- Sync = 31596091
- Lock = 24663350
- Atom = 18504161
- ==========================
- round:50000 thread:25
- Sync = 55158877
- Lock = 36521455
- Atom = 32352693
来源: https://www.cnblogs.com/aihuxi/p/9694445.html