ReentrantLock,重入锁,是 JDK5 中添加在并发包下的一个高性能的工具。顾名思义,ReentrantLock 支持同一个线程在未释放锁的情况下重复获取锁。
每一个东西的出现一定是有价值的。既然已经有了元老级的 synchronized,而且 synchronized 也支持重入,为什么 Doug Lea 还要专门写一个 ReentrantLock 呢?
0 ReentrantLock 与 synchronized 的比较
0.1 性能上的比较
首先,ReentrantLock 的性能要优于 synchronized。下面通过两段代码比价一下。
首先是 synchronized:
public class LockDemo2 { private static final Object lock = new Object(); // 定义锁对象 private static int count = 0; // 累加数 public static void main(String[] args) throws InterruptedException { long start = System.currentTimeMillis(); CountDownLatch cdl = new CountDownLatch(100); // 启动100个线程对count累加,每个线程累加1000000次 // 调用add函数累加,通过synchronized保证多线程之间的同步 for (int i=0;i<100;i++) { new Thread(() -> { for (int i1 = 0; i1 <1000000; i1++) { add(); } cdl.countDown(); }).start(); } cdl.await(); System.out.println("Time cost: " + (System.currentTimeMillis() - start) + ", count = " + count); } private static void add() { synchronized (lock) { count++; } } }
然后是 ReentrantLock:
public class LockDemo3 { private static Lock lock = new ReentrantLock(); // 重入锁 private static int count = 0; public static void main(String[] args) throws InterruptedException { long start = System.currentTimeMillis(); CountDownLatch cdl = new CountDownLatch(100); for (int i=0;i<100;i++) { new Thread(() -> { for (int i1 = 0; i1 <1000000; i1++) { add(); } cdl.countDown(); }).start(); } cdl.await(); System.out.println("Time cost: " + (System.currentTimeMillis() - start) + ", count = " + count); } // 通过ReentrantLock保证线程之间的同步 private static void add() { lock.lock(); count++; lock.unlock(); } }
下面是运行多次的结果对比:
synchronized | ReentrantLock | |
---|---|---|
第一次 | 4620 ms | 3360 ms |
第二次 | 4086 ms | 3138 ms |
第三次 | 4650 ms | 3408 ms |
总体来看,ReentrantLock 的平均性能要比 synchronized 好 20% 左右。
PS:更严谨的描述一下这个性能的对比:当存在大量线程竞争锁时,多数情况下 ReentrantLock 的性能优于 synchronized。
因为在 JDK6 中对 synchronized 做了优化,在锁竞争不激烈的时候,多数情况下锁会停留在偏向锁和轻量级锁阶段,这两个阶段性能是很好的。当存在大量竞争时,可能会膨胀为重量级锁,性能下降,此时的 ReentrantLock 应该是优于 synchronized 的。
0.2 获取锁公平性的比较
公平性是啥概念呢?如果是公平的获取锁,就是说多个线程之间获取锁的时候要排队,依次获取锁;如果是不公平的获取锁,就是说多个线程获取锁的时候一哄而上,谁抢到是谁的。
由于 synchronized 是基于 monitor 机制实现的,它只支持非公平锁;但 ReentrantLock 同时支持公平锁和非公平锁。
0.3 综述
除了上文所述,ReentrantLock 还有一些其他 synchronized 不具备的特性,这里来总结一下。
synchronized | ReentrantLock | |
---|---|---|
性能 | 相对较差 | 优于 synchronized 20% 左右 |
公平性 | 只支持非公平锁 | 同时支持公平锁与非公平锁 |
尝试获取锁的支持 | 不支持,一旦到了同步块,且没有获取到锁,就阻塞在这里 | 支持,通过 tryLock 方法实现,可通过其返回值判断是否成功获取锁,所以即使获取锁失败也不会阻塞在这里 |
超时的获取锁 | 不支持,如果一直获取不到锁,就会一直等待下去 | 支持,通过 tryLock(time, TimeUnit)方法实现,如果超时了还没获取锁,就放弃获取锁,不会一直阻塞下去 |
是否可响应中断 | 不支持,不可响应线程的 interrupt 信号 | 支持,通过 lockInterruptibly 方法实现,通过此方法获取锁之后,线程可响应 interrupt 信号,并抛出 InterruptedException 异常 |
等待条件的支持 | 支持,通过 wait、notify、notifyAll 来实现 | 支持,通过 Conditon 接口实现,支持多个 Condition,比 synchronized 更加灵活 |
1 可重入功能的实现原理
ReentrantLock 的实现基于队列同步器(AbstractQueuedSynchronizer,后面简称 AQS),关于 AQS 的实现原理,可以看笔者的另一篇文章:
AQS 的实现原理
ReentrantLock 的可重入功能基于 AQS 的同步状态:state。
其原理大致为:当某一线程获取锁后,将 state 值 +1,并记录下当前持有锁的线程,再有线程来获取锁时,判断这个线程与持有锁的线程是否是同一个线程,如果是,将 state 值再 +1,如果不是,阻塞线程。
当线程释放锁时,将 state 值-1,当 state 值减为 0 时,表示当前线程彻底释放了锁,然后将记录当前持有锁的线程的那个字段设置为 null,并唤醒其他线程,使其重新竞争锁。
// acquires的值是1 final boolean nonfairTryAcquire(int acquires) { // 获取当前线程 final Thread current = Thread.currentThread(); // 获取state的值 int c = getState(); // 如果state的值等于0,表示当前没有线程持有锁 // 尝试将state的值改为1,如果修改成功,则成功获取锁,并设置当前线程为持有锁的线程,返回true if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // state的值不等于0,表示已经有其他线程持有锁 // 判断当前线程是否等于持有锁的线程,如果等于,将state的值+1,并设置到state上,获取锁成功,返回true // 如果不是当前线程,获取锁失败,返回false else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
2 非公平锁的实现原理
ReentrantLock 有两个构造函数:
// 无参构造,默认使用非公平锁(NonfairSync) public ReentrantLock() { sync = new NonfairSync(); } // 通过fair参数指定使用公平锁(FairSync)还是非公平锁(NonfairSync) public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
sync 是 ReentrantLock 的成员变量,是其内部类 Sync 的实例。NonfairSync 和 FairSync 都是 Sync 类的子类。可以参考如下类关系图:
Sync 继承了 AQS,所以他具备了 AQS 的功能。同样的,NonfairSync 和 FairSync 都是 AQS 的子类。
当我们通过无参构造函数获取 ReentrantLock 实例后,默认用的就是非公平锁。
下面将通过如下场景描述非公平锁的实现原理:假设一个线程(t1)获取到了锁,其他很多没获取到锁的线程(others_t)加入到了 AQS 的同步队列中等待,当这个线程执行完,释放锁后,其他线程重新非公平的竞争锁。
先来描述一下获取锁的方法:
final void lock() { // 线程t1成功的将state的值从0改为1,表示获取锁成功 if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else // others_t线程们没有获取到锁 acquire(1); }
如果获取锁失败,会调用 AQS 的 acquire 方法
public final void acquire(int arg) { // tryAcquire是个模板方法,在NonfairSync中实现,如果在tryAcquire方法中依然获取锁失败,会将当前线程加入同步队列中等待(addWaiter) if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
tryAcquire 的实现如下,其实是调用了上面的 nonfairTryAcquire 方法
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
OK,此时 t1 获取到了锁,others_t 线程们都跑到同步队列里等着了。
某一时刻,t1 自己的任务执行完成,调用了释放锁的方法(unlock)。
public void unlock() { // 调用AQS的release方法释放资源 sync.release(1); }
public final boolean release(int arg) { // tryRelease也是模板方法,在Sync中实现 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 成功释放锁后,唤醒同步队列中的下一个节点,使之可以重新竞争锁 // 注意此时不会唤醒队列第一个节点之后的节点,这些节点此时还是无法竞争锁 unparkSuccessor(h); return true; } return false; }
protected final boolean tryRelease(int releases) { // 将state的值-1,如果-1之后等于0,释放锁成功 int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
这时锁被释放了,被唤醒的线程和新来的线程重新竞争锁(不包含同步队列后面的那些线程)。
回到 lock 方法中,由于此时所有线程都能通过 CAS 来获取锁,并不能保证被唤醒的那个线程能竞争过新来的线程,所以是非公平的。这就是非公平锁的实现。
这个过程大概可以描述为下图这样子:
3 公平锁的实现原理
公平锁与非公平锁的释放锁的逻辑是一样的,都是调用上述的 unlock 方法,最大区别在于获取锁的时候。
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; // 获取锁,与非公平锁的不同的地方在于,这里直接调用的AQS的acquire方法,没有先尝试获取锁 // acquire又调用了下面的tryAcquire方法,核心在于这个方法 final void lock() { acquire(1); } /** * 这个方法和nonfairTryAcquire方法只有一点不同,在标注为#1的地方 * 多了一个判断hasQueuedPredecessors,这个方法是判断当前AQS的同步队列中是否还有等待的线程 * 如果有,返回true,否则返回false。 * 由此可知,当队列中没有等待的线程时,当前线程才能尝试通过CAS的方式获取锁。 * 否则就让这个线程去队列后面排队。 */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // #1 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
通过注释可知,在公平锁的机制下,任何线程想要获取锁,都要排队,不可能出现插队的情况。这就是公平锁的实现原理。
这个过程大概可以描述为下图这样子:
4 tryLock 原理
tryLock 做的事情很简单:让当前线程尝试获取一次锁,成功的话返回 true,否则 false。
其实现,其实就是调用了 nonfairTryAcquire 方法来获取锁。
public boolean tryLock() { return sync.nonfairTryAcquire(1); }
至于获取失败的话,他也不会将自己添加到同步队列中等待,直接返回 false,让业务调用代码自己处理。
5 可中断的获取锁
中断,也就是通过 Thread 的 interrupt 方法将某个线程中断,中断一个阻塞状态的线程,会抛出一个 InterruptedException 异常。
如果获取锁是可中断的,当一个线程长时间获取不到锁时,我们可以主动将其中断,可避免死锁的产生。
其实现方式如下:
public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); }
会调用 AQS 的 acquireInterruptibly 方法
public final void acquireInterruptibly(int arg) throws InterruptedException { // 判断当前线程是否已经中断,如果已中断,抛出InterruptedException异常 if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
此时会优先通过 tryAcquire 尝试获取锁,如果获取失败,会将自己加入到队列中等待,并可随时响应中断。
private void doAcquireInterruptibly(int arg) throws InterruptedException { // 将自己添加到队列中等待 final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { // 自旋的获取锁 for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } // 获取锁失败,在parkAndCheckInterrupt方法中,通过LockSupport.park()阻塞当前线程, // 并调用Thread.interrupted()判断当前线程是否已经被中断 // 如果被中断,直接抛出InterruptedException异常,退出锁的竞争队列 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // #1 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
PS:不可中断的方式下,代码#1 位置不会抛出 InterruptedException 异常,只是简单的记录一下当前线程被中断了。
6 可超时的获取锁
通过如下方法实现,timeout 是超时时间,unit 代表时间的单位(毫秒、秒...)
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); }
可以发现,这也是一个可以响应中断的方法。然后调用 AQS 的 tryAcquireNanos 方法:
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
doAcquireNanos 方法与中断里面的方法大同小异,下面在注释中说明一下不同的地方:
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; // 计算超时截止时间 final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } // 计算到截止时间的剩余时间 nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) // 超时了,获取失败 return false; // 超时时间大于1000纳秒时,才阻塞 // 因为如果小于1000纳秒,基本可以认为超时了(系统调用的时间可能都比这个长) if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); // 响应中断 if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
7 总结
本文首先对比了元老级的锁 synchronized 与 ReentrantLock 的不同,ReentrantLock 具有一下优势:
- 同时支持公平锁与非公平锁
- 支持:尝试非阻塞的一次性获取锁
- 支持超时获取锁
- 支持可中断的获取锁
- 支持更多的等待条件(Condition)
然后介绍了几个主要特性的实现原理,这些都是基于 AQS 的。
ReentrantLock 的核心,是通过修改 AQS 中 state 的值来同步锁的状态。
通过这个方式,实现了可重入。
ReentrantLock 具备公平锁和非公平锁,默认使用非公平锁。其实现原理主要依赖于 AQS 中的同步队列。
最后,可中断的机制是内部通过 Thread.interrupted()判断当前线程是否已被中断,如果被中断就抛出 InterruptedException 异常来实现的。
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于