记一次 Reentrantlock 深入学习

本贴最后更新于 1939 天前,其中的信息可能已经时移世改

初始化

Reentrantlock 可以初始化为公平锁和非公平锁

公平锁

公平锁指能让等待时间长的线程首先获得锁。可以防止线程饥饿。

非公平锁

指随机分配锁的使用权。其效率更高。

在创建 ReentrantLock 的时候通过传进参数 true 创建公平锁,如果传入的是 false 或没传参数则创建的是非公平锁

/** * Creates an instance of {@code ReentrantLock}. * This is equivalent to using {@code ReentrantLock(false)}. */ public ReentrantLock() { sync = new NonfairSync(); } /** * Creates an instance of {@code ReentrantLock} with the * given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }

加锁(LOCK)

公平锁

final void lock() { acquire(1); }

非公平锁

/** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); }

非公平锁可以立即通过用 CAS 去尝试修改 state,修改成功立即获得锁使用权。否则去执行 acquire 方法。
state:是关键性的变量,它标识了锁的状态,0 是空闲,大于或者等于 1 表示有线程占用了该锁。因为是可重入的锁,所以同一线程可以多次获取该锁,并且每次获取 state+1。当 state 为 0 时其他线程才获取到该锁。

AQS 的 acquire 方法

public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }

当 tryAcquire 方法尝试获取锁成功时直接返回。意味着该线程获得了锁,业务代码继续执行。当 tryAcquire 尝试获得锁失败时。将该线程加入线程队列中,并中断该线程。

tryAcquire 方法

公平锁实现

protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }

获得当前线程,判断 state 是否为 0 如果为 0 则判断(hasQueuedPredecessors 方法)当前线程是否需要排队(不需要排队和 cas 修改成功直接返回 true)。当需要排队和 cas 修改不成功时。判断当前线程是否是锁持有线程,如果是锁持有线程则 state+1(返回 true),否则返回 false;

hasQueuedPredecessors 方法

public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }

当初始化线程队列时会实例化一个 node(该 node 的 thread 属性为 null)并且队列的 tail 和 head(头尾)都会引用该 node,所以当 h!=t 为 true 表示有等待线程,false 表示没有等待线程。因为队列头部的 node 的 thread 永远都是 null(后面会粘代码),所以 h.next 就是最先等待获取锁的线程。所以该方法判断如果 头 node 和尾 node 引用统一为 node 或者 h.next 不为空且 h.next 就是当前线程时不需要排队。(笔者对后面判断理解尚有不足)

非公平锁实现

protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }

非公平锁与公平锁雷同,但是非公平锁不需要排队。

addWaiter

private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }

传入的 mode 为 Node.EXCLUSIVE 表示实例化的 node 为排他的 node,其实该方法就是将该线程所处 node 插入队列尾部。

enq 方法

private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

如果队列没有初始化则先将队列初始化,初始化后再将 node 插入队列。(thread 属性永远都是插入到队尾,所以初始化的头部节点 thread 属性永远都是为 null);

acquireQueued 方法

final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; //死循环,正常情况下线程只有获得锁才能跳出循环 for (;;) { final Node p = node.predecessor();//获得当前线程所在结点的前驱结点 //第一个if分句 if (p == head && tryAcquire(arg)) { setHead(node); //将当前结点设置为队列头结点 p.next = null; // help GC failed = false; return interrupted;//正常情况下死循环唯一的出口 } //第二个if分句 if (shouldParkAfterFailedAcquire(p, node) && //判断是否要阻塞当前线程 parkAndCheckInterrupt()) //阻塞当前线程 interrupted = true; } } finally { if (failed) cancelAcquire(node); } }

这段代码主要的内容都在 for 循环中,这是一个死循环,主要有两个 if 分句构成。第一个 if 分句中,当前线程首先会判断前驱结点是否是头结点,如果是则尝试获取锁,获取锁成功则会设置当前结点为头结点(更新头指针)。为什么必须前驱结点为头结点才尝试去获取锁?因为头结点表示当前正占有锁的线程,正常情况下该线程释放锁后会通知后面结点中阻塞的线程,阻塞线程被唤醒后去获取锁,这是我们希望看到的。然而还有一种情况,就是前驱结点取消了等待,此时当前线程也会被唤醒,这时候就不应该去获取锁,而是往前回溯一直找到一个没有取消等待的结点,然后将自身连接在它后面。一旦我们成功获取了锁并成功将自身设置为头结点,就会跳出 for 循环。否则就会执行第二个 if 分句:确保前驱结点的状态为 SIGNAL,然后阻塞当前线程。

先来看 shouldParkAfterFailedAcquire(p, node),从方法名上我们可以大概猜出这是判断是否要阻塞当前线程的,方法内容如下

/** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) //状态为SIGNAL /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { //状态为CANCELLED, /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //状态为初始化状态(ReentrentLock语境下) /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }

可以看到针对前驱结点 pred 的状态会进行不同的处理

  • 1.pred 状态为 SIGNAL,则返回 true,表示要阻塞当前线程。
  • 2.pred 状态为 CANCELLED,则一直往队列头部回溯直到找到一个状态不为 CANCELLED 的结点,将当前节点 node 挂在这个结点的后面。
  • 3.pred 的状态为初始化状态,此时通过 compareAndSetWaitStatus(pred, ws, Node.SIGNAL)方法将 pred 的状态改为 SIGNAL。

其实这个方法的含义很简单,就是确保当前结点的前驱结点的状态为 SIGNAL,SIGNAL 意味着线程释放锁后会唤醒后面阻塞的线程。毕竟,只有确保能够被唤醒,当前线程才能放心的阻塞。

但是要注意只有在前驱结点已经是 SIGNAL 状态后才会执行后面的方法立即阻塞,对应上面的第一种情况。其他两种情况则因为返回 false 而重新执行一遍
for 循环。这种延迟阻塞其实也是一种高并发场景下的优化,试想我如果在重新执行循环的时候成功获取了锁,是不是线程阻塞唤醒的开销就省了呢?

最后我们来看看阻塞线程的方法 parkAndCheckInterrupt

shouldParkAfterFailedAcquire 返回 true 表示应该阻塞当前线程,则会执行 parkAndCheckInterrupt 方法,这个方法比较简单,底层调用了 LockSupport 来阻塞当前线程,源码如下:

/** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */ private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }

该方法内部通过调用 LockSupport 的 park 方法来阻塞当前线程。

下面通过一张流程图来说明线程从加入同步队列到成功获取锁的过程
1422237201808051842424381630878927.png

概括的说,线程在同步队列中会尝试获取锁,失败则被阻塞,被唤醒后会不停的重复这个过程,直到线程真正持有了锁,并将自身结点置于队列头部。

解锁(unlock)

public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }

尝试解锁,成功后如果头部 node 不为空且 waitState 状态为 0 则唤醒该线程;

tryRelease 方法

protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }

如果当前线程不是持有锁的线程则抛出 IllegalMonitorStateException 异常。判断 state-releases 如果为 0 则设置当前占用锁的线程为空。不为 0 则表示该锁被线程重入了,只是设置 state 值。最后返回锁的状态。

unparkSuccessor 方法

private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }

这里涉及到 waitStatus 的一个知识,从百度上粘了一部分来,原文地址:
https://www.jianshu.com/p/c021f144a565

waitStatus

volatile int waitStatus;

状态属性,只有如下值:
① SIGNAL:
static final int SIGNAL = -1;
这个节点的后继(或者即将被阻塞)被阻塞(通过 park 阻塞)了,所以当前节点需要唤醒它的后继当它被释放或者取消时。为了避免竞争,获取方法必须首先表示他们需要一个通知信号,然后再原子性的尝试获取锁,如果失败,则阻塞。
也就是说,在获取锁的操作中,需要确保当前 node 的 preNode 的 waitStatus 状态值为’SIGNAL’,才可以被阻塞,当获取锁失败时。(『shouldParkAfterFailedAcquire』方法的用意就是这)
② CANCELLED:
static final int CANCELLED = 1;
这个节点由于超时或中断被取消了。节点不会离开(改变)这个状态。尤其,一个被取消的线程不再会被阻塞了。
③ CONDITION:
static final int CONDITION = -2;
这个节点当前在一个条件队列中。它将不会被用于当做一个同步队列的节点直到它被转移到同步队列中,转移的同时状态值(waitStatus)将会被设置为 0。(这里使用这个值将不会做任何事情与该字段其他值对比,只是为了简化机制)。
④ PROPAGATE:
static final int PROPAGATE = -3;
一个 releaseShared 操作必须被广播给其他节点。(只有头节点的)该值会在 doReleaseShared 方法中被设置去确保持续的广播,即便其他操作的介入。
⑤ 0:不是上面的值的情况。
这个值使用数值排列以简化使用。非负的值表示该节点不需要信号(通知)。因此,大部分代码不需要去检查这个特殊的值,只是为了标识。
对于常规的节点该字段会被初始化为 0,竞争节点该值为 CONDITION。这个值使用 CAS 修改(或者可能的话,无竞争的 volatile 写)。

该方法用于唤醒后继节点,如果存在的话
① 如果状态值是负数,则在预期发信号通知时清除这个负数状态值。如果状态被等待的线程修改了或者清除负数状态值失败是允许。
② 后继节点的线程被唤醒,后继节点通常就是下一个节点。但是如果下一个节点被取消了或者下一个节点为 null,则从队列尾(tail)往前遍历去找真实的未取消的后继节点。
『(s == null || s.waitStatus > 0)』:说明下一个节点为 null 或被取消了(waitStatus 允许的状态值中,只有’CANCELLED’是 >0 的)。那么,就从队列尾(tail)开始向前遍历,获取第一个非空且未被取消的节点。如果存在这样的一个后继节点的话(即,“s != null”),则执行『LockSupport.unpark(s.thread);』操作来唤醒这个节点的线程,此时等待队列中第一个等待的线程就会被重新启动,流程会回到『acquireQueued』方法,该线程会重新重试获取该锁,如果成功 acquireQueued 方法返回,否则线程会再次被挂起,等待下次唤醒后再去再去竞争获取锁。

  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3194 引用 • 8214 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...