ReentrantLock#lock
public final void acquire(int arg) { //尝试获取锁,如果获取锁失败则尝试添加到同步队列中 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
ReentrantLock.FairSync#tryAcquire
尝试获取锁,如果当前线程是 head 的后继节点并且 head 线程已经释放锁的时候,根据队列取头部节点为独占线程
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); //获取状态 int c = getState(); if (c == 0) { //如果队列为空或者队列不为空时当前线程是head的后继节点,并且设置status acquies成功 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { //设置当前线程独占 setExclusiveOwnerThread(current); return true; } } //锁的重入,state不为0说明当前线程已经占有锁再次请求锁 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); //设置status setState(nextc); return true; } //否则返回false,尝试加锁失败,将会尝试加入队列 return false; } }
AbstractQueuedSynchronizer#acquireQueued
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { //从等待队列中获取头节点才会尝试获取锁 final Node p = node.predecessor(); //如果pre是头节点,并且尝试获取锁成功,说明加入队列失败 if (p == head && tryAcquire(arg)) { //移除头节点并且将当前node设置为头节点 setHead(node); p.next = null; // help GC failed = false; return interrupted; } //不是头节点,或者尝试获取锁失败 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire
/** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ //如果pred ws为等待一个释放信号去唤醒,说明node线程也可以安全的park return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ //prenode ws 已经取消,向上获取ws>0的node do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ //将preNode ws设置从ws设置为SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
ReentrantLock#tryLock(long, java.util.concurrent.TimeUnit)
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toNanos(timeout)); }
AbstractQueuedSynchronizer#tryAcquireNanos
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //先去强势获取锁,失败后尝试在nanos内重试 return tryAcquire(arg) || doAcquireNanos(arg, nanosTimeout); }
AbstractQueuedSynchronizer#doAcquireNanos
/** * Acquires in exclusive timed mode. * * @param arg the acquire argument * @param nanosTimeout max wait time * @return {@code true} if acquired */ private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { //若predecessor是头节点并且当前线程尝试获取锁成功设置当前node为头节点,返回获取锁成功 setHead(node); p.next = null; // help GC failed = false; return true; } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) //超时返回false return false; //见AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
ReentrantLock#unlock
/** * Attempts to release this lock. * * <p>If the current thread is the holder of this lock then the hold * count is decremented. If the hold count is now zero then the lock * is released. If the current thread is not the holder of this * lock then {@link IllegalMonitorStateException} is thrown. * * @throws IllegalMonitorStateException if the current thread does not * hold this lock */ public void unlock() { sync.release(1); }
AbstractQueuedSynchronizer#release
public final boolean release(int arg) { //尝试释放锁 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //唤醒头部节点 unparkSuccessor(h); return true; } return false; }
AbstractQueuedSynchronizer#unparkSuccessor
/** * Wakes up node's successor, if one exists. * * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //唤醒线程 LockSupport.unpark(s.thread); }
欢迎来到这里!
我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。
注册 关于