详解两种 Java 锁 synchronized ReentrantLock(AQS)

野生程序员                流年,长短皆逝 浮生,往来皆客。   本文由博客端 http://www.yscxy.net 主动推送

一、synchronized

(推荐阅读 -死磕 Synchronized

(推荐阅读 -ynchronized 原理底层

1.1 三种应用形式

// 关键字在实例方法上,锁为当前实例
public synchronized void instanceLock() {
    // code
}

// 关键字在静态方法上,锁为当前Class对象
public static synchronized void classLock() {
    // code
}

// 关键字在代码块上,锁为括号里面的对象
public void blockLock() {
    Object o = new Object();
    synchronized (o) {
        // code
    }
}

1.2 锁的几种状态

jdk1.6 以前 ,synchronized 是一个重量级锁

  1. 无锁状态
  2. 偏向锁状态
  3. 轻量级锁状态
  4. 重量级锁状态

1.3 对象头

运行时元数据**
哈希值(HashCode),可以看作是堆中对象的地址
GC 分代年龄(年龄计数器) (用于新生代 from/to 区晋升老年代的标准, 阈值为 15)
锁状态标志 (用于 JDK1.6 对 synchronized 的优化 -> 轻量级锁)
线程持有的锁
偏向线程 ID (用于 JDK1.6 对 synchronized 的优化 -> 偏向锁)
**偏向时间戳

Java 的锁都是基于对象的,锁的信息都是储存到对象头里面的(这里分为两种情况)

如果是普通对象——>2 个字宽储存对象头( markWord、Klass Word)

如果是 数组类型——>3 个字宽储存对象头(主要是多了一个数组的长度)( markWord、Klass Word、arrayLength)

32 位处理器一个字宽 32 位,64 位处理器,一个字宽就是 64 位

表:

长度 内容 说明
32/64bit Mark Word 存储对象的 hashCode 或锁信息等
32/64bit Class Metadata Address 存储到对象类型数据的指针
32/64bit Array length 数组的长度(如果是数组)

我们主要来看看 Mark Word 的格式:

锁状态 29 bit 或 61 bit 1 bit 是否是偏向锁? 2 bit 锁标志位
无锁 0 01
偏向锁 线程 ID 1 01
轻量级锁 指向栈中锁记录的指针 此时这一位不用于标识偏向锁 00
重量级锁 指向互斥量(重量级锁)的指针 此时这一位不用于标识偏向锁 10
GC 标记 此时这一位不用于标识偏向锁 11

总结:当对象状态为偏向锁时,Mark Word 存储的是偏向的线程 ID;当状态为轻量级锁时,Mark Word 存储的是指向线程栈中 Lock Record 的指针;当状态为重量级锁时,Mark Word 为指向堆中的 monitor 对象的指针。

image-20210513215045438

1.4 锁的对比

偏向锁 加锁和解锁不需要额外的消耗,和执行非同步方法相比仅存在纳秒级的差距 如果线程间存在锁竞争,会带来额外的锁撤销的消耗 适用于只有一个线程访问同步块场景
轻量级锁 竞争的线程不会阻塞,提高了程序的响应速度 如果始终得不到锁竞争的线程,使用自旋会消耗 CPU 追求响应速度,同步块执行速度非常快
重量级锁 线程竞争不使用自旋,不会消耗 CPU 线程阻塞,响应时间缓慢 追求吞吐量,同步块执行速度较慢

1.5 偏向锁

1.5.1、应用场景

经过大量的实验得出结论,大部分加锁的代码,很大一部分时间只有一个线程运行,所以并不是每次执行该段代码都需要借助操作系统进行加锁 ,只是设置了一个标志位,Mark Word 中的线程 id

1.6、轻量级锁

1.6.1 升级过程

当新加入的线程发现被锁的对象头里面不是自己的线程 id,那么久会直接升级为轻量级锁

1.6.2 轻量级锁的具体方案

在轻量级锁状态下,当前线程会在栈帧下创建一个 LockRecord ,LockRecord 会把 MarkWord 的信息拷贝进去,并且有个 Owner 指针指向加锁的对象(如下图 👇)

1583755888236

如果这个时候出现另外 线程进行竞争锁,那么会出现三种情况

情况一(其他线程来竞争,cas 没有超过 10 次获取到锁)
情况二(其他线程来竞争,cas 超过 10 次没有获取到锁——此时升级为重量级锁)
情况三(该线程本身来再次此执行 Synchronized 代码块)那么会再添加一条 LockRecord 作为重入锁的计数器(每次获得锁都添加一个 Lock Record 来表示锁的重入如下图 👇)

1583756190177

1.6.3、解锁过程

线程退出synchronized代码块 的时候,如果获取的是 取值为 null 的锁记录,表示有 锁重入,这时重置锁记录,表示重入计数减一

轻量级锁解锁时,会使用 CAS 将之前复制在栈桢中的 Displaced Mard Word 替换回 Mark Word 中。如果替换成功,则说明整个过程都成功执行,期间没有其他线程访问同步代码块。

但如果替换失败了,表示当前线程在执行同步代码块期间,有其他线程也在访问,当前锁资源是存在竞争的,那么锁将会膨胀成重量级锁

1.5 重量级锁

它加锁就是依赖底层操作系统的 mutex 相关指令实现, 所以会造成 用户态和内核态之间的切换, 非常耗性能 !

1.5.1 触发情况

当对象的锁为轻量级锁的时候,出现锁的竞争并且多次 CAS 尝试获取锁失败

1、当 Thread2 访问到 synchronized(obj)中的共享资源的时候

首先会将 synchronized 中的锁对象中对象头的 MarkWord 去尝试指向操作系统的 Monitor 对象(每一个对象对应的都有一个). 让锁对象中的 MarkWord 和 Monitor 对象相关联. 如果关联成功, 将 obj 对象头中的 MarkWord 的对象状态从 01 改为 10。**
**因为 Monitor 没有和其他的 obj 的 MarkWord 相关联, 所以 Thread2 就成为了该 Monitor 的 Owner(所有者)。

**2、又来了个 Thread1 执行 synchronized(obj)代码, **

它首先会看看能不能执行该临界区的代码; 它会检查 obj 是否关联了 Montior, 此时已经有关联了, 它就会去看看该 Montior 有没有所有者(Owner), 发现有所有者了(Thread2); Thread1 也会和该 Monitor 关联, 该线程就会进入到它的 EntryList(阻塞队列);

**3、当 Thread2 执行完临界区代码后, **

Monitor 的 Owner(所有者)就空出来了. 此时就会通知 Monitor 中的 EntryList 阻塞队列中的线程, 这些线程通过竞争, 成为新的所有者

在这里插入图片描述

—————————————————————————————————————————————

1583652360228

图中 WaitSet 中的 Thread-0,Thread-1 是之前获得过锁,但条件不满足进入 WAITING 状态的线程,后面讲 wait-notify 时会分析

二、乐观锁-悲观锁

乐观锁:

乐观锁又称为“无锁”,顾名思义,它是乐观派。乐观锁总是假设对共享资源的访问没有冲突,线程可以不停地执行,无需加锁也无需等待。而一旦多个线程发生冲突,乐观锁通常是使用一种称为 CAS 的技术来保证线程执行的安全性。

悲观锁:

悲观锁就是我们常说的锁。对于悲观锁来说,它总是认为每次访问共享资源时会发生冲突,所以必须对每次数据操作加上锁,以保证临界区的程序同一时间只能有一个线程在执行。

三、什么是 CAS(Compare And Swap)

推荐阅读——详解 CAS

CAS 机制中使用了三个基本操作数:内存地址 V,旧的预期值 A,需要修改的新值 B

更新一个变量的时候只有当内存值 V 中的实际值 和 变量的预期值 A 相同时,才会将内存地址 V 对应的值修改为 B

package net.yscyx.lk_3.syn_lock;

import java.util.concurrent.atomic.AtomicInteger;

public class CAS {
    static int a = 0;
    public static void main(String[] args) throws InterruptedException {
        testUnSafe();
        testAtomic();
    }

    //线程不安全的情况
    public static void testUnSafe() throws InterruptedException {
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 100000; i++) {
                a++;
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 100000; i++) {
                a--;
            }
        });
        t1.start();
        t2.start();
        //让父线程等待子线程结束之后才能继续运行。
        t1.join();
        t2.join();
        System.out.println(a);
        System.out.println(t2.isAlive());
        System.out.println(t1.isAlive());
    }

    //原子类
    public static void testAtomic() throws InterruptedException {
        AtomicInteger atomicInteger = new AtomicInteger();
        Thread t1 = new Thread(() -> {
            for (int i = 0; i < 100000; i++) {
                atomicInteger.incrementAndGet();
            }
        });
        Thread t2 = new Thread(() -> {
            for (int i = 0; i < 100000; i++) {
                atomicInteger.decrementAndGet();
            }
        });
        t1.start();
        t2.start();
        System.out.println(t2.isAlive());
        System.out.println(t1.isAlive());
        System.out.println(atomicInteger.get());
    }
}

三、ReentrantLock

Java 除了使用 Synchronized 外,还可以使用 ReentrantLock 实现独占锁功能,并且 ReentrantLock 相比于 Synchronized 而言功能更加丰富,且使用起来更为灵活,也更适合复杂的并发场所。

3.1、ReentrantLock 和 Synchronized 对比
3.2、公平锁——非公平锁

公平锁:谁等的时间长那么久谁先获取锁

非公平锁:非公平锁那就随机的获取,谁运气好,cpu 时间片轮到哪个线程

3.3、响应中断

含义:一个线程获取不到锁,不会一直 等下去。ReentrantLock 会给与一个中断回应

**3.4、interrupt、interrupted 、**isInterrupted
  1. interrupt** : 中断此线程,此线程不一定是当前线程,儿是值调用该方法的 Thread 实例所代表的线程例如 👇**
     public static void main(String[] args) {
            Thread thread1 = new Thread(()->>{
                ....
            });
            thread1.start();
            thread1.interrupt();
        }
    
  2. interrupted** :测试当前线程是否被中断 ,返回一个 boolean 并清除中断状态,第二次再次调用的时候中断状态已经被清除,返回一个 false。**
  3. isinterrupted** : 只测试此线程是否被中断,不清除中断状态。**
3.5、lock、tryLock 与 lockInterruptibly 的区别
**lock **

优先考虑获取锁,待获取锁成功后,才响应中断。

**lockInterruptibly **

优先考虑响应中断,而不是响应锁的普通获取或重入获取。

详细区别:**
**ReentrantLock.lockInterruptibly 允许在等待时由其它线程调用等待线程的 Thread.interrupt 方法来中断等待线程的等待而直接返回,这时不用获取锁,而会抛出一个 InterruptedException。 ReentrantLock.lock 方法不允许 Thread.interrupt 中断,即使检测到 Thread.isInterrupted,一样会继续尝试获取锁,失败则继续休眠。只是在最后获取锁成功后再把当前线程置为 interrupted 状态,然后再中断线程。

tryLock

分三种情况进行讨论

  • 获取到锁
  • 在指定时间内获取到锁
  • 在指定时间内没有获取到锁
3.6、打扰机制
  1. 线程在 sleep 或 wait,join, 此时如果别的进程调用此进程的 interrupt()方法,此线程会被唤醒并被要求处理 InterruptedException;(thread 在做 IO 操作时也可能有类似行为,见 java thread api)
  2. 此线程在运行中, 则不会收到提醒。但是 此线程的 “打扰标志”会被设置, 可以通过 isInterrupted()查看并 作出处理。

lockInterruptibly()和上面的第一种情况是一样的, 线程在请求 lock 并被阻塞时,如果被 interrupt,则“此线程会被唤醒并被要求处理 InterruptedException”。并且如果线程已经被 interrupt,再使用 lockInterruptibly 的时候,此线程也会被要求处理 interruptedException

3.7、线程之间的通信

wait、notifyAll、notify

3.8、生产者/消费者模式
package net.yscyx.lk_3.syn_lock;

import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * 多生产者、多消费者
 */
public class ProducerConsumer {
    //定义一个队列缓冲区,数据为Integer
    private final Queue<Integer> queue = new LinkedList<>();

    //设置缓冲区最大容量
    private static final int MAX_SIZE = 100;

    /**
     * 生产者。
     *
     * <p>生产者进行V原语操作</p>
     * <ul>
     * <li>如果缓冲区没有达到MAX_SIZE,则生产一个产品(n个也行)放入缓冲区,并唤醒所有线程</li>
     * <li>否则使自己进入缓冲区的等待池</li>
     * </ul>
     */
    class Producer implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (queue) {
                    if (queue.size() < MAX_SIZE) {
                        int num = new Random().nextInt(100);
                        queue.offer(num);
                        queue.notifyAll();
                        System.out.println("生产者" + Thread.currentThread().getName() + "生产了产品:" + num + ",此时缓冲区数据量为:" + queue.size());
                    } else {
                        try {
                            queue.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }

    /**
     * 消费者。
     * <p>消费者进行P原语操作</p>
     * <ul>
     * <li>如果缓冲区有数据,则从缓冲区取出一个产品(n个也行),并唤醒所有线程</li>
     * <li>否则使自己进入缓冲区的等待池</li>
     * </ul>
     */
    class Consumer implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                synchronized (queue) {
                    if (queue.size() > 0) {
                        int num = queue.poll();
                        System.out.println("消费者" + Thread.currentThread().getName() + "消费了产品:" + num + ",此时缓冲区数据量为:" + queue.size());
                        queue.notifyAll();
                    } else {
                        try {
                            queue.wait();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }
    }


    public static void main(String[] args) {
        ProducerConsumer pc = new ProducerConsumer();
        //Thread构造函数需要一个Runnable对象即可构造一个新的线程,Runnable对象可以重复利用,不必new多个
        //一个消费者,一个生产者
        Consumer c = pc.new Consumer();
        Producer p = pc.new Producer();
        //生产者和消费者谁先start都一样
        new Thread(c).start();
        new Thread(p).start();
    }

}

3.8 详解 AQS

(推荐阅读——ReentrantLock 原理

3.8.1、两种构造器
public ReentrantLock() {
    sync = new NonfairSync();
}
 
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

默认构造器初始化为 NonfairSync 对象,即非公平锁,而带参数的构造器可以指定使用公平锁和非公平锁。由 lock()和 unlock 的源码可以看到,它们只是分别调用了 sync 对象的 lock()和 release(1)方法。

**3.8.2、**NonfairSync
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

首先用一个 CAS 操作,判断 state 是否是 0(表示当前锁未被占用),如果是 0 则把它置为 1,并且设置当前线程为该锁的独占线程,表示获取锁成功。当多个线程同时尝试占用同一个锁时,CAS 操作只能保证一个线程操作成功,剩下的只能乖乖的去排队啦。

** “非公平”即体现在这里,如果占用锁的线程刚释放锁,state 置为 0,而排队等待锁的线程还未唤醒时,新来的线程就直接抢占了该锁,那么就“插队”了。**

** 若当前有三个线程去竞争锁,假设线程 A 的 CAS 操作成功了,拿到了锁开开心心的返回了,那么线程 B 和 C 则设置 state 失败,走到了 else 里面。我们往下看 acquire。**

3.8.3AQS 队列几个重要的属性
private transient volatile Node head;//队列首
private transient volatile Node tail;//队列尾
private volatile int state;//锁状态,加索成功1,解锁成功0,重入+1
private transient Thread exclusiveOwnerThread;//持有锁的那个线程
3.8.4、Node 对象属性和信息
static final class Node {
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    Node nextWaiter;
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    Node() {    // Used to establish initial head or SHARED marker
    }
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

AQS 队列种队列头种 Thread 为空

3.8.5、ReentrantLock 的方法
private static final long serialVersionUID = 7373984872572414699L;
//属性变量
private final Sync sync;
// 静态内部类
abstract static class Sync extends AbstractQueuedSynchronizer
//非公平锁-继承上面的Sync
static final class NonfairSync extends Sync
//公平锁-继承上面的Sync    
static final class FairSync extends Sync
// 构造方法    
public ReentrantLock()    
public ReentrantLock(boolean fair)
// 方法    
public void lock()
public void unlock()
public boolean isLocked()    
//获取不到锁就直接返回    
public boolean tryLock()
public boolean tryLock(long timeout, TimeUnit unit)
//可以进行中断响应的lock方法    
public void lockInterruptibly()
public Condition newCondition()
public int getHoldCount()
public boolean isHeldByCurrentThread()
public final boolean isFair()
protected Thread getOwner()
public final boolean hasQueuedThreads()  
public final boolean hasQueuedThread(Thread thread)
public final int getQueueLength()
protected Collection<Thread> getQueuedThreads()
public boolean hasWaiters(Condition condition)
public int getWaitQueueLength(Condition condition)
protected Collection<Thread> getWaitingThreads(Condition condition)
public String toString()    

3.8.6 ReentrantLock 加锁的具体过程

3.8.6.1 首先讲一下类的结构

lock 对象有一个 sync 属性 ,我们创建对象的方式有两种 NonfairSync 、FairSync 非公平锁和公平锁

他们两个又都继承 Sync 而 sync 又继承 AbstractQueuedSynchronizer,所以说讲 ReentrantLock 其实就是讲 AQS

也就是,lock 里面有一个抽象队列同步器、队列里面几个比较重要的属性 head、tail、state、exclusiveOwnerThread 且里面有个静态内部类 Node 也就是线程节点,里面有 waitStatus、prev、next、thread

3.8.6.2 然后我们来讲一下加锁的过程

**当我们调用 lock 方法的时候,其实执行的是 **acquire(1);

public final void acquire(int arg) {
    
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

1、tryAcquire(arg) 有两种执行情况

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //如果当前没有线程持有锁
    if (c == 0) {
        //是否需要排队
        if (!hasQueuedPredecessors() &&
            //cas尝试获取锁
            compareAndSetState(0, acquires)) {
            //设置持有锁的为当前线程
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //判断持有锁的线程是否为当前线程,如果是state ++(可重入锁)
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

** **1.1、会先去判断当前锁是否被占用如果没有被占用 也就是 state==0


** **1.2、判断当前线程和持有锁的线程是否一致

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

这时候又来了一个线程,或者第一个线程 tryAcquire 失败,那么将执行 addWaiter(Node.EXCLUSIVE) 节点关系

** 然后执行 acquireQueued 、尝试添加到队列**

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //上一个节点 p
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

如果当前线程的 pre 节点为首节点的话,会进行一次自旋(再次调用 tryAcquire())如果获取 锁成功,设置当前 节点为头节点,并且将原来头节点的 next 置空,帮助 GC,置空当前节点的返回 false,方法结束。

如果当前线程的 pre 节点不是头节点,或者是头节点,但是竞争锁失败,那么会执行 shouldParkAfterFailedAcquire(p, node) (竞争锁失败后是否需要 Park)如果是,那么就进行 park 如果不需要,继续循环。

3.8.6.2 再讲一下 3 种加锁方法

Synchronized 和 ReentrantLock 的区别

  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    2936 引用 • 8125 回帖 • 659 关注
  • 10 引用 • 8 回帖

广告 我要投放

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...