关于 spring-integration-redis 是个不完善的 redis 分布式锁这件事

本贴最后更新于 1512 天前,其中的信息可能已经时移俗易

hello,大家好,欢迎来到银之庭。我是 Z,一个普通的程序员。最近在工作中遇到了需要用分布式锁的场景,我就直接拷贝了其他项目里前人写的一个 redis 分布式锁拿来用了,大体看了一眼也没啥毛病。不过工作之余,我还是搜索了一下有没有开源的 redis 分布式锁实现,如果有的话就研究一下,看能不能引入到工作项目中来。在搜到 spring-integration-redis 项目并仔细研究了它的源码后,我惊讶地发现这个 spring 下属的开源项目,虽然代码质量挺高,但 redis 分布式锁的实现逻辑居然是有缺陷的,具体地说就是 spring-integration-redis 在极端场景下,会直接删除其他线程占用的 redis 锁。下面我们就来仔细研究下 spring-integration-redis 的代码,看看这个问题是如何产生的。

PS:以下实验用到了两个项目,可以从我的 github 上下载到,分别是 demo 项目demo2 项目

1. 基本使用

我们先来试用一下 spring-integration-redis 这个组件。打开上面提到的两个项目,先运行起 demo 项目,并查看 demo 项目的 com.example.demo.controller.TestNoLock 文件,代码如下:

package com.example.demo.controller;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.CountDownLatch;

@RestController
public class TestNoLock {
    private int count;

    @RequestMapping("/testNoLock/")
    public String test() throws InterruptedException {

        CountDownLatch countDownLatch = new CountDownLatch(1000);
        count = 0;
        for (int i = 0; i < 1000; i++) {
            new Thread(() -> {
                try {
                    int tempCount = count;
                    tempCount += 1;
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                    }
                    count = tempCount;
                }finally {
                    countDownLatch.countDown();
                }
            }).start();
        }

        countDownLatch.await();
        System.out.println("final count is: " + count);

        return "ok";
    }
}

这部分代码模拟的是在不加锁的情况下 1000 个线程并发修改一个共享变量,为了让效果更明显,并发修改的代码拆分了 count++ 的内部逻辑,并休眠了 1ms(不知道是不是因为我电脑性能太高,如果不休眠的话,这段没加锁的代码运行结果大概率也是 count=1000,没有产生并发修改的问题)。效果也很明显,多次访问 http://127.0.0.1:9982/testNoLock/ URL,可以在终端看到最终的 count 的值,我的运行结果如下图:image.png

可以看到产生了明显的并发修改问题。下面我们来使用 spring-integration-redis 提供的分布式锁避免并发修改问题。

使用 spring-integration-redis 比较简单,先引入依赖包:

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-data-redis</artifactId>
	<version>2.3.4.RELEASE</version>
</dependency>
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-integration</artifactId>
	<version>2.3.4.RELEASE</version>
</dependency>
<dependency>
	<groupId>org.springframework.integration</groupId>
	<artifactId>spring-integration-redis</artifactId>
	<version>5.3.2.RELEASE</version>
</dependency>

然后新建一个配置类往 spring 容器里添加一个 org.springframework.integration.redis.util.RedisLockRegistry 的对象,代码如下:

package com.example.demo.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.integration.redis.util.RedisLockRegistry;

@Configuration
public class RedisLock {

    @Bean
    public RedisLockRegistry redisLockRegistry(RedisConnectionFactory redisConnectionFactory) {
        return new RedisLockRegistry(redisConnectionFactory, "registry_key");
    }
}

最后在需要的地方使用 RedisLockRegistry 新建锁对象即可,示例在 demo 项目的 com.example.demo.controller.TestLock 文件中,代码如下:

package com.example.demo.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.redis.util.RedisLockRegistry;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;

@RestController
public class TestLock {

    private int count;

    @Autowired
    private RedisLockRegistry redisLockRegistry; // 自动注入我们创建的redisLockRegistry对象

    @GetMapping("/testLock/")
    public String testLock() throws InterruptedException {

        // 创建锁对象
        Lock lock = redisLockRegistry.obtain("lock_key");

        CountDownLatch countDownLatch = new CountDownLatch(1000);
        count = 0;
        for (int i = 0; i < 1000; i++) {
            new Thread(() -> {
                // 加锁
                lock.lock();
                try {
                    int tempCount = count;
                    tempCount += 1;
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                    }
                    count = tempCount;
                } finally {
                    countDownLatch.countDown();
                    // finally中确保释放锁
                    lock.unlock();
                }
            }).start();
        }

        countDownLatch.await();
        System.out.println("final count is: " + count);

        return "ok";
    }

}

这个接口是起 1000 个线程并发尝试去获取锁,然后修改共享变量 count。这个接口依赖在本地的 6379 端口启动的 redis 服务,启动 redis 比较简单,这里不再赘述。

访问 http://127.0.0.1:9982/testLock/,可以在终端看到 count 的值都是 1000,确实防止了并发修改的问题,如下图:

image.png

到目前为止这个组件的功能看起来好像是没问题的,但实际上,我们现在只测试了一台实例(一个进程)的情况,这种情况实际上使用的是我称之为“进程锁”的特性,即保证一个进程内不会有多个线程同时进入某个临界区,而没有使用到“分布式锁”的特性。网上很多介绍 spring-integration-redis 的文章中的例子都只到了这一步,而没有验证到“分布式锁”的特性。可能是混淆了“进程锁“和“分布式锁“的概念,也可能是因为模拟多实例比较麻烦,所以用验证“进程锁”的特性糊弄过去了。而 spring-integration-redis 实现的“分布式锁”特性是有缺陷的。我们先来研究下它的代码,再来看看缺陷在哪里。

2. 源码分析

点击 demo 项目代码中的 redisLockRegistry.obtainlock.locklock.unlock 都可以进入 spring-integration-redis 的主文件 org.springframework.integration.redis.util.RedisLockRegistry。我们主要分析 lock.lock(), lock.tryLock(), lock.tryLock(long time, TimeUnit unit), lock.unlock() 方法。这几个方法也是实现分布式锁语义的关键方法。

2.1 lock()

首先看 lock.lock()。代码截取如下:

@Override
public void lock() {
	this.localLock.lock();
	while (true) {
		try {
			while (!obtainLock()) {
				Thread.sleep(100); //NOSONAR
			}
			break;
		}
		catch (InterruptedException e) {
			/***/
		}
		catch (Exception e) {
			this.localLock.unlock();
			rethrowAsLockException(e);
		}
	}
}

private boolean obtainLock() {
	Boolean success = RedisLockRegistry.this.redisTemplate.execute(
                        RedisLockRegistry.this.obtainLockScript,
			Collections.singletonList(this.lockKey),
			RedisLockRegistry.this.clientId,
			String.valueOf(RedisLockRegistry.this.expireAfter));

	boolean result = Boolean.TRUE.equals(success);

	if (result) {
		this.lockedAt = System.currentTimeMillis();
	}
	return result;
}

方法中会先尝试调用 localLock.lock() 方法,localLock 是个本地的重入锁 ReentrantLock,这个本地锁有两个作用,一是直接在本地拦截其他线程的请求,避免每次都请求 redis,提高锁性能,二是借助重入锁实现了分布式锁的可重入特性的一部分(另一部分是在发给 redis 执行的脚本中实现的,如果是重入获取锁,则刷新过期时间),这个 localLock 的使用还是比较巧妙的,值得学习。接下来是个死循环,不断调用 obtainLock() 方法尝试获取锁,如果失败就休眠 100ms 重试,并忽略了中断异常,遇到其他异常时会释放本地锁,并重新抛出异常。下面主要看下 obtainLock() 方法,这个方法就是直接向 redis 发请求,执行一个 lua 脚本,防止执行多条 redis 命令中间被其他实例的线程打断,导致锁处于一个不一致的状态。脚本的意思比较简单,先查一下锁的 key,如果存在且 value 是本线程的,就刷新过期时间,证明本次是个重入获取锁的过程,如果锁的 key 不存在,则新建锁的 key,写入本线程的 value,其他情况直接返回失败。

2.2 tryLock()

下面看一下 tryLock(),它直接调用了 tryLock(long time, TimeUnit unit) 这个重载方法。我们直接看后面的重载方法。tryLock 实现的是可设置超时时间的尝试获取锁的语义。代码如下:

@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
	long now = System.currentTimeMillis();
	if (!this.localLock.tryLock(time, unit)) {
		return false;
	}
	try {
		long expire = now + TimeUnit.MILLISECONDS.convert(time, unit);
		boolean acquired;
		while (!(acquired = obtainLock()) && System.currentTimeMillis() < expire) { //NOSONAR
			Thread.sleep(100); //NOSONAR
		}
		if (!acquired) {
			this.localLock.unlock();
		}
		return acquired;
	}
	catch (Exception e) {
		this.localLock.unlock();
		rethrowAsLockException(e);
	}
	return false;
}

也是先尝试获取本地的重入锁,如果本地锁获取失败直接返回失败。获取成功后再调用 obtainLock() 方法获取 redis 锁,并循环检查是否到超时时间,如果最终没有获取到 redis 锁,需要释放本地锁,如果获取到了 redis 锁,则一直持有本地锁,直到 unlock() 方法中释放本地和 redis 的两个锁。这里有个细节是,传入的时间其实用在两个地方,一是重试获取本地锁的超时时间,二是尝试获取 redis 锁的超时时间,所以调用 tryLock(time, unit) 方法,实际可能等待 2*time 的时间。一个参数代表两个含义,我也不清楚这个设计是好是坏。

到目前为止都还挺正常的,本地锁的使用也值得我们学习,但后面 unlock() 的实现就有些糙了,我们一起来看一下。

2.3 unlock()

unlock() 的代码如下:

@Override
public void unlock() {
	if (!this.localLock.isHeldByCurrentThread()) {
		throw new IllegalStateException("You do not own lock at " + this.lockKey);
	}
	if (this.localLock.getHoldCount() > 1) {
		this.localLock.unlock();
		return;
	}
	try {
		if (!isAcquiredInThisProcess()) {
			throw new IllegalStateException("Lock was released in the store due to expiration. " +
					"The integrity of data protected by this lock may have been compromised.");
		}

		if (Thread.currentThread().isInterrupted()) {
					RedisLockRegistry.this.executor.execute(this::removeLockKey);
		}
		else {
			removeLockKey();
		}

		if (LOGGER.isDebugEnabled()) {
			LOGGER.debug("Released lock; " + this);
		}
	}
	catch (Exception e) {
		ReflectionUtils.rethrowRuntimeException(e);
	}
	finally {
		this.localLock.unlock();
	}
}

unlock() 方法全程实现的语义类似:如果本地锁不是本线程持有的,直接报错,后续就以本地锁是本线程持有为前提。如果本地锁重入次数大于 1,则本次释放只需要把重入次数减一即可,然后直接返回。如果重入次数已经等于 1 了,就得同时释放 redis 锁和本地锁了。释放 redis 锁时先检查 redis 里锁的 value 是不是本线程的,如果不是就报错,这时候可能是因为本线程写入 redis 的锁过期释放了,然后被其他实例的线程重新写入锁了。如果是,再发一条命令删除锁的 key,命令可以是 unlinkdel,这不重要。到这可能有同学已经发现问题了,我们准备 redis 分布式锁的面试题的时候,都会注意说“释放锁要用 lua 脚本,把检查锁是不是本线程持有和删除锁放到一个 lua 脚本中,防止高并发时误删其他线程写入的锁”。但这里判断锁是不是本线程持有和删除 key 却分成了两条 redis 命令,那么在极端情况下,本线程判断锁是本线程持有的,但在删除命令执行前,key 过期了,有其他实例的线程写入了锁,然后本线程却直接删除了锁的 key,就会误删其他实例线程写入的锁了(其他实例的线程真是躺着中枪了)。这种情况确实比较极端,不容易复现,下面我们借助 debug 来拉长 unlock() 方法每一步的执行时间,并通过起两个服务来模拟多个实例的情况(实际上是多个进程,进程在同一实例还是不同实例其实没区别)。

3. unlock 方法缺陷验证

这时候需要用 debug 模式启动 demo 和 demo2 两个项目,demo2 项目的代码和 demo 几乎一样,只是配置不同。我们用两个项目的 controller.TestUnlock 中提供的接口来验证 unlock() 方法在极端情况下的缺陷。代码如下:

package com.example.demo.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.redis.util.RedisLockRegistry;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

@RestController
public class TestUnlock {

    @Autowired
    private RedisLockRegistry redisLockRegistry; // 自动注入我们创建的redisLockRegistry对象

    @GetMapping("/testUnlock/")
    public String testUnlock() throws InterruptedException {
        // 创建锁对象
        Lock lock = redisLockRegistry.obtain("lock_key");

        lock.tryLock(60, TimeUnit.SECONDS);
        try {
            Thread.sleep(10);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

        return "ok";
    }
}

我们把断点打到 demo2 的 lock.unlock() 方法内的 removeLockKey() 方法那一行,然后调用 demo2 的接口 http://127.0.0.1:9983/testUnlock/,如果执行到断点的地方,证明 redis 锁已经写入成功了,并且已经在尝试释放锁了,我们可以去 redis 里验证一下,效果如下图:

image.png

现在我们什么都不做,等待 redis 里这个 key 过期,然后把断点打到 demo 项目的 lock.unlock() 方法那一行,调用 demo 的接口 http://127.0.0.1:9982/testUnlock/,尝试获取锁,这时候会获取成功。redis 里的锁的 value 已经变成 demo 线程的 clientId 了,如下图:

image.png

先不要往下执行 demo 项目,回到 demo2,点击继续,会发现 demo2 直接把 demo 写入的锁删除掉了,而 demo 继续执行下去,在解锁时会报 Lock was released in the store due to expiration. The integrity of data protected by this lock may have been compromised. 的错,但实际上,它写入的锁并没有超时,而是被 demo2 给删除了。

以上,我们试用了 spring-integration-redis 提供的分布式锁功能,并阅读了它的代码,发现了它在实现分布式锁时有“可能删除其他进程写入的锁”的问题。所以,redis 分布式锁的实现我并不建议大家用这个组件,我更推荐使用 redission 这个组件。这个组件也是 redis 官方推荐的实现。后续有机会我再给大家介绍一下 redission 的使用。就这样 ~

  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3187 引用 • 8213 回帖
  • Redis

    Redis 是一个开源的使用 ANSI C 语言编写、支持网络、可基于内存亦可持久化的日志型、Key-Value 数据库,并提供多种语言的 API。从 2010 年 3 月 15 日起,Redis 的开发工作由 VMware 主持。从 2013 年 5 月开始,Redis 的开发由 Pivotal 赞助。

    286 引用 • 248 回帖 • 62 关注
  • 分布式
    80 引用 • 149 回帖 • 4 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
  • darksword

    是你的代码写的不完善,你都没有判断有本线程没有拿到锁,就强制去 unlock 当然会出这个问题了