Redisson分布式锁之读锁RedissonReadLock原理
1、基本配置
配置信息与 配置一样,可自行查看
2、使用&读写锁介绍
// 获取key为"rwLock"的锁对象,此时获取到的对象是 RReadWriteLock RReadWriteLock rwLock = redissonClient.getReadWriteLock("rwLock"); RLock lock = rwLock.readLock(); // 获取读锁read // or RLock lock = rwLock.writeLock(); // 获取写锁write // 2、加锁 lock.lock(); try { // 进行具体的业务操作 ... } finally { // 3、释放锁 lock.unlock(); }
读写锁的特性:
-
读读兼容、读写互斥、写写互斥、写读互斥
-
锁可以降级(当线程先获取到写锁,然后再去获取读锁,接着再释放写锁),但不能升级(先获取读锁,然后再获取写锁,再释放读锁)
为什么可以降级锁,而不能升级锁:
- 因为锁降级是从写锁降级为读锁,此时,同一时间拿到写锁的只有一个线程,可以直接降级为读锁,不会造成冲突;而升级锁是从读锁升级为写锁,此时,同一时间拿到读锁的可能会有多个线程(读读不互斥),会造成冲突
同RedissonFairLock一样,RReadWriteLock也是RedissonLock的子类 ,主要也是基于 RedissonLock 做的扩展,主要扩展在于加锁和释放锁的地方,以及读锁的 wathcdog lua 脚本(经过重写的),其他的逻辑都直接复用 RedissonLock
3、RedissonReadLock
3.1 lua脚本加锁
RedissonReadLock#tryLockInnerAsync
RFuture tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
// 获取锁模式 hget rwLock mode
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
// 锁的模式为空,即当前锁尚未有线程获取
"if (mode == false) then " +
// 利用 hset 命令设置锁模式为读锁 hset rwLock mode read
"redis.call('hset', KEYS[1], 'mode', 'read'); " +
// 利用 hset 命令为当前线程添加加锁次数记录 hset rwLock UUID:threadId 1
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
// 利用 set 命令为当前获取到锁的线程添加一条超时记录 String类型
// set {rwLock}:UUID:threadId:rwlock_timeout:1 1
"redis.call('set', KEYS[2] .. ':1', 1); " +
// 利用 pexpire 命令为锁&当前线程超时记录 添加过期时间
// pexpire {rwLock}:UUID:threadId:rwlock_timeout:1 30000
"redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
// pexpire rwLock 30000
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 锁模式为读锁 或 锁模式为写锁并且获取写锁的为当前线程 hexists rwLock UUID:threadId:write
"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
// 利用 hincrby 命令为当前线程增加加锁次数,并返回当前值 hincrby rwLock UUID:threadId 1
"local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
// 为当前线程拼接加锁记录 key
// key = {rwLock}:UUID:threadId:rwlock_timeout:ind(加锁的次数)
"local key = KEYS[2] .. ':' .. ind;" +
// 利用 set 命令为 key 添加一条加锁超时记录,并设置过期时间内
"redis.call('set', key, 1); " +
"redis.call('pexpire', key, ARGV[1]); " +
// 获取锁过期的时间
"local remainTime = redis.call('pttl', KEYS[1]); " +
// ttl 和 30000 中选出最大值,设置为锁的过期时间
"redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " +
"return nil; " +
"end;" +
// 返回锁的过期时间
"return redis.call('pttl', KEYS[1]);",
Arrays.asList(getRawName(), getReadWriteTimeoutNamePrefix(threadId)),
unit.toMillis(leaseTime), getLockName(threadId), getWriteLockName(threadId));
}
参数说明:
KEYS = Collections.singletonList(getRawName(), getReadWriteTimeoutNamePrefix(threadId))
KEYS[1]:getRawName(),就是key的名称,也就是获取锁对象时设置的"rwLock"
KEYS[2]:getReadWriteTimeoutNamePrefix(threadId)),锁超时key,即{rwLock}:UUID:threadId:rwlock_timeout
ARGV = unit.toMillis(leaseTime), getLockName(threadId), getWriteLockName(threadId)
ARGV[1]:unit.toMillis(leaseTime),锁过期的时间,默认30s
ARGV[2]:getLockName(threadId),UUID:ThreadId,当前线程,UUID来唯一标识一个客户端
ARGV[3]:getWriteLockName(threadId),写锁名称,UUID:threadId:write
首次加读锁
也就是执行lua脚本的第一个分支,Redis中的数据有一个key为rwLock结构的Hash锁,包含锁的模式,以及加锁的线程
一个以当前加锁线程的超时时间(String类型)
读锁重入
执行第二个分支
- 锁模式为读锁,当前线程可获取读锁。即:redisson提供的读写锁支持不同线程重复获取锁
- 锁模式为写锁,并且获取写锁的线程为当前线程,当前线程可获取读锁;即:redisson 提供的读写锁,读写并不是完全互斥,而是支持同一线程先获取写锁再获取读锁,也就是 锁的降级
关于写锁判断,到分析获取写锁的lua脚本时再回头看;但是可以从这里提前知道,如果为写锁添加加锁次数记录,使用的 key 是 UUID:threadId:write,而读锁使用的 key 是 UUID:threadId
此时Redis中Hash结构的数据中,当前线程的的值加1,表示重入次数
并且在Redis中会再增加一条String类型的数据,表示第二次加锁的超时时间,可以看到,当一个线程重入n次时,就会有n条对应的超时记录,并且key最后的数字是依次递增的
读读支持
也是执行第二个分支,此时Hash结构的数据中,存储锁的模式,获取到锁的线程
以及String类型的线程的超时时间
写读互斥
已经加了读锁,此时写锁进来,不满足第一部分,也不满足第二部分,直接返回当前锁的过期时间,并订阅消息通道 redisson_rwlock:{rwLock},然后就会在while(true)中进行自旋等待锁的释放
至此,整个加锁的流程完成,从上面可以看出,在读锁的时候:
-
锁 rwLock 是哈希表结构的
-
加锁时,会对哈希表设置 mode 字段来表示这个锁是读锁还是写锁,mode = read 表示读锁
-
加锁时,会对哈希表设置当前线程 rwLock 的 UUID:ThreadId 字段,值表示重入次数
-
每次加锁,会维护一个 key 表示这次锁的超时时间,这个 key 的结构是 {锁名字}:UUID:ThreadId:rwlock_timeout:重入次数
3.2 watchdog续期lua脚本
RedissonReadLock#renewExpirationAsync
protected RFuture renewExpirationAsync(long threadId) {
// {rwLock}:UUID:threadId:rwlock_timeout
String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
// timeoutPrefix.split(":" + getLockName(threadId))[0] -> {rwLock}
String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 利用 hget 命令获取当前当前线程的加锁次数 hget rwLock UUID:threadId
"local counter = redis.call('hget', KEYS[1], ARGV[2]); " +
"if (counter ~= false) then " +
// 当前线程获取锁次数大于0,刷新锁过期时间 pexpire rwLock 30000
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
// 利用 hlen 命令获取锁集合里面的元素个数,然后判断是否大于1个以上key hlen rwLock
"if (redis.call('hlen', KEYS[1]) > 1) then " +
// 如果锁集合里面key大于1个,获取锁集合中的所有key hkeys rwLock
"local keys = redis.call('hkeys', KEYS[1]); " +
// 遍历每一个key
"for n, key in ipairs(keys) do " +
// hegt rwLock key 获取其具体值
"counter = tonumber(redis.call('hget', KEYS[1], key)); " +
// 如果值为数字类型,证明此key是加锁成功的线程,其值表示线程加锁的次数
"if type(counter) == 'number' then " +
// 遍历加锁次数,刷新加锁线程对应的过期时间
"for i=counter, 1, -1 do " +
// pexpire {rwLock}:key:rwlock_timeout:i 30000
"redis.call('pexpire', KEYS[2] .. ':' .. key .. ':rwlock_timeout:' .. i, ARGV[1]); " +
"end; " +
"end; " +
"end; " +
"end; " +
"return 1; " +
"end; " +
"return 0;",
Arrays.
参数说明
KEYS = Arrays.asList(getRawName(), keyPrefix)
KEYS[1]:getRawName(),就是key的名称,也就是获取锁对象时设置的"rwLock"
KEYS[2]:keyPrefix,{rwLock}
ARGV = internalLockLeaseTime, getLockName(threadId)
ARGV[1]:internalLockLeaseTime,锁过期时间,其实就是watchdog超时时间,默认 30*1000 ms
ARGV[2]:getLockName(threadId),UUID:ThreadId,UUID来唯一标识一个客户端
在上述续期的lua脚本中有一个 hlen KEYS[1](hlen rwLock) 的判断,做这个判断是因为 读写锁 集合中,包含2个以上的键值对,其中一个就是锁模式,也就是mode字段,来表示当前锁是读锁还是写锁;后面的操作获取锁集合中所有的key:hkeys KEYS[1](hkeys rwLock),遍历所有的key,并获取其值:hget KEYS[1]key(hget rwLock key),如果key的值为数字,证明此key是加锁成功的线程,并且value的值表示线程加锁次数;遍历加锁次数利用 pexpire 为这个线程对应的加锁记录刷新过期时间
之所以遍历加锁次数,是因为在锁重入的时候,每成功加锁一次,redisson 都会为当前线程新增一条加锁记录,并且设置过期时间。
3.3 lua脚本释放锁
RedissonReadLock#unlockInnerAsync
protected RFuture unlockInnerAsync(long threadId) {
// {myLock}:UUID:threadId:rwlock_timeout
String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
// timeoutPrefix.split(":" + getLockName(threadId))[0] -> {myLock}
String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 利用 hget 命令获取读写锁的模式
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
// 如果锁模式为空,往读写锁对应的channel发送释放锁的消息
"if (mode == false) then " +
// publish redisson_rwlock:{rwLock} 0
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
// 利用 hexists 命令判断当前线程是否持有锁 hexists rwLock UUID:threadId
"local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
"if (lockExists == 0) then " +
"return nil;" +
"end; " +
// 利用 hincrby 命令,给当前线程持有锁数量减1 hincrby rwLock UUID:threadId -1
"local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " +
// 如果持有锁数量减1后等于0,证明当前线程不再持有锁,那么利用 hdel 命令将锁map中加锁次数记录删掉
"if (counter == 0) then " +
// hdel rwLock UUID:threadId
"redis.call('hdel', KEYS[1], ARGV[2]); " +
"end;" +
// 删除线程持有锁对应的加锁超时记录 del {rwLock}:UUID:threadId:rwlock_timeout:count+1
"redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +
// 利用 hlen 获取锁map中key的数量 hlen rwLock
"if (redis.call('hlen', KEYS[1]) > 1) then " +
"local maxRemainTime = -3; " +
// 获取锁map中 key 的数量 hkeys rwLock
"local keys = redis.call('hkeys', KEYS[1]); " +
"for n, key in ipairs(keys) do " +
// 获取锁map中key的值 hget rwLock key(遍历的key)
"counter = tonumber(redis.call('hget', KEYS[1], key)); " +
// 如果值为数字
"if type(counter) == 'number' then " +
// 遍历加锁次数,刷新加锁线程对应的过期时间
"for i=counter, 1, -1 do " +
// 利用 pttl 获取超时时间 pptl {rwLock}:key:rwlock_timeout:i
"local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " +
// 与 maxRemainTime 对比,将最大值,赋值给 maxRemainTime
"maxRemainTime = math.max(remainTime, maxRemainTime);" +
"end; " +
"end; " +
"end; " +
// 判断 maxRemainTime 是否大于0,如果大于0,给锁重新设置过期时间为 maxRemainTime,然后返回0结束lua脚本的执行
"if maxRemainTime > 0 then " +
// pexpire rwLock maxRemainTime
"redis.call('pexpire', KEYS[1], maxRemainTime); " +
"return 0; " +
"end;" +
// 如果当前读写锁的锁模式是写锁,直接返回0结束lua脚本的执行
"if mode == 'write' then " +
"return 0;" +
"end; " +
"end; " +
// 当走到最后的操作,证明当前线程不但成功释放锁,并且释放后当前读写锁已经没有其他线程再持有锁了
// 直接将读写锁对应的key直接删掉,并且往读写锁对应的channel中发布释放锁消息
// del rwLock
"redis.call('del', KEYS[1]); " +
// publish redisson_rwlock:{rwLock} 0
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; ",
Arrays.
参数说明
KEYS = Arrays.asList(getRawName(), getChannelName(), timeoutPrefix, keyPrefix)
KEYS[1]:getRawName(),就是key的名称,也就是获取锁对象时设置的"rwLock"
KEYS[2]:getChannelName(),订阅消息的通道,redisson_rwlock:{rwLock}
KEYS[3]:timeoutPrefix,{rwLock}:UUID:threadId:rwlock_timeout
KEYS[4]:keyPrefix,{rwLock}
ARGV = LockPubSub.UNLOCK_MESSAGE, getLockName(threadId)
ARGV[1]:LockPubSub.UNLOCK_MESSAGE,Redis发布事件时的message,为 0
ARGV[2]:getLockName(threadId),UUID:ThreadId,UUID来唯一标识一个客户端
到这里,整个读锁的流程全部结束,但是有两个小小的疑问?
为什么给读锁扣减不需要先判断锁的模式?
在锁map中记录加锁次数时,读锁的key是UUID:threadId,而写锁的key是UUID:threadId:write,那么就是说读锁的key和写锁的key是不一样的。所以解锁的时候,直接使用对应key来扣减持有锁次数即可。
相同线程,如果获取了写锁后,还是可以继续获取读锁的。所以只需要判断锁map有读锁加锁次数记录即可,就可以判断当前线程是持有读锁的,并不需要关心当前锁的模式。
为什么锁map中的key都大于1了,证明肯定还有线程持有锁,那为什么还会存在 maxRemainTime 最后小于0的情况呢?
有一个点我们还没学到,那就是其实读写锁中,如果是获取写锁,并不会新增一条写锁的超时记录,因为读写锁中,写锁和写锁是互斥的,写锁和读锁也是互斥的,即使支持当前线程先获取写锁再获取读锁,其实也不需要增加一条写锁的超时时间,因为读写锁 key 的超时时间就等于写锁的超时时间。