(源码)JDK8-JUC-AQS 结合 ReentrantLock

本贴最后更新于 1274 天前,其中的信息可能已经东海扬尘

----选择最熟悉/简单的 ReentrantLock 作为突破口。

理论初步

前置知识: 公平锁和非公平锁 可重入锁 LockSupport CAS Volatile 自旋锁 数据结构之链表 模板模式
AQS是什么? 字面意思,抽象的队列同步器。不仅是简单的加锁、解锁。 技术解释,用来构建锁或者其他同步器组件的重量级基础框架及整个JUC体系的基石,通过内置的FIFO队列来完成获取线程的排队工作,并通过一个int类型变量表示持有锁的状态。
栈:先进先出 队列:单链表、双链表。ArrayList最经典的链表结构。 队列:靠链表和相应的数据结构完成队列的内容,各个线程要抢锁,抢得到的用,抢不到的要安排排队。
同步与同步器不同: 锁:面向锁的使用者。定义了程序员和锁交互的使用层API,隐藏了实现细节,直接调用即可。 同步器:面向锁的实现者。比如java并发大神DougLee,提出统一规范并简化了锁的实现,屏蔽了同步状态管理、阻塞线程排队和通知、唤醒机制等。

AQS 作用

AQS 能干嘛?加锁会导致阻塞,有阻塞就需要排队,实现排队必然需要有某种形式的队列进行管理。

(AQS 使用一个 volatile 的 int 类型的成员变量来表示同步状态,通知内置的 FIFO 队列来完成资源获取的排队工作,将每条要去抢占资源的线程封装成一个 Node 节点来实现锁的分配,通过 CAS 完成对 state 值的修改)

解释说明:抢到资源的线程直接使用处理业务逻辑,抢不到资源的必然涉及一种排队等候机制。抢占资源失败的线程继续去等待(类似银行业务办理窗口都满了,暂时没有受理窗口的顾客只能去候客区排队等候),但等候线程仍然保留获取锁的可能且获取锁流程仍在继续(候客区的顾客也在等着叫号,轮到了再去受理窗口办理业务)。

既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?

如果共享资源被占用,那么就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是 CLH 队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是 AQS 的抽象表现。它将请求共享资源的线程封装成队列的节点(Node),通过 CAS、自旋以及 LockSupport.park()的方式,维护 state 变量的状态,使并发达到同步的控制效果。

----AQS 就是形成这样,唤醒,等待,加锁,释放,超时控制,取消等待的等等一系列操作。

怎样又快又快的实现上述功能呢?

AQS 是用来构建锁或者其他同步器组件的重量级基础框架及整个 JUC 体系的基石,通过内置的 FIFO 队列来完成资源获取线程的排队工作,并通过一个 int 类型变量表示持有锁的状态。

image.png

CLH:Craig、Landin and Hagersten 队列(三个人名),是一个单向链表,AQS 中的队列是 CLH 变体的虚拟双向队列 FIFO。

image.png

AQS 核心类和重要属性

类结构关系

image.png

image.png

源码概要
----部分省略 public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { private transient Thread exclusiveOwnerThread; } public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; } private transient volatile Node head; private transient volatile Node tail; private volatile int state; } public class ReentrantLock implements Lock, java.io.Serializable { private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { abstract void lock(); } static final class NonfairSync extends Sync {} static final class FairSync extends Sync {} public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } public void lock() { sync.lock(); } public void unlock() { sync.release(1); } }
源码注释
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { //锁所属线程 private transient Thread exclusiveOwnerThread; } public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { /** 等待队列节点类。 等待队列是“CLH”(Craig, Landin和Hagersten)锁队列的变体。 CLH锁通常用于自旋锁。 相反,我们使用它们来阻塞同步器,但使用相同的基本策略,即在其节点的前身中保存关于线程的一些控制信息。 每个节点中的“状态”字段跟踪线程是否应该阻塞。 节点在其前身发布时收到信号。 否则,队列的每个节点都充当持有单个等待线程的特定通知样式的监视器。 状态字段并不控制线程是否被授予锁等。 如果线程是队列中的第一个线程,则可以尝试获取。 但成为第一并不能保证成功;它只给予了竞争的权利。 因此,当前释放的竞争者线程可能需要重新等待。 要进入CLH锁队列,需要将其作为新的原子拼接进去尾巴。要退出队列,只需设置头部字段。 <pre> +------+ prev +-----+ +-----+ head | | <---- | | <---- | | tail +------+ +-----+ +-----+ </pre> 插入CLH队列只需要对“tail”进行单个原子操作,因此有一个简单的原子点来区分从未排队到已排队。 类似地,退出队列只涉及更新“头部”。 但是,节点需要更多的工作来确定谁是其后继者,部分原因是要处理由于超时和中断而可能的取消。 “prev”链接(未在原始CLH锁中使用),主要用于处理取消。 如果一个节点被取消了,它的后续节点(通常)将被重新链接到一个未被取消的前任节点。 关于自旋锁的类似力学解释,请参阅Scott和Scherer在"http://www.cs.rochester.edu/u/scott/synchronization/"发表的论文。 我们还使用“next”链接去执行阻塞机制。 每个节点的线程id都保存在它自己的节点中,所以前任通过遍历下一个链接来确定它是哪个线程,从而向下一个节点发出唤醒信号。 确定继承节点必须避免与新排队的节点竞争,以设置它们的前任节点的“下一个”字段 当一个节点的后继节点看起来是空的时候,可以通过从原子更新的“tail”向后检查来解决这个问题。 (或者换个说法,下一个链接是一种优化,所以我们通常不需要反向扫描。) 消去在基本算法中引入了一些保守性。 由于我们必须轮询其他节点的取消,我们可能无法注意到一个被取消的节点是在我们前面还是后面。 这个问题的处理方法是在取消后总是取消后继程序,允许它们稳定在一个新的前任程序上,除非我们能确定一个未被取消的前任程序来承担这个责任。 CLH队列需要一个虚拟头节点来启动。 但是我们不是在建设的时候创造它们,因为如果没有争论,那就是白费力气。 相反,在第一次争用时构造节点并设置头和尾指针。 等待条件的线程使用相同的节点,但使用额外的链接。 条件只需要链接简单(非并发)链接队列中的节点,因为只有在独占持有时才会访问它们。 在等待时,节点被插入到条件队列中。 收到信号后,节点被转移到主队列。 状态字段的一个特殊值用于标记节点所在的队列。 */ static final class Node { //标记,表示节点正在共享模式中等待 static final Node SHARED = new Node(); //标记,表示节点正在独占模式中等待 static final Node EXCLUSIVE = null; /** waitStatus值表示线程已被取消 */ static final int CANCELLED = 1; /** waitStatus值表示后续线程需要unparking */ static final int SIGNAL = -1; /** waitStatus值表示线程正在等待状态*/ static final int CONDITION = -2; /** waitStatus值指示下一个获取的值应该无条件地传播 */ static final int PROPAGATE = -3; /** 数值按数字排列以简化使用。 非负值意味着节点不需要发出信号。 因此,大多数代码不需要检查特定的值,只需要检查符号。 对于普通同步节点,该字段初始化为0,对于条件节点,该字段初始化为CONDITION。 可以使用CAS(或者在可能的情况下,无条件地写volatile)修改它。 */ volatile int waitStatus; //等待队列节点的上一个节点 /**原文描述: 链接到当前节点/线程检查waitStatus所依赖的前一个节点。 在进入队列期间分配,仅在退出队列时为空(为了GC)。 此外,当取消一个前任时,我们在寻找一个始终存在的未取消的前任时进行短路,因为头节点永远不会被取消: 只有成功获取节点才会成为头节点。 被取消的线程永远不会成功获取,线程只会取消自己,不会取消任何其他节点。 */ volatile Node prev; //等待队列节点的下一个节点 /**原文描述: 链接到当前节点/线程在释放时取消泊位的后续节点。 在排队时分配,在绕过被取消的前任时调整,在退出队列时为空(为了GC)。 查询操作直到附加之后才会分配前任的下一个字段,因此看到下一个字段为空并不一定意味着节点处于队列的末尾。 然而,如果下一个字段看起来是空的,我们可以从尾部扫描prev来再次检查。 取消节点的下一个字段被设置为指向节点本身而不是null,以使isOnSyncQueue的工作更容易。 */ volatile Node next; //等待队列节点的所属线程 /**原文描述: 将此节点编入队列的线程。施工时初始化,使用后取消。 */ volatile Thread thread; } //队列头结点,其实仅仅是一个指针,会指向队列中的哨兵节点,并非队列中真正的头结点. /** 延迟初始化的等待队列头。 除了初始化,它只能通过方法setHead进行修改。 注意:如果head存在,它的waitStatus保证不会被CANCELLED。 */ private transient volatile Node head; //队列尾结点,其实仅仅是一个指针,会指向队列中的真正的尾节点. /** *等待队列的尾部,延迟初始化。仅通过方法enq修改以添加新的等待节点。 */ private transient volatile Node tail; /** *同步状态。 */ private volatile int state; }
public class ReentrantLock implements Lock, java.io.Serializable { private final Sync sync; //内部类Sync继承AQS /**同步器提供所有实现机制*/ abstract static class Sync extends AbstractQueuedSynchronizer { /** *执行{@link Lock# Lock}。子类化的主要原因是允许非公平版本的快速路径。 */ abstract void lock(); } //非公平锁的同步对象,继承Sync,间接继承AQS. static final class NonfairSync extends Sync {} //公平锁的同步对象,继承Sync,间接继承AQS. static final class FairSync extends Sync {} //默认实例化非公平锁 public ReentrantLock() { sync = new NonfairSync(); } //标识位可切换公平锁还是非公平锁 public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } //内部类Sync调用加锁方法 /** *获取锁。 *如果锁没有被其他线程持有,则获取锁并立即返回,将锁持有计数设置为1。 *如果当前线程已经持有该锁,则持有计数加1,该方法立即返回。 *如果锁被另一个线程持有,那么当前线程将被禁用,并处于休眠状态,直到获得锁,当锁保持计数设置为1时。 */ public void lock() { sync.lock(); } //内部类Sync调用解锁方法 /** *试图释放此锁。 *如果当前线程是这个锁的持有者,那么持有计数递减。 *如果持有计数现在是零,那么锁被释放。 *如果当前线程不是这个锁的持有者,那么{@link IllegalMonitorStateException}将被抛出。 * @throws IllegalMonitorStateException如果当前线程没有持有这个锁。 */ public void unlock() { sync.release(1); } }

源码分析前提条件

锁被真正占用的两个状态:

state=1

ownerThread=currentThread

哨兵节点状态:

thread=null

prev=null

next=null 或非 null(有下一个节点)

关于各种状态和值的简单处理流程(省去了中间环节):

----先设置状态,再阻塞。

----先回溯状态,再解锁,再设置状态。

lock()

1)node0 获取锁成功

设置 state=1,ownerThread=node0

2.1)获取锁失败

设置上一个 node 节点的 waitStatus=-1

2.2)执行 park()阻塞当前节点 node1

2.3)设置下一个线程节点 wait=1,ownerThread=node1.并返回.

2.5)哨兵节点 S1 被 GC 回收,node1 变成哨兵节点 S。

unlock()

1)A0 设置 state=0,ownerThread=null

2)设置上一个 node 节点的 waitStatus=0

3)执行 unpark()唤醒下一个节点,执行 lock()的 2.3)

等待队列是“CLH”(Craig, Landin 和 Hagersten)锁队列的变体。

CLH 锁通常用于自旋锁。

只有成功获取节点才会成为头节点。

被取消的线程永远不会成功获取,线程只会取消自己,不会取消任何其他节点。

下述源码分析中:

哨兵节点简称 S

加入队列的节点/参数称为 nodeX

源码 -业务总体流程案例图

----业务结构流程图

----以非公平锁获取锁失败加入队列的情况为切入点画的流程图。

----以在银行办理业务为源码分析案例。

----这里只是简要说明,描述并不完善,更详细的参考不同方法的描述。

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

案例 demo

import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 可重入锁验证:显示锁ReentrantLock-案例05-非成对加锁解锁-多线程容易死锁 */ public class ReEnterLockDemo06 { static Lock lock = new ReentrantLock(); public static void main(String[] args) throws InterruptedException { new Thread(()->{ //System.out.println(Thread.currentThread().getName() + "\t unlock前无lock测试开始 ."); //lock.unlock(); //System.out.println(Thread.currentThread().getName() + "\t unlock前无lock测试结束 ."); System.out.println(Thread.currentThread().getName() + "\t unlock前无lock测试-会抛异常 ."); lock.lock(); lock.lock(); try { System.out.println(Thread.currentThread().getName() + "\t 外层."); lock.lock(); try { System.out.println(Thread.currentThread().getName() + "\t 内层."); }finally { lock.unlock(); } }finally { lock.unlock(); /** * 这里故意注释,实现加锁次数和释放次数不一样. * 由于加锁次数和释放次数不一样,第二个线程始终无法获取到,导致一直在等待. * 正常情况,加锁几次就要解锁几次. * 此时t1下面再加一个线程,t2就不输出了,t2无法获取锁了,这样就容易导致死锁和生产的一些问题. * --同一个线程可以多次获得属于自己的同一把锁,这样就可以一定程度的避免死锁. */ //lock.unlock(); System.out.println(Thread.currentThread().getName() + "\t 测试同一个线程未成对unlock接着lock是否阻塞-开始."); lock.lock(); System.out.println(Thread.currentThread().getName() + "\t 测试同一个线程未成对unlock接着lock是否阻塞-结束."); } },"t1").start(); Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + "\t 开启另外线程测试上一个线程未成对unlock是否能lock开始 ." ); lock.lock(); System.out.println(Thread.currentThread().getName() + "\t 开启另外线程测试上一个线程未成对unlock是否能lock结束 ." ); new Thread(()->{ System.out.println(Thread.currentThread().getName() + "\t 之前线程未成对unlock()这里依然同样阻塞进不来lock."); lock.lock(); try { System.out.println(Thread.currentThread().getName() + "\t 内层."); }finally { lock.unlock(); } },"t1").start(); } } /** 输出结果: t1 unlock前无lock测试-会抛异常 . t1 外层. t1 内层. t1 测试同一个线程未成对unlock接着lock是否阻塞-开始. t1 测试同一个线程未成对unlock接着lock是否阻塞-结束. main 开启另外线程测试上一个线程未成对unlock是否能lock开始 . ...... */

加锁-lock.lock()

方法调用顺序
(以获取不到锁加入队列为例): lock->acquire->tryAcquire->addWaiter(enq-自旋)->acquireQueued(自旋-shouldParkAfterFailedAcquire->parkAndCheckInterrupt)
方法-lock()
#公平锁 所属类:ReentrantLock$FairSync public void lock() { sync.lock(); } final void lock() { acquire(1); }
#非公平锁 所属类:ReentrantLock$NonfairSync final void lock() { //当前线程获取锁成功 //CAS:把state从0改为1,成功则当前线程抢到锁.对应unlock()中compareAndSetState(1, 0). //如果CAS失败,那么有可能是当前线程再次获取锁,或者别的线程在占用锁. if (compareAndSetState(0, 1))//true //把ownerThread设置为currentThread.对应unlock()中setExclusiveOwnerThread(null); //exclusiveOwnerThread独占模式同步的当前所有者。 setExclusiveOwnerThread(Thread.currentThread()); //此时并未创建任何Node对象,只是说AQS有线程占用了AQS锁. else//false //当前线程获取锁失败 //在独占模式下获取锁,忽略中断.失败时恢复正常. //可重入锁也在这里执行 acquire(1); }
方法(M1)-acquire
所属类:AQS //尝试获取锁,如果获取不到就排队(阻塞当前线程) //在独占模式下获取,忽略中断。 //通过至少调用一次{@link #tryAcquire}来实现,成功时返回。 //否则,线程将进入队列,可能会反复阻塞和解除阻塞,调用{@link #tryAcquire}直到成功。 public final void acquire(int arg) { //获取锁失败并且加入队列,则执行线程中断. //tryAcquire返回true抢锁成功(包括当前线程获取到锁和获取可重入锁),则不执行后面的加入队列操作.acquireQueued是真正入队的入口. if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) /** parkAndCheckInterrupt()判断Thread.interrupted()线程是否会挂起. 因为LockSupport.park挂起的线程不仅会被LockSupport.unpark方法唤醒,还会被中断唤醒,所以线程被唤醒后调用了下Thread.interrupted()方法来返回下当前线程的中断状态, 但是该方法会清楚掉线程的中断状态,所以在acquire()又一次的通过selfInterrupt()设置了下中断状态。 这也是ReentrantLock区别于synchronized关键字的一个地方。 ReentrantLock支持等待队列中的线程中断,synchronized不支持。 */ selfInterrupt(); }
方法 (M1-1)-tryAcquire
//tryAcquire()尝试获取锁,导致失败的几种情况: 1)锁已经被其他线程获取. 2)锁没有被其他线程获取,但是需要排队. 3)CAS失败(可能过程中已经有其他线程获取到锁了). 锁为自由状态(state=0),并不能说明可以立即执行CAS获取锁,因为可能在当前线程获取锁之前,已经有其他线程在排队了,必须遵循先来后到的原则获取锁. 所以还要调用hasQueuedPredecessors()查看自己是否需要排队。
公平锁与非公平锁的区别
在此tryAcquire()中,公平锁只比非公平锁多了hasQueuedPredecessors()的判断,判断等待队列中是否存在有效的节点,即判断了是否需要排队,导致公平锁和非公平锁的差异如下: 公平锁:先来先到,线程在获取锁时,如果这个锁的等待队列总已经有线程在等待,那么当前线程就会进入等待队列中。 非公平锁:不管是否有等待队列,如果可以获取锁,则立刻占有锁对象,也就是说队列的第一个排队线程在unpark(),但之后还是需要竞争锁(存在线程竞争的情况)。 再者,公平锁每次获取到锁为同步队列中的第一个节点,保证请求资源时间上的绝对顺序,而非公平锁有可能刚释放锁的线程下次继续获取该锁,则有可能导致其他线程永远无法获取到锁,造成“饥饿”现象。 公平锁为了保证时间上的绝对顺序,需要频繁的上下文切换,而非公平锁会降低一定的上下文切换,降低性能开销。 因此,ReentrantLock默认选择的是非公平锁,则是为了减少一部分上下文切换,保证了系统更大的吞吐量。
非公平锁 -tryAcquire
所属类:AQS$NonfairSync //非公平锁获取锁过程 //抢到锁返回true,表示获取到了锁/需要排队,线程逐级返回,加锁过程结束. //抢不到锁返回false,表示无需排队. protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
非公平锁 - nonfairTryAcquire
所属类:AQS$Sync final boolean nonfairTryAcquire(int acquires) { //当前线程 final Thread current = Thread.currentThread(); //state状态 int c = getState(); //c==0表示锁还未被占用或上个线程已经释放此锁,此线程可直接获取锁,不用加入队列直接加锁. if (c == 0) { //CAS:把state从0改为1,成功则抢到锁.对应unlock()解锁中compareAndSetState(1, 0). if (compareAndSetState(0, acquires)) { //把ownerThread设置为currentThread.对应unlock()解锁中setExclusiveOwnerThread(null); setExclusiveOwnerThread(current); //抢到锁 return true;//即tryAcquire()获取锁成功,但此线程并不入队. } } //current == getExclusiveOwnerThread()表示当前线程和已经被锁的线程对象是同一个, //已经占有锁了,但也是抢到锁了,只是可重复锁,不用加入队列,直接加锁的状态. else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; //锁状态是个int类型,这里应该是担心锁重复的次数太多然后 溢出变成负数 if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); //可重入锁:相同线程每次lock()每次state加1,每次unlock(),每次state减1.必须先执行lock()才能执行unlock(). setState(nextc); return true;//即tryAcquire()获取锁成功,但此线程并不入队. } return false;//即tryAcquire()获取锁失败,此线程执行后续的入队操作. }
公平锁 -tryAcquire
所属类:AQS$FairSync //公平锁获取锁的过程,只是比非公平锁多了"!hasQueuedPredecessors()"的逻辑. //其他所有逻辑同非公平锁相同,看非公平锁的nonfairTryAcquire逻辑即可. protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //返回true,表示需要排队 //返回false,表示无需排队 if (!hasQueuedPredecessors() && //无需排队就执行下述两种方法 compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
公平锁 -hasQueuedPredecessors
所属类:AQS //判断等待队列中是否存在有效节点 //返回true,表示需要排队 //返回false,表示无需排队 //注意,由于中断和超时导致的取消可能在任何时候发生,{@code true}返回并不保证其他线程会在当前线程之前获得。 //有几下几种情况需要考虑: 1)未初始化: 1.1)下述的tail、head都为null,此时还未建立队列,h != t返回false,此时表示无需排队。整个hasQueuedPredecessors()方法返回false. 2)已初始化(表示下述的tail、head都不为null): 2.1)head等于tail,那么h != t返回false,此时虽然已建立队列,但只初始化了哨兵节点,后续节点还未真正入队。此时表示无需排队,整个hasQueuedPredecessors()方法返回false. 2.2)head不等于tail,那么h != t返回true,还分下述两种情况: 2.2.1)head的next节点等于null,即(s = h.next) == null返回true。注意此时2.2.2)的流程就不会再执行。 此时虽然已建立队列,但现在可能在入队/出队的过程中。(要注意的是head和tail并不是同时初始化的,参考addWaiter()和acquireQueued()) 此时表示无需排队,整个hasQueuedPredecessors()方法返回false. 2.2.2)head的next节点不等于null,即(s = h.next) == null返回false,表示已建立等待队列并且已有其他线程加入等待队列,继续分下述两种情况: 2.2.2.1)head的next节点的线程等于当前执行线程,即s.thread != Thread.currentThread()返回false。此时表示无需排队,整个hasQueuedPredecessors()方法返回false. 2.2.2.2)head的next节点的线程不等于当前执行线程,即s.thread != Thread.currentThread()返回true。此时表示需要排队,整个hasQueuedPredecessors()方法返回true. public final boolean hasQueuedPredecessors() { //它的正确性取决于head在tail之前被初始化,并且如果当前线程在队列中处于首位,则head.next是准确的。 Node t = tail; // 以反向初始化顺序读取字段 Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }
方法 (M1-2)-addWaiter
所属类:AQS //仅用于设置队列节点的位置,并非真正的加入到队列(在acquireQueued中设置waitStatus为-1和park()阻塞线程才是真正加入队列) //通过CAS和字段保证一定会加入成功. private Node addWaiter(Node mode) { //要注意这个node对象为EXCLUSIVE独占模式,而哨兵节点S为SHARED共享模式(有且仅在初始建立队列时的哨兵节点成立,之后迭代的哨兵节点都是EXCLUSIVE独占模式). Node node = new Node(Thread.currentThread(), mode); //第二部分(初始化队列在第一部分):队列中已存在线程锁对象,后续再加入的执行部分. Node pred = tail; if (pred != null) { //此时的内容参考下述"第一部分"中的"多个节点案例"即可. node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; //返回的是最新加入队列的节点. return node; } } //第一部分:初始化队列,即加入第一个元素(包括哨兵节点和第一个加入队列的线程锁对象) enq(node); //返回的是最新加入队列的节点,这个node节点在enq()中并未处理. return node; }
方法 (M1-2-1)-enq
所属类:AQS 上面第一部分:调用enq方法初始化等待队列并通过cas及自旋保证成功加入到等待队列队尾. //暂且把node当成第一个队列中的元素nodeX,哨兵节点除外.注意这个node参数节点为EXCLUSIVE独占模式. private Node enq(final Node node) { //自旋锁处理方式 for (;;) { Node t = tail; //第一次初始化(当队列为空时)(自旋第一次进入),仅处理哨兵节点,未处理真正将要加入队列中的线程锁对象nodeX. if (t == null) { // Must initialize //CAS:基于当前对象把head的值设置成S //设置哨兵节点S,实例化新node对象S(注意S对象标记为SHARED共享模式),而队列里的其他节点为EXCLUSIVE独占模式. if (compareAndSetHead(new Node())) //把head头结点和tail尾节点都指向S. //一旦设置head,那么head永远指向哨兵节点S(即使哨兵节点S会改变),tail会随着队列中加入的线程越来越多,一直往后移动直至指向最后一个线程节点. tail = head; } else { //第二次(自旋第二次进入,并返回),处理真正将要加入队列中的线程锁对象nodeX. //把nodeX的上一个节点prev指向tail尾节点 //暂且把node当成第一个队列中的元素nodeX,哨兵节点除外.注意这个node参数节点为EXCLUSIVE独占模式. node.prev = t; //CAS:基于当前对象把tail尾节点的值由tail尾节点的Node对象设置成nodeX对象. //成功,第n次加入成功把当前线程节点加入到等待队列. //失败,加入到等待队列尾部的时候,有别的线程已经加入到等待队列了,再次自旋. if (compareAndSetTail(t, node)) { //设置tail尾节点的下一个节点next指向nodeX对象. t.next = node; //返回nodeX的上一个节点prev对象. //而这个enq()返回的t对象未在上述的addWaiter()中用到,只是用到了这个参数nodeX对象. return t; } } } }
方法 (M1-2-图)-addWaiter/enq

----也可参考”源码-业务总体流程案例图”章节。

image.png

image.png

image.png

image.png

AQS中有两个属性head和tail,分别用来保存AQS队列的头尾节点。 初始时,这两个属性都为null,当有一个线程进来时,队列如上。 上述队列表示,在节点nodeA插入队列之前,没有其他节点在排队。 注意,当前持有锁的线程,永远不会处于排队队列中。这是因为,当一个线程调用lock()获取锁时,只有两种情况: --1)锁处于自由状态(state==0),且它不需要排队。通过CAS立即获取到了锁,这种情况,显然它不会处于排队队列中。 --2)锁处于非自由状态,线程加入到了排队队列的队头(不包括head)等待锁。 将来某一时刻,锁被其他线程释放,并唤醒这个队列中的线程,线程唤醒后,执行tryAcquire,获取到了锁,然后重新维护排队队列,将自己从队列移除(acquireQueued()).

image.png

image.png

方法 (M1-3)-acquireQueued
所属类:AQS //线程真正加入CLH队列、队列重置并返回线程中断标识interrupted. //参数:node-addWaiter()返回的尾节点指向的最后一个加入的nodeX(Node.EXCLUSIVE)对象 //参数:arg-1 //注意和doAcquireInterruptibly()的对比,二者主要区别在于发现线程被中断过后的处理逻辑. final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { //线程中断状态 boolean interrupted = false; //注意:自旋一直跑着呢 for (;;) { //获取nodeX的上一个节点prev final Node p = node.predecessor(); //判断是否为head头节点对象,并通过当前线程再次获取一次锁(回看tryAcquire) //如果此时只有哨兵节点和一个后续的nodeX节点,即只有首节点(不包括哨兵节点),tryAcquire成功,那么直接进入此if. //这里可能会出现,比如nodeX刚释放了锁又再次抢到锁的情况. if (p == head && tryAcquire(arg)) { //移除当前哨兵节点S并GC回收,并把nodeA设置成哨兵节点S,nodeA后续的下一个节点next指向nodeX不变. //要注意经tryAcquire()处理后,此时的state=1,ownerThread=nodeA,nodeA就从此队列消失了,从而绑定了lock锁对象. setHead(node); //可以把这个GC回收的原哨兵节点S对象当成执行unlock()时处理的node对象. //前提是,有且仅当unlock()的线程作队列的节点中生效(存在CLH队列时生效)(因为第一次就获取锁成功时并不存在CLH队列,当然也不会存在各种node节点指向) p.next = null; // help GC failed = false; //注意:这里要返回,返回当前线程的中断状态,用于后续操作判断当前线程的是怎么被唤醒的. return interrupted; } //获取锁失败后设置当前节点nodeX的上一个节点prev的waitStatue为-1(包括哨兵节点)(-1表示节点被阻塞了) //只有发现当前节点不是首节点才会返回true if (shouldParkAfterFailedAcquire(p, node) && //park()阻塞当前线程并在unpark()唤醒时返回当前线程的中断状态. //要注意的是:最终唤醒的一定是队头(并非哨兵节点,即一定是哨兵节点的后续next节点),被唤醒后再次自旋进入最上面的if()处理新的队列逻辑. //这里不用考虑多次park()或unpark()的问题,因为这个LockSupport加锁解锁的许可证最大是1. parkAndCheckInterrupt()){ //该线程是被中断唤醒并获取的锁,于辅助后续操作判断当前线程是被中断唤醒的.所以需要重新设置下中断位. interrupted = true; } } } finally { //注意这里最终都会执行别忘记了 if (failed){ //如果该方法因为某些特殊情况意外的退出(没有获取锁就退出了),那么就取消尝试获取锁. //设置节点状态为CANCELLED cancelAcquire(node); } } }
方法 (M1-3 图)-acquireQueued

----也可参考”源码-业务总体流程案例图”章节。

image.png

image.png

方法 (M1-3-1)-setHead
所属类:AQS //移除当前哨兵节点S并GC回收,设置nodeX为新的哨兵节点S. private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
方法 (M1-3-2)- predecessor
所属类:AQS$Node //获取nodeX的上一个节点prev(可能会跟在addWaiter()中设置的有出入,即期间有节点位置变化) final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; }
方法 (M1-3-3)-shouldParkAfterFailedAcquire
所属类:AQS //获取锁失败后处理当前节点nodex的上一个节点prev的waitStatue为-1(-1表示节点被阻塞了) //在for自旋中. //pred-nodeX的前置节点,node-nodeX. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; //判断当前节点nodex的上一个节点prev的waitStatue=-1 if (ws == Node.SIGNAL) return true; if (ws > 0) { //ws==1,即CANCELLED,表示pred节点已经被取消了,将取消的pred节点从队列中移除,重新维护下排队链表关系. do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); //上一个节点的下一个节点=当前节点 pred.next = node; } else { //程序第一次执行到这里返回false,还会进行外层第二次循环. //设置当前节点nodex的上一个节点prev的waitStatue=-1(包括哨兵节点).(-1表示节点被阻塞了) //没有condition的情况下,且是第一个加入到等待列表.如果在这期间头结点没有变的话,就把当前节点waitStatus设置为-1状态. //为何每个线程进入该方法后,修改的是上一个节点的waitStatus,而不是自己修改自己的? //因为线程调用park()后,无法设置自己的这个状态,若在调用park()前设置,存在不一致的问题,所以每个node的waitStatus,在后继节点加入时设置. compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
方法 (M1-3-3 图)-shouldParkAfterFailedAcquire

----也可参考”源码-业务总体流程案例图”章节。

image.png

方法 (M1-3-4)-parkAndCheckInterrupt
所属类:AQS //真正加入CLH队列.park()阻塞当前线程并在unpark()唤醒时返回当前线程的中断状态. private final boolean parkAndCheckInterrupt() { //park被唤醒的条件有:1.线程调用了unpark,2:其它线程中断了线程;3:发生了不可预料的事情 LockSupport.park(this); //有两种情况,会导致阻塞结束: //1)持有锁的线程,释放锁后,将这个线程unpark()了.要注意的是:最终唤醒的一定是队头(并非哨兵节点),被唤醒后再次自旋处理. //2)线程被interrupt().(注意,在外部interrupt这个线程,不是抛出InterruptException,这一点和sleep及wait都不同) /** parkAndCheckInterrupt()判断Thread.interrupted()线程是否会挂起. 因为LockSupport.park挂起的线程不仅会被LockSupport.unpark方法唤醒,还会被中断唤醒,所以线程被唤醒后调用了下Thread.interrupted()方法来返回下当前线程的中断状态, 但是该方法会清楚掉线程的中断状态,所以在acquire()又一次的通过selfInterrupt()设置了下中断状态。 这也是ReentrantLock区别于synchronized关键字的一个地方。 ReentrantLock支持等待队列中的线程中断,synchronized不支持。 */ return Thread.interrupted(); }

解锁-lock.unlock()

方法调用顺序

unlock->release->tryRelease->unparkSuccessor

方法 -unlock()

image.png

所属类:ReentrantLock //释放锁不区分非公平锁和公平锁 public void unlock() { sync.release(1); }
方法(M2)-release
所属类:AQS //释放当前线程的锁 public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0){ //当unlock()的线程作为队列的节点中生效(存在CLH队列时生效):唤醒当前线程的下一个线程节点 unparkSuccessor(h); } return true; } return false; }
方法 (M2-1)-tryRelease
所属类:ReentrantLock$Sync //释放当前线程的锁 //release:释放数量 protected final boolean tryRelease(int releases) { int c = getState() - releases; //当前执行unlock()的线程一定要和持有锁的线程是同一个才可以释放锁. if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //c==0,直接释放锁,否则当前线程直接释放此线程可重入的数量. if (c == 0) { free = true; //设置当前线程持有的锁线程为null //要注意的是,持有锁的对象就是Lock本身,无论多少个线程来抢都不用关系这个Lock对象的GC回收问题. setExclusiveOwnerThread(null); } //设置当前线程持有的状态,0表示可以唤醒队列中其他线程,否则释放可重入的数量. setState(c); return free; }
方法 (M2-2)-unparkSuccessor
所属类:AQS //唤醒当前线程的下一个线程节点 private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0){ //如果当前线程waitStatus=-1,表示此节点是阻塞节点线程,那么CAS设置当前线程的waitStatus为0 //在释放下个线程之前先把自己当前线程的waitStatus设置成0 compareAndSetWaitStatus(node, ws, 0); } Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //unpark()释放当前线程的下一个节点线程 //当unlock()的线程作为队列的节点中生效(存在CLH队列时生效)(因为第一次就获取锁成功时并不存在CLH队列,当然也不会存在各种node节点指向) //可以把这个unlock()的当前线程理解成CLH队列中的哨兵节点S,在acquireQueued()中被GC回收了,否则当前unlock()的线程一直存在. LockSupport.unpark(s.thread); }
  • JUC
    17 引用 • 3 回帖 • 1 关注
  • AQS
    3 引用
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3196 引用 • 8215 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...