Redisson分布式锁之写锁RedissonWriteLock原理
1、基本配置&使用
基本配置及使用,可在 自行查看
2、RedissonWriteLock
2.1 lua脚本加锁
// 获取key为"rwLock"的锁对象
RReadWriteLock rwLock = redissonClient.getReadWriteLock("rwLock");
RedissonWriteLock#tryLockInnerAsync
RFuture tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
// 获取锁模式 hget rwLock mode
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
// 锁的模式为空,即当前锁尚未有线程获取
"if (mode == false) then " +
// 利用 hset 命令设置锁模式为写锁 hset rwLock mode write
"redis.call('hset', KEYS[1], 'mode', 'write'); " +
// 利用 hset 命令为当前线程添加加锁次数记录 hset rwLock UUID:threadId:write 1
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
// 设置锁过期时间 pexpire rwLock 30000
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 锁的模式为写锁
"if (mode == 'write') then " +
// 利用 hexists 命令判断持有写锁是否为当前线程 hexists rwLock UUID:threadId:write
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
// 如果是,利用 hincrby 命令为当前线程增加1次加锁次数 (支持相同线程可重入获取锁)
// hincrby rwLock UUID:threadId:write 1
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
// 利用 pttl 获取当前写锁的超时剩余毫秒数 pttl rwLock
"local currentExpire = redis.call('pttl', KEYS[1]); " +
// 利用 pexipre 给锁重新设置锁的过期时间,过期时间为:上次加锁的剩余毫秒数+30000毫秒
// pexpire rwLock currentExpire + 30000
"redis.call('pexpire', KEYS[1], currentExpire + ARGV[1]); " +
"return nil; " +
"end; " +
"end;" +
// 返回当前锁过期时间
"return redis.call('pttl', KEYS[1]);",
Arrays.
参数说明
KEYS = Arrays.asList(getRawName())
KEYS[1]:getRawName(),就是key的名称,也就是获取锁对象时设置的"rwLock"
ARGV = unit.toMillis(leaseTime), getLockName(threadId)
ARGV[2]:internalLockLeaseTime,watchdog的超时时间,30*1000 ms
ARGV[3]:getLockName(threadId),super.getLockName(threadId) + ":write" -> UUID:ThreadId:write
首次加写锁
也就是执行lua脚本的第一个分支,Redis中的数据有一个key为rwLock结构的Hash锁,包含锁的模式,以及加锁的线程,但是跟读锁不同的是,此时加锁的线程为: UUID:threadId:write,并且是不会就超时记录的,因为写锁在同一时间只有一个线程能够获取(写写互斥),锁的超时时间就是线程持有锁的超时时间,所以不需要
写锁重入
执行第二个分支
-
锁模式为写锁并且持有写锁为当前线程,当前线程可再次获取写锁
-
新的过期时间为已经持有锁的过期时间 + 30000ms,currentExpire + ARGV[1]
此时Redis中Hash结构的数据中,当前线程的的值加1,表示重入次数,并且此时的过期时间是在已持有锁的过期时间 + 30s
写写互斥、写读互斥
已经加了写锁,此时不管写锁还是读锁进来,不满足第一部分,也不满足第二部分,直接返回当前锁的过期时间,并订阅消息通道 redisson_rwlock:{rwLock},然后就会在while(true)中进行自旋等待锁的释放
至此,整个加锁的流程完成,从上面可以看出,在读锁的时候:
-
锁 rwLock 是哈希表结构的
-
加锁时,会对哈希表设置 mode 字段来表示这个锁是读锁还是写锁,mode = write 表示读锁
-
在 rwLock 中再额外维护一个字段 UUID:ThreadId:write 表示重入次数
2.2 watchdog续期lua脚本
watchdog 的执行操作,还是和 RedissLock 保持一致
2.3 lua脚本释放锁
RedissonWriteLock#unlockInnerAsync
protected RFuture unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 利用 hget 命令获取锁的模式 hget rwLock mode
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
// 如果锁模式为空,往读写锁对应的channel发送释放锁的消息
"if (mode == false) then " +
// publish redisson_rwlock:{rwLock} 0
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
// 如果当前锁的模式为写锁
"if (mode == 'write') then " +
// 利用 hexists 命令判断当前线程是否持有锁 hexists rwLock UUID:threadId:write
"local lockExists = redis.call('hexists', KEYS[1], ARGV[3]); " +
// 如果不存在直接返回null,表示释放锁失败
"if (lockExists == 0) then " +
"return nil;" +
"else " +
// 如果锁存在, 利用 hincrby 命令扣减持有锁数量 hincrby rwLock UUID:threadId:write -1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// 如果剩余持有锁数量大于0,利用 pexpire 命令重新刷新锁过期时间 pexpire rwLock 30000
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
// 如果剩余持有锁数量为0,利用 del 命令删除写锁记录 hdel rwLock UUID:threadId:write
"redis.call('hdel', KEYS[1], ARGV[3]); " +
// 删除写锁记录后,利用 hlen 判断锁map里还存在多少个key hlen rwLock
"if (redis.call('hlen', KEYS[1]) == 1) then " +
// 如果 key 数量等于1,证明当前线程不再持有任何锁,
// 利用 del 命令删除锁map, del rwLock
"redis.call('del', KEYS[1]); " +
// 利用 publish 命令发布释放锁消息, public redisson_rwlock:{rwLock} 0
"redis.call('publish', KEYS[2], ARGV[1]); " +
"else " +
// has unlocked read-locks
// 如果 key 数量大于1,证明当前线程还持有读锁(锁的降级),利用 hset 命令将锁模式设置为读锁
// hset rwLock mode read
"redis.call('hset', KEYS[1], 'mode', 'read'); " +
"end; " +
"return 1; "+
"end; " +
"end; " +
"end; " +
"return nil;",
Arrays.
参数说明
KEYS = Arrays.asList(getRawName(), getChannelName())
KEYS[1]:getRawName(),就是key的名称,也就是获取锁对象时设置的"rwLock"
KEYS[2]:getChannelName(),订阅消息的通道,redisson_rwlock:{rwLock}
ARGV = LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)
ARGV[1]:LockPubSub.UNLOCK_MESSAGE,Redis发布事件时的message,为 0
ARGV[2]:internalLockLeaseTime,watchdog的超时时间,30*1000 ms
ARGV[3]:getLockName(threadId),super.getLockName(threadId) + ":write" -> UUID:ThreadId:write
TIPS:
以上释放锁的lua脚本,其中有一段逻辑,删除写锁记录之后(hdel rwLock UUID:threadId:write),会判断当前锁集合中含有key的个数(hlen rwLock),如果key的个数大于1,则会将锁模式设置为读锁(hset rwLock mode read),这种情况是:当前线程不但持有写锁,还持有读锁;如果持有读锁,那么在释放写锁后,需要设置锁模式为读锁,也就是进行了锁的降级操作
public void lockDegrade() { // 获取写锁 writeLock.lock(); // do somrthing ... // 获取锁(开始锁降级) readLock.lock(); // 锁降级完成 writeLock.unlock(); // 降级结束 readLock.unlock(); }
回到RedissonReadLock加读锁的地方,第二段逻辑,就包含锁降级的操作,但必须是同一线程才能够进行锁降级的操作
" if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " ...
此时Redis中的数据:有一个Hash结构的锁,以及一个String类型的读锁超时记录
Hash结构的锁包含:锁的类型,写锁的线程,读锁的线程
当写锁线程释放(writeLock.unlock())后,此时Hash结构锁中的数据
之后会进行读锁相关的操作
3、读写锁的一些问题
1、降级锁时,释放写锁后,不再自动续期了
先获取写锁,在获取写锁成功后,会进行锁的续期,先看下锁的续期操作
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
// 如果传入key对应的value已经存在,就返回存在的value,不进行替换。如果不存在,就添加key和value,返回null
// getEntryName() -> uuid:key
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
// 如果不为null,说明是锁重入,当前线程对应的counter++,在释放锁的时候进行counter--操作
oldEntry.addThreadId(threadId);
} else {
// 一开始就是null,直接放入 EXPIRATION_RENEWAL_MAP 中,并给当前线程counter赋值1
entry.addThreadId(threadId);
// 调用定时任务
renewExpiration();
}
}
此时,
-
写锁获取成功后,第一次进来,因为 EXPIRATION_RENEWAL_MAP 中是不存在数据的,所以,执行else分支,进行锁续期的操作,此时续期的是写锁 UUID:threadId:write ,
-
当锁降级,也就是获取读锁的时候,也会进入到续期的操作中,此时 EXPIRATION_RENEWAL_MAP 已经存在,所以,执行if分支,只是将当前线程进行 counter++ 操作,所以,运行续期任务的仍旧是获取到写锁的线程
-
写锁释放的时候,会删除写锁的LockName,也就是UUID:threadId:write,然后进行锁续期任务的停止操作,首先会对当前线程进行 counter-- 操作,此时还不会停止续期任务,因为 counter-- 之后,还不是0,继续进行续期任务,当下次续期任务,会判断当前写锁线程是否存在,因为在写锁释放的时候,已经将其删除,所以,续期失败,将不会进行下次的续期任务
2、降级锁时,在释放写锁的时候,没有发布通知
-
如果在释放写锁之前,已经有其他线程在等待获取读锁,由上面释放写锁的lua脚本可知,在释放写锁后,并不会发布事件通知,那么其他等待获取读锁的线程不能马上获取到读锁,还会继续等待,直到本线程的读锁程序手动释放或超时自动释放(上面已经讲了,降级锁释放写锁后不会自动续期,所以会存在程序还没有手动释放读锁而超时自动释放的情况),其他线程才能获取到读锁