Redisson分布式锁剖析
Redisson是具备多种内存数据网格特性的基于Java编写的Redis客户端框架(Redis Java Client with features of In-Memory Data Grid),基于Redis的基本数据类型扩展出很多种实现的高级数据结构,具体见其官方的简介图。本文要分析的R(ed)Lock实现,只是其中一个很小的模块,其他高级特性可以按需选用。下面会从基本原理、源码分析等内容进行展开。
基本原理
从Redis官网文档中,摘录部分:在许多环境中不同进程必须以互斥方式使用共享资源进行操作时,分布式锁是一个非常有用的原语。此试图提供一种更规范的算法来实现Redis的分布式锁。我们提出了一种称为Redlock的算法,它实现了DLM(Distributed Lock Manager),我们认为它比普通的单实例方法更安全。算法的三个核心特征(三大最低保证):
使用Redisson中的RLock
使用RLock要先实例化Redisson,Redisson已经适配了Redis的哨兵、集群、普通主从和单机模式。这里使用单机模式配置进行演示。实例化RedissonClient:
Redisson中RLock的实现原理
Redisson中RLock的实现是基本参照了Redis的red lock算法进行实现,不过在原始的red lock算法下进行了改良,主要包括下面的特性:
小结
Redisson中的red lock实现,应用到下面的核心技术:
基本原理
从Redis官网文档中,摘录部分:在许多环境中不同进程必须以互斥方式使用共享资源进行操作时,分布式锁是一个非常有用的原语。此试图提供一种更规范的算法来实现Redis的分布式锁。我们提出了一种称为Redlock的算法,它实现了DLM(Distributed Lock Manager),我们认为它比普通的单实例方法更安全。算法的三个核心特征(三大最低保证):
- Safety property(安全性):互斥。确保在任何给定时刻下,只有一个客户端可以持有锁
- Liveness property A(活性A):无死锁。即使存在曾经锁定资源的客户端崩溃或者出现网络分区异常,确保锁总是能够成功获取
- Liveness property B(活性B):容错性。只要大多数Redis节点处于正常运行状态,客户端就可以获取和释放锁
- 客户端A获取Redis主节点中的锁(假设锁定的资源为X)
- 在Redis主节点把KEY同步到Redis从节点之前,Redis主节点崩溃
- Redis从节点因为故障晋升为主节点
- 此时,客户端B获取资源X的锁成功,问题是资源X的锁在前面已经被客户端A获取过,这样就出现了并发问题
SET $resource_name $random_value NX PX $ttl这里的Nx和PX是SET命令的增强参数,自从Redis的2.6.12版本起,SET命令已经提供了可选的复合操作符:
- EX:设置超时时间,单位是秒
- PX:设置超时时间,单位是毫秒
- NX:IF NOT EXIST的缩写,只有KEY不存在的前提下才会设置K-V,设置成功返回1,否则返回0
- XX:IF EXIST的缩写,只有在KEY存在的前提下才会设置K-V,设置成功返回1,否则返回0
# KEYS[1] = $resource_name # ARGV[1] = $random_value if redis.call("get",KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1]) else return 0 end
使用Redisson中的RLock
使用RLock要先实例化Redisson,Redisson已经适配了Redis的哨兵、集群、普通主从和单机模式。这里使用单机模式配置进行演示。实例化RedissonClient:
static RedissonClient REDISSON; @BeforeClass public static void beforeClass() throws Exception { Config config = new Config(); // 单机 config.useSingleServer() .setTimeout(10000) .setAddress("redis://127.0.0.1:6379"); REDISSON = Redisson.create(config); // // 主从 // config.useMasterSlaveServers() // .setMasterAddress("主节点连接地址") // .setSlaveAddresses(Sets.newHashSet("从节点连接地址")); // REDISSON = Redisson.create(config); // // 哨兵 // config.useSentinelServers() // .setMasterName("Master名称") // .addSentinelAddress(new String[]{"哨兵连接地址"}); // REDISSON = Redisson.create(config); // // 集群 // config.useClusterServers() // .addNodeAddress(new String[]{"集群节点连接地址"}); // REDISSON = Redisson.create(config); }加锁和解锁:
@Test public void testLockAndUnLock() throws Exception { String resourceName = "resource:x"; RLock lock = REDISSON.getLock(resourceName); Thread threadA = new Thread(() -> { try { lock.lock(); process(resourceName); } finally { lock.unlock(); System.out.println(String.format("线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName)); } }, "threadA"); Thread threadB = new Thread(() -> { try { lock.lock(); process(resourceName); } finally { lock.unlock(); System.out.println(String.format("线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName)); } }, "threadB"); threadA.start(); threadB.start(); Thread.sleep(Long.MAX_VALUE); } private void process(String resourceName) { String threadName = Thread.currentThread().getName(); System.out.println(String.format("线程%s获取到资源%s的锁", threadName, resourceName)); try { Thread.sleep(1000); } catch (InterruptedException ignore) { } } // 某次执行的输出结果 线程threadB获取到资源resource:x的锁 线程threadB释放资源resource:x的锁 线程threadA获取到资源resource:x的锁 线程threadA释放资源resource:x的锁更多的时候,会选用带等待时间周期和锁最大持有时间的API:
@Test public void testTryLockAndUnLock() throws Exception { String resourceName = "resource:x"; int waitTime = 500; int leaseTime = 1000; Thread threadA = new Thread(() -> { process(resourceName, waitTime, leaseTime); }, "threadA"); Thread threadB = new Thread(() -> { process(resourceName, waitTime, leaseTime); }, "threadB"); threadA.start(); threadB.start(); Thread.sleep(Long.MAX_VALUE); } private void process(String resourceName, int waitTime, int leaseTime) { RLock lock = REDISSON.getLock(resourceName); try { String threadName = Thread.currentThread().getName(); boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS); if (tryLock) { try { System.out.println(String.format("线程%s获取到资源%s的锁", threadName, resourceName)); Thread.sleep(800); } finally { lock.unlock(); System.out.println(String.format("线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName)); } } else { System.out.println(String.format("线程%s获取资源%s的锁失败,等待时间:%d ms", threadName, resourceName, waitTime)); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } // 某次执行的输出结果 线程threadA获取到资源resource:x的锁 线程threadB获取资源resource:x的锁失败,等待时间:500 ms 线程threadA释放资源resource:x的锁为了使用的时候更加简单,可以参考spring-tx中的编程式事务那样进行轻度封装:
@RequiredArgsConstructor private static class RedissonLockProvider { private final RedissonClient redissonClient; public使用RedissonLockProvider(仅供参考):T executeInLock(String resourceName, LockAction lockAction) { RLock lock = redissonClient.getLock(resourceName); try { lock.lock(); lockAction.onAcquire(resourceName); return lockAction.doInLock(resourceName); } finally { lock.unlock(); lockAction.onExit(resourceName); } } public T executeInLock(String resourceName, int waitTime, int leaseTime, LockAction lockAction) throws InterruptedException { RLock lock = redissonClient.getLock(resourceName); boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS); if (tryLock) { try { lockAction.onAcquire(resourceName); return lockAction.doInLock(resourceName); } finally { lock.unlock(); lockAction.onExit(resourceName); } } return null; } public void executeInLockWithoutResult(String resourceName, int waitTime, int leaseTime, LockActionWithoutResult lockAction) throws InterruptedException { RLock lock = redissonClient.getLock(resourceName); boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS); if (tryLock) { try { lockAction.onAcquire(resourceName); lockAction.doInLock(resourceName); } finally { lock.unlock(); lockAction.onExit(resourceName); } } } public void executeInLockWithoutResult(String resourceName, LockActionWithoutResult lockAction) { RLock lock = redissonClient.getLock(resourceName); try { lock.lock(); lockAction.onAcquire(resourceName); lockAction.doInLock(resourceName); } finally { lock.unlock(); lockAction.onExit(resourceName); } } } @FunctionalInterface interface LockAction { default void onAcquire(String resourceName) { } T doInLock(String resourceName); default void onExit(String resourceName) { } } @FunctionalInterface interface LockActionWithoutResult { default void onAcquire(String resourceName) { } void doInLock(String resourceName); default void onExit(String resourceName) { } }
@Test public void testRedissonLockProvider() throws Exception { RedissonLockProvider provider = new RedissonLockProvider(REDISSON); String resourceName = "resource:x"; Thread threadA = new Thread(() -> { provider.executeInLockWithoutResult(resourceName, new LockActionWithoutResult() { @Override public void onAcquire(String resourceName) { System.out.println(String.format("线程%s获取到资源%s的锁", Thread.currentThread().getName(), resourceName)); } @Override public void doInLock(String resourceName) { try { Thread.sleep(800); } catch (InterruptedException ignore) { } } @Override public void onExit(String resourceName) { System.out.println(String.format("线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName)); } }); }, "threadA"); Thread threadB = new Thread(() -> { provider.executeInLockWithoutResult(resourceName, new LockActionWithoutResult() { @Override public void onAcquire(String resourceName) { System.out.println(String.format("线程%s获取到资源%s的锁", Thread.currentThread().getName(), resourceName)); } @Override public void doInLock(String resourceName) { try { Thread.sleep(800); } catch (InterruptedException ignore) { } } @Override public void onExit(String resourceName) { System.out.println(String.format("线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName)); } }); }, "threadB"); threadA.start(); threadB.start(); Thread.sleep(Long.MAX_VALUE); } // 某次执行结果 线程threadA获取到资源resource:x的锁 线程threadA释放资源resource:x的锁 线程threadB获取到资源resource:x的锁 线程threadB释放资源resource:x的锁
Redisson中RLock的实现原理
Redisson中RLock的实现是基本参照了Redis的red lock算法进行实现,不过在原始的red lock算法下进行了改良,主要包括下面的特性:
- 互斥
- 无死锁
- 可重入,类似于ReentrantLock,同一个线程可以重复获取同一个资源的锁(一般使用计数器实现),锁的重入特性一般情况下有利于提高资源的利用率
- 续期,这个是一个比较前卫解决思路,也就是如果一个客户端对资源X永久锁定,那么并不是直接对KEY生存周期设置为-1,而是通过一个守护线程每隔固定周期延长KEY的过期时间,这样就能实现在守护线程不被杀掉的前提下,避免客户端崩溃导致锁无法释放长期占用资源的问题
- 锁状态变更订阅,依赖于org.redisson.pubsub.LockPubSub,用于订阅和通知锁释放事件
- 不是完全参考red lock算法的实现,数据类型选用了HASH,配合Lua脚本完成多个命令的原子性
public class Redisson implements RedissonClient { // ...... 省略其他代码 @Override public RLock getLock(String name) { return new RedissonLock(connectionManager.getCommandExecutor(), name); } // ...... 省略其他代码 }因此只需要围绕RedissonLock的源码进行分析即可。RedissonLock的类继承图如下: 这里需要有几点认知:
- RedissonLock实现了java.util.concurrent.locks.Lock接口中除了newCondition()方法外的所有方法,也就是可以基本无缝适配Lock接口,对于习惯Lock接口的API的使用者来说是一个福音。
- RedissonLock基本所有同步API都依赖于异步API的实现,也就是RLock的实现依赖于RLockAsync的实现,底层依赖的是Netty的io.netty.util.concurrent.Promise,具体见RedissonPromise,如果用过JUC中的Future的开发者应该比较熟悉Future#get(),这里的做法类似。
右边的几个父类的简单功能描述如下:
- RObjectAsync:所有Redisson对象的基础接口,提供一些内存测量、对象拷贝、移动等的异步方法
- RObject:RObjectAsync的同步版本
- RExpirableAsync:提供对象TTL相关的异步方法
- RExpirable:RExpirableAsync的同步版本
- RedissonObject:直接实现类RObject接口中的方法
- RedissonExpirable:主要是实现了RExpirable接口中的方法
// 存放entryName -> ExpirationEntry,用于获取当前entryName的线程重入计数器和续期任务 private static final ConcurrentMap这里需要关注一下Config中的lockWatchdogTimeout参数: 翻译一下大意:lockWatchdogTimeout参数只有在没有使用leaseTimeout参数定义的成功获取到锁的场景(简单来说就是不设置时限的加锁)下生效,如果看门狗在下一个lockWatchdogTimeout周期内不进行续期,那么锁就会过期释放(从源码上看,每三分之一lockWatchdogTimeout就会执行一次续期任务,每次通过pexpire把KEY的存活周期延长lockWatchdogTimeout),lockWatchdogTimeout的默认值为30000,也就是30秒。 这里先列举一下RedissonLock中获取名称的方法,以便后面分析这些名称作为K-V结构的KEY时候使用:EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>(); // 内部的锁持有的最大时间,来源于参数Config#lockWatchdogTimeout,用于控制续期的周期 protected long internalLockLeaseTime; // ID,唯一标识,是一个UUID final String id; // final String entryName; // 锁释放事件订阅发布相关 protected final LockPubSub pubSub; // 命令异步执行器实例 final CommandAsyncExecutor commandExecutor; /** * CommandAsyncExecutor是命令的异步执行器,里面的方法是相对底层的面向通讯框架的方法,包括异步写、异步读和同步结果获取等 * name参数就是getLock()时候传入的参数,其实就是最终同步到Redis中的KEY */ public RedissonLock(CommandAsyncExecutor commandExecutor, String name) { super(commandExecutor, name); this.commandExecutor = commandExecutor; // 这里的ID为外部初始化的UUID实例,调用toString() this.id = commandExecutor.getConnectionManager().getId(); this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(); // 这里的entryName = uuid值 + : + 外部传进来的name(KEY),如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:resource:x this.entryName = id + ":" + name; // 初始化LockPubSub实例,用于订阅和发布锁释放的事件 this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub(); } // RedissonLock内部类ExpirationEntry,存放着线程重入的计数器和续期的Timeout任务 public static class ExpirationEntry { // 线程ID -> 线程重入的次数 private final Map threadIds = new LinkedHashMap<>(); private volatile Timeout timeout; public ExpirationEntry() { super(); } // 这个方法主要记录线程重入的计数 public void addThreadId(long threadId) { Integer counter = threadIds.get(threadId); if (counter == null) { counter = 1; } else { counter++; } threadIds.put(threadId, counter); } public boolean hasNoThreads() { return threadIds.isEmpty(); } public Long getFirstThreadId() { if (threadIds.isEmpty()) { return null; } return threadIds.keySet().iterator().next(); } public void removeThreadId(long threadId) { Integer counter = threadIds.get(threadId); if (counter == null) { return; } counter--; if (counter == 0) { threadIds.remove(threadId); } else { threadIds.put(threadId, counter); } } public void setTimeout(Timeout timeout) { this.timeout = timeout; } public Timeout getTimeout() { return timeout; } }
- id:由配置实例化时候实例化的UUID实例生成,从源码上分析每个连接方式的Redisson实例有唯一的UUID,ConnectionManager初始化的时候会调用UUID id = UUID.randomUUID(),可以理解为Redisson实例在某个应用程序进程中的唯一标识,毕竟一般情况下,一个应用程序应该只会应用一种Redisson的连接方式
- getEntryName():返回的是UUID + : + $KEY,例如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:resource:x
- getName():返回的是$KEY,例如resource:x
- getChannelName():返回的是redisson_lock__channel:{$KEY},例如redisson_lock__channel:{resource:x}
- getLockName(long threadId):返回的是UUID + : + $threadId,例如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1
- private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException:lock方法体系
- public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException:tryLock方法体系
/** * 获取锁,不指定等待时间,只指定锁的最大持有时间 * 通过interruptibly参数配置支持中断 */ private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { long threadId = Thread.currentThread().getId(); // 尝试获取锁,返回的ttl为空代表获取锁成功,返回的ttl代表已经存在的KEY的剩余存活时间 Long ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { return; } // 订阅redisson_lock__channel:{$KEY},其实本质的目的是为了客户端通过Redis的订阅发布,感知到解锁的事件 // 这个方法会在LockPubSub中注册一个entryName -> RedissonLockEntry的哈希映射,先留意一下属性internalLockLeaseTime,它在tryLockInnerAsync()方法内被重新赋值,在leaseTime == -1L的前提下,它被赋值为lockWatchdogTimeout,这个细节很重要,决定了后面续期方法(看门狗)的调度频率。另外,leaseTime != -1L不会进行续期,也就是不会启动看门狗机制。接着需要仔细分析一下tryLockInnerAsync()中执行的LUA脚本,把它提取出来通过注释进行描述:
// RedissonLockEntry实例中存放着RPromise结果,一个信号量形式的锁和订阅方法重入计数器 // 下面的死循环中的getEntry()或者RPromise#getNow()就是从这个映射中获取的 RFuturefuture = subscribe(threadId); // 同步订阅执行,获取注册订阅Channel的响应,区分是否支持中断 if (interruptibly) { commandExecutor.syncSubscriptionInterrupted(future); } else { commandExecutor.syncSubscription(future); } // 走到下面的for循环说明返回的ttl不为空,也就是Redis已经存在对应的KEY,
// 有其他客户端已经获取到锁,此客户端线程的调用需要阻塞等待获取锁 try { while (true) { // 死循环中尝试获取锁,这个是后面会分析的方法 ttl = tryAcquire(leaseTime, unit, threadId); // 返回的ttl为空,说明获取到锁,跳出死循环,这个死循环或者抛出中断异常,或者获取到锁成功break跳出,没有其他方式 if (ttl == null) { break; } // 这个ttl来源于等待存在的锁的KEY的存活时间,直接使用许可为0的信号量进行阻塞等待,下面的几个分支判断都是大同小异,只是有的支持超时时间,有的支持中断 // 有的是永久阻塞直到锁释放事件订阅LockPubSub的onMessage()方法回调激活getLatch().release()进行解锁才会往下走 // 这里可以学到一个特殊的技巧,Semaphore(0),信号量的许可设置为0,首个调用acquire()的线程会被阻塞,
// 直到其他线程调用此信号量的release()方法才会解除阻塞,类似于一个CountDownLatch(1)的效果 if (ttl >= 0) { try { future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { future.getNow().getLatch().acquire(); } else { future.getNow().getLatch().acquireUninterruptibly(); } } } } finally { // 获取到锁或者抛出中断异常,退订redisson_lock__channel:{$KEY},不再关注解锁事件 unsubscribe(future, threadId); } } // 这是一个异步转同步的方法,类似于FutureTask#get(),关键看调用的tryAcquireAsync()方法 private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(leaseTime, unit, threadId)); } /** * 通过传入锁持有的最大时间和线程ID异步获取锁 */ privateRFuture tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) { // 锁持有最大时间不为-1,也就是明确锁的持有时间,不是永久持有的场景 if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } // 走到这里说明是leaseTime == -1,KEY不设置过期时间的分支,需要启动看门狗机制。尝试内部异步获取锁,
// 注意这里的lockWatchdogTimeout是从配置中获取传进去,不是内部的internalLockLeaseTime属性,这里的默认值还是30000毫秒 RFuturettlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { // 执行异常场景直接返回 if (e != null) { return; } // 成功获取到锁的场景,需要基于线程ID启用看门狗,通过时间轮指定定时任务进行续期 if (ttlRemaining == null) { // 定时调度进行续期操作 scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; } /** * 转换锁持有最大时间,通过参数进行加锁的LUA脚本调用 * getName()就是传入的KEY,如resource:x getLockName()就是锁的名称,形式是:UUID + : + threadId,如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1 * internalLockLeaseTime在leaseTime != -1的前提下使用的是原值,在leaseTime == -1的前提下,使用的是lockWatchdogTimeout */RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) { // 时间转换为毫秒,注意一点这里的internalLockLeaseTime是类内的属性,被重新赋值了 internalLockLeaseTime = unit.toMillis(leaseTime); // 底层向Redis服务执行LUA脚本 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.
-- KEYS[1] == getName() --> $KEY --> resource:x -- ARGV[1] == internalLockLeaseTime --> 30000 -- ARGV[2] == getLockName(threadId) --> 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1 -- 第一段代码是判断锁定的资源KEY不存在的时候进行相应值的设置,代表资源没有被锁定,首次获取锁成功 if (redis.call('exists', KEYS[1]) == 0) then -- 这里是设置调用次数,可以理解为延长KEY过期时间的调用次数 redis.call('hset', KEYS[1], ARGV[2], 1); -- 设置KEY的过期时间 redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; -- 第二段代码是判断HASH的field是否存在,如果存在说明是同一个线程重入的情况,这个时候需要延长KEY的TTL,并且HASH的field对应的value加1,记录延长ttl的次数 if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then -- 这里是增加调用次数,可以理解为增加延长KEY过期时间的调用次数 redis.call('hincrby', KEYS[1], ARGV[2], 1); -- 延长KEY的过期时间 redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; -- 第三段代码是兜底的,走到这里说明当前线程获取锁失败,锁已经被其他(进程中的)线程占有,返回当前KEY被占用资源的ttl,用来确定需要休眠的最大时间 return redis.call('pttl', KEYS[1]);这里画一个图演示一下这个Lua脚本中三段代码出现的逻辑: 剩下一个scheduleExpirationRenewal(threadId)方法还没有分析,里面的逻辑就是看门狗的定期续期逻辑:
// 基于线程ID定时调度和续期 private void scheduleExpirationRenewal(long threadId) { // 如果需要的话新建一个ExpirationEntry记录线程重入计数,同时把续期的任务Timeout对象保存在属性中 ExpirationEntry entry = new ExpirationEntry(); ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry); if (oldEntry != null) { // 当前进行的当前线程重入加锁 oldEntry.addThreadId(threadId); } else { // 当前进行的当前线程首次加锁 entry.addThreadId(threadId); // 首次新建ExpirationEntry需要触发续期方法,记录续期的任务句柄 renewExpiration(); } } // 处理续期 private void renewExpiration() { // 根据entryName获取ExpirationEntry实例,如果为空,说明在cancelExpirationRenewal()方法已经被移除,一般是解锁的时候触发 ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ee == null) { return; } // 新建一个定时任务,这个就是看门狗的实现,io.netty.util.Timeout是Netty结合时间轮使用的定时任务实例 Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { // 这里是重复外面的那个逻辑, ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName()); if (ent == null) { return; } // 获取ExpirationEntry中首个线程ID,如果为空说明调用过cancelExpirationRenewal()方法清空持有的线程重入计数,一般是锁已经释放的场景 Long threadId = ent.getFirstThreadId(); if (threadId == null) { return; } // 向Redis异步发送续期的命令 RFuture基于源码推断出续期的机制由入参leaseTime决定:future = renewExpirationAsync(threadId); future.onComplete((res, e) -> { // 抛出异常,续期失败,只打印日志和直接终止任务 if (e != null) { log.error("Can't update lock " + getName() + " expiration", e); return; } // 返回true证明续期成功,则递归调用续期方法(重新调度自己),续期失败说明对应的锁已经不存在,直接返回,不再递归 if (res) { // reschedule itself renewExpiration(); } }); } }, // 这里的执行频率为leaseTime转换为ms单位下的三分之一,由于leaseTime初始值为-1的情况下才会进入续期逻辑,那么这里的执行频率为lockWatchdogTimeout的三分之一 internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); // ExpirationEntry实例持有调度任务实例 ee.setTimeout(task); } // 调用Redis,执行Lua脚本,进行异步续期 protected RFuture renewExpirationAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;", Collections.
- 当leaseTime == -1的前提下(一般是lock()和lockInterruptibly()这类方法调用),续期任务的调度周期为lockWatchdogTimeout / 3,锁的最大持有时间(KEY的过期时间)被刷新为lockWatchdogTimeout
- 当leaseTime != -1的前提下(一般是lock(long leaseTime, TimeUnit unit)和lockInterruptibly(long leaseTime, TimeUnit unit)这类方法调用指定leaseTime不为-1),这种情况下会直接设置锁的过期时间为输入值转换为ms单位的时间量,不会启动续期机制
-- KEYS[1] == getName() --> $KEY --> resource:x -- ARGV[1] == internalLockLeaseTime --> 30000 -- ARGV[2] == getLockName(threadId) --> 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1 if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('pexpire', KEYS[1], ARGV[1]); return 1; end; return 0;到此为止,不带waitTime参数的加锁和续期逻辑基本分析完毕,而带waitTime参数的tryLock(long waitTime, long leaseTime, TimeUnit unit)实现其实和只存在leaseTime参数的lock(long leaseTime, TimeUnit unit, boolean interruptibly)实现底层调用的方法是一致的,最大的区别是会在尝试获取锁操作之后基于前后的System.currentTimeMillis()计算出时间差和waitTime做对比,决定需要阻塞等待还是直接超时获取锁失败返回,处理阻塞等待的逻辑是客户端本身的逻辑,这里就不做详细展开,因为源码实现也不是十分优雅(太多long currentTime = System.currentTimeMillis()的代码段了)。接着花点功夫分析一下解锁的实现,包括一般情况下的解锁unlock()和强制解锁forceUnlockAsync():
// 一般情况下的解锁 @Override public void unlock() { try { get(unlockAsync(Thread.currentThread().getId())); } catch (RedisException e) { // IllegalMonitorStateException一般是A线程加锁,B线程解锁,内部判断线程状态不一致抛出的 if (e.getCause() instanceof IllegalMonitorStateException) { throw (IllegalMonitorStateException) e.getCause(); } else { throw e; } } } @Override public RFuture这里列出一般情况下解锁和强制解锁的Lua脚本,分析如下:unlockAsync() { // 获取当前调用解锁操作的线程ID long threadId = Thread.currentThread().getId(); return unlockAsync(threadId); } @Override public RFuture unlockAsync(long threadId) { // 构建一个结果RedissonPromise RPromise result = new RedissonPromise (); // 返回的RFuture如果持有的结果为true,说明解锁成功,返回NULL说明线程ID异常,加锁和解锁的客户端线程不是同一个线程 RFuture future = unlockInnerAsync(threadId); future.onComplete((opStatus, e) -> { // 这是内部的异常,说明解锁异常,需要取消看门狗的续期任务 if (e != null) { cancelExpirationRenewal(threadId); result.tryFailure(e); return; } // 这种情况说明线程ID异常,加锁和解锁的客户端线程不是同一个线程,抛出IllegalMonitorStateException异常 if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause); return; } // 走到这里说明正常解锁,取消看门狗的续期任务 cancelExpirationRenewal(threadId); result.trySuccess(null); }); return result; } // 真正的内部解锁的方法,执行解锁的Lua脚本 protected RFuture unlockInnerAsync(long threadId) { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.
-- unlockInnerAsync方法的lua脚本 -- KEYS[1] == getName() --> $KEY --> resource:x -- KEYS[2] == getChannelName() --> 订阅锁的Channel --> redisson_lock__channel:{resource:x} -- ARGV[1] == LockPubSub.UNLOCK_MESSAGE --> 常量数值0 -- ARGV[2] == internalLockLeaseTime --> 30000或者具体的锁最大持有时间 -- ARGV[3] == getLockName(threadId) --> 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1 -- 第一个IF分支判断如果锁所在的哈希的field不存在,说明当前线程ID未曾获取过对应的锁,返回NULL表示解锁失败 if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil; end; -- 走到这里通过hincrby进行线程重入计数-1,返回计数值 local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); -- 计数值大于0,说明线程重入加锁,这个时候基于internalLockLeaseTime对锁所在KEY进行续期 if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else -- 计数值小于或等于0,说明可以解锁,删除锁所在的KEY,并且向redisson_lock__channel:{$KEY}发布消息,内容是0(常量数值) redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; -- 最后的return nil;在IDEA中提示是不会到达的语句,估计这里是开发者笔误写上去的,前面的if-else都有返回语句,这里应该是不可达的 return nil; -------------------------------------------------- 不怎么华丽的分割线 ------------------------------------------------- -- forceUnlockAsync方法的lua脚本 -- KEYS[1] == getName() --> $KEY --> resource:x -- KEYS[2] == getChannelName() --> 订阅锁的Channel --> redisson_lock__channel:{resource:x} -- ARGV[1] == LockPubSub.UNLOCK_MESSAGE --> 常量数值0 -- 强制删除锁所在的KEY,如果删除成功向redisson_lock__channel:{$KEY}发布消息,内容是0(常量数值) if (redis.call('del', KEYS[1]) == 1) then redis.call('publish', KEYS[2], ARGV[1]); return 1 else return 0 end其他辅助方法都相对简单,这里弄个简单的"流水账"记录一番:
- isLocked():基于getName()调用Redis的EXISTS $KEY命令判断是否加锁
- isHeldByThread(long threadId)和isHeldByCurrentThread():基于getName()和getLockName(threadId)调用Redis的HEXISTS $KEY $LOCK_NAME命令判断HASH中对应的field-value是否存在,存在则说明锁被对应线程ID的线程持有
- getHoldCount():基于getName()和getLockName(threadId)调用Redis的HGET $KEY $LOCK_NAME命令,用于获取线程对于某一个锁的持有量(注释叫holds,其实就是同一个线程对某一个锁的KEY的续期次数)
- 不带waitTime参数的加锁流程:
- 带有waitTime参数的加锁流程(图右边的流程基本不变,主要是左边的流程每一步都要计算时间间隔):
- 解锁流程:
- KEY代表的就是资源或者锁,创建、存在性判断,延长生存周期和删除操作总是针对KEY进行的
- FIELD代表的是锁名称lockName(),但是其实它由Redisson连接管理器实例的初始化UUID拼接客户端线程ID组成,严格来说应该是获取锁的客户端线程唯一标识
- VALUE代表的是客户端线程对于锁的持有量,从源码上看应该是KEY被续期的次数
小结
Redisson中的red lock实现,应用到下面的核心技术:
- 合理应用Redis的基本数据类型HASH
- Redis的订阅发布
- Lua脚本的原子性
- Netty中的Promise实现
- Netty中的时间轮HashedWheelTimer和对应的定时任务(HashedWheel)Timeout
- Semaphore进行带期限、永久或者可中断的阻塞以及唤醒,替代CountDownLatch中的无等待期限阻塞