Redisson分布式锁之读锁RedissonReadLock原理


1、基本配置

配置信息与  配置一样,可自行查看

2、使用&读写锁介绍

 // 获取key为"rwLock"的锁对象,此时获取到的对象是 RReadWriteLock
 RReadWriteLock rwLock = redissonClient.getReadWriteLock("rwLock");

 RLock lock = rwLock.readLock();  // 获取读锁read
 // or
 RLock lock = rwLock.writeLock(); // 获取写锁write
 // 2、加锁
 lock.lock();
 try {
   // 进行具体的业务操作
   ...
 } finally {
   // 3、释放锁
   lock.unlock();
 }

读写锁的特性:

  • 读读兼容、读写互斥、写写互斥、写读互斥

  • 锁可以降级(当线程先获取到写锁,然后再去获取读锁,接着再释放写锁),但不能升级(先获取读锁,然后再获取写锁,再释放读锁)

为什么可以降级锁,而不能升级锁:

  • 因为锁降级是从写锁降级为读锁,此时,同一时间拿到写锁的只有一个线程,可以直接降级为读锁,不会造成冲突;而升级锁是从读锁升级为写锁,此时,同一时间拿到读锁的可能会有多个线程(读读不互斥),会造成冲突

同RedissonFairLock一样,RReadWriteLock也是RedissonLock的子类 ,主要也是基于 RedissonLock 做的扩展,主要扩展在于加锁和释放锁的地方,以及读锁的 wathcdog lua 脚本(经过重写的),其他的逻辑都直接复用 RedissonLock

3、RedissonReadLock 

3.1 lua脚本加锁

RedissonReadLock#tryLockInnerAsync

 RFuture tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
              // 获取锁模式  hget rwLock mode
              "local mode = redis.call('hget', KEYS[1], 'mode'); " +
              // 锁的模式为空,即当前锁尚未有线程获取
              "if (mode == false) then " +
                // 利用 hset 命令设置锁模式为读锁  hset rwLock mode read
                "redis.call('hset', KEYS[1], 'mode', 'read'); " +
                // 利用 hset 命令为当前线程添加加锁次数记录 hset rwLock UUID:threadId 1
                "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                // 利用 set 命令为当前获取到锁的线程添加一条超时记录 String类型
                // set {rwLock}:UUID:threadId:rwlock_timeout:1 1
                "redis.call('set', KEYS[2] .. ':1', 1); " +
                // 利用 pexpire 命令为锁&当前线程超时记录 添加过期时间
                // pexpire {rwLock}:UUID:threadId:rwlock_timeout:1 30000
                "redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
                // pexpire rwLock 30000
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return nil; " +
              "end; " +
              // 锁模式为读锁 或 锁模式为写锁并且获取写锁的为当前线程   hexists rwLock UUID:threadId:write
              "if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
                // 利用 hincrby 命令为当前线程增加加锁次数,并返回当前值  hincrby rwLock UUID:threadId 1
                "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                // 为当前线程拼接加锁记录 key
                // key = {rwLock}:UUID:threadId:rwlock_timeout:ind(加锁的次数)
                "local key = KEYS[2] .. ':' .. ind;" +
                // 利用 set 命令为 key 添加一条加锁超时记录,并设置过期时间内
                "redis.call('set', key, 1); " +
                "redis.call('pexpire', key, ARGV[1]); " +
                // 获取锁过期的时间
                "local remainTime = redis.call('pttl', KEYS[1]); " +
                // ttl 和 30000 中选出最大值,设置为锁的过期时间
                "redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " +
                "return nil; " +
              "end;" +
              //  返回锁的过期时间
              "return redis.call('pttl', KEYS[1]);",
          Arrays.asList(getRawName(), getReadWriteTimeoutNamePrefix(threadId)),
          unit.toMillis(leaseTime), getLockName(threadId), getWriteLockName(threadId));
}

参数说明:

 KEYS = Collections.singletonList(getRawName(), getReadWriteTimeoutNamePrefix(threadId))

  • KEYS[1]:getRawName(),就是key的名称,也就是获取锁对象时设置的"rwLock"

  • KEYS[2]:getReadWriteTimeoutNamePrefix(threadId)),锁超时key,即{rwLock}:UUID:threadId:rwlock_timeout

 ARGV = unit.toMillis(leaseTime), getLockName(threadId), getWriteLockName(threadId)

  • ARGV[1]:unit.toMillis(leaseTime),锁过期的时间,默认30s

  • ARGV[2]:getLockName(threadId),UUID:ThreadId,当前线程,UUID来唯一标识一个客户端

  • ARGV[3]:getWriteLockName(threadId),写锁名称,UUID:threadId:write

首次加读锁

也就是执行lua脚本的第一个分支,Redis中的数据有一个key为rwLock结构的Hash锁,包含锁的模式,以及加锁的线程

  一个以当前加锁线程的超时时间(String类型)

读锁重入

执行第二个分支

  • 锁模式为读锁,当前线程可获取读锁。即:redisson提供的读写锁支持不同线程重复获取锁
  • 锁模式为写锁,并且获取写锁的线程为当前线程,当前线程可获取读锁;即:redisson 提供的读写锁,读写并不是完全互斥,而是支持同一线程先获取写锁再获取读锁,也就是 锁的降级

关于写锁判断,到分析获取写锁的lua脚本时再回头看;但是可以从这里提前知道,如果为写锁添加加锁次数记录,使用的 key 是 UUID:threadId:write,而读锁使用的 key 是 UUID:threadId

此时Redis中Hash结构的数据中,当前线程的的值加1,表示重入次数

并且在Redis中会再增加一条String类型的数据,表示第二次加锁的超时时间,可以看到,当一个线程重入n次时,就会有n条对应的超时记录,并且key最后的数字是依次递增的

读读支持

也是执行第二个分支,此时Hash结构的数据中,存储锁的模式,获取到锁的线程

以及String类型的线程的超时时间

写读互斥

已经加了读锁,此时写锁进来,不满足第一部分,也不满足第二部分,直接返回当前锁的过期时间,并订阅消息通道 redisson_rwlock:{rwLock},然后就会在while(true)中进行自旋等待锁的释放

至此,整个加锁的流程完成,从上面可以看出,在读锁的时候:

  1. 锁 rwLock 是哈希表结构的

  2. 加锁时,会对哈希表设置 mode 字段来表示这个锁是读锁还是写锁,mode = read 表示读锁

  3. 加锁时,会对哈希表设置当前线程 rwLock 的 UUID:ThreadId 字段,值表示重入次数

  4. 每次加锁,会维护一个 key 表示这次锁的超时时间,这个 key 的结构是 {锁名字}:UUID:ThreadId:rwlock_timeout:重入次数

3.2 watchdog续期lua脚本 

RedissonReadLock#renewExpirationAsync

protected RFuture renewExpirationAsync(long threadId) {
    // {rwLock}:UUID:threadId:rwlock_timeout
    String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
    // timeoutPrefix.split(":" + getLockName(threadId))[0] -> {rwLock}
    String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);
    
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
          // 利用 hget 命令获取当前当前线程的加锁次数  hget rwLock UUID:threadId
          "local counter = redis.call('hget', KEYS[1], ARGV[2]); " +
          "if (counter ~= false) then " +
              // 当前线程获取锁次数大于0,刷新锁过期时间  pexpire rwLock 30000
              "redis.call('pexpire', KEYS[1], ARGV[1]); " +
              // 利用 hlen 命令获取锁集合里面的元素个数,然后判断是否大于1个以上key  hlen rwLock
              "if (redis.call('hlen', KEYS[1]) > 1) then " +
                 // 如果锁集合里面key大于1个,获取锁集合中的所有key  hkeys rwLock
                 "local keys = redis.call('hkeys', KEYS[1]); " +
                 // 遍历每一个key
                 "for n, key in ipairs(keys) do " +
                    // hegt rwLock key 获取其具体值
                    "counter = tonumber(redis.call('hget', KEYS[1], key)); " + 
                    // 如果值为数字类型,证明此key是加锁成功的线程,其值表示线程加锁的次数
                    "if type(counter) == 'number' then " + 
                        // 遍历加锁次数,刷新加锁线程对应的过期时间
                        "for i=counter, 1, -1 do " +
                            // pexpire {rwLock}:key:rwlock_timeout:i 30000
                            "redis.call('pexpire', KEYS[2] .. ':' .. key .. ':rwlock_timeout:' .. i, ARGV[1]); " + 
                        "end; " + 
                    "end; " + 
                "end; " +
            "end; " +
            "return 1; " +
        "end; " +
        "return 0;",
    Arrays.asList(getRawName(), keyPrefix),
    internalLockLeaseTime, getLockName(threadId));
}

参数说明 

KEYS = Arrays.asList(getRawName(), keyPrefix)

  • KEYS[1]:getRawName(),就是key的名称,也就是获取锁对象时设置的"rwLock"

  • KEYS[2]:keyPrefix,{rwLock}

ARGV = internalLockLeaseTime, getLockName(threadId)

  • ARGV[1]:internalLockLeaseTime,锁过期时间,其实就是watchdog超时时间,默认 30*1000 ms

  • ARGV[2]:getLockName(threadId),UUID:ThreadId,UUID来唯一标识一个客户端

在上述续期的lua脚本中有一个 hlen KEYS[1](hlen rwLock) 的判断,做这个判断是因为 读写锁 集合中,包含2个以上的键值对,其中一个就是锁模式,也就是mode字段,来表示当前锁是读锁还是写锁;后面的操作获取锁集合中所有的key:hkeys KEYS[1](hkeys rwLock),遍历所有的key,并获取其值:hget KEYS[1]key(hget rwLock key),如果key的值为数字,证明此key是加锁成功的线程,并且value的值表示线程加锁次数;遍历加锁次数利用 pexpire 为这个线程对应的加锁记录刷新过期时间

之所以遍历加锁次数,是因为在锁重入的时候,每成功加锁一次,redisson 都会为当前线程新增一条加锁记录,并且设置过期时间。

3.3 lua脚本释放锁 

RedissonReadLock#unlockInnerAsync

protected RFuture unlockInnerAsync(long threadId) {
    // {myLock}:UUID:threadId:rwlock_timeout
    String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId);
    // timeoutPrefix.split(":" + getLockName(threadId))[0] -> {myLock}
    String keyPrefix = getKeyPrefix(threadId, timeoutPrefix);

    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
         // 利用 hget 命令获取读写锁的模式
        "local mode = redis.call('hget', KEYS[1], 'mode'); " +
         // 如果锁模式为空,往读写锁对应的channel发送释放锁的消息
        "if (mode == false) then " +
            // publish redisson_rwlock:{rwLock} 0
            "redis.call('publish', KEYS[2], ARGV[1]); " +
            "return 1; " +
        "end; " +
        // 利用 hexists 命令判断当前线程是否持有锁  hexists rwLock UUID:threadId
        "local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
        "if (lockExists == 0) then " +
            "return nil;" +
        "end; " +
        // 利用 hincrby 命令,给当前线程持有锁数量减1   hincrby rwLock UUID:threadId -1
        "local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " +
        // 如果持有锁数量减1后等于0,证明当前线程不再持有锁,那么利用 hdel 命令将锁map中加锁次数记录删掉
        "if (counter == 0) then " +
            // hdel rwLock UUID:threadId
            "redis.call('hdel', KEYS[1], ARGV[2]); " + 
        "end;" +
        // 删除线程持有锁对应的加锁超时记录   del {rwLock}:UUID:threadId:rwlock_timeout:count+1
        "redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +

        // 利用 hlen 获取锁map中key的数量  hlen rwLock
        "if (redis.call('hlen', KEYS[1]) > 1) then " +
            "local maxRemainTime = -3; " +
            // 获取锁map中 key 的数量 hkeys rwLock
            "local keys = redis.call('hkeys', KEYS[1]); " + 
            "for n, key in ipairs(keys) do " +
                // 获取锁map中key的值  hget rwLock key(遍历的key)
                "counter = tonumber(redis.call('hget', KEYS[1], key)); " +
                // 如果值为数字
                "if type(counter) == 'number' then " + 
                    // 遍历加锁次数,刷新加锁线程对应的过期时间
                    "for i=counter, 1, -1 do " +
                       // 利用 pttl 获取超时时间 pptl {rwLock}:key:rwlock_timeout:i
                       "local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " + 
                       // 与 maxRemainTime 对比,将最大值,赋值给 maxRemainTime
                       "maxRemainTime = math.max(remainTime, maxRemainTime);" + 
                    "end; " + 
                "end; " + 
            "end; " +
            // 判断 maxRemainTime 是否大于0,如果大于0,给锁重新设置过期时间为 maxRemainTime,然后返回0结束lua脚本的执行      
            "if maxRemainTime > 0 then " +
                // pexpire rwLock maxRemainTime
                "redis.call('pexpire', KEYS[1], maxRemainTime); " +
                "return 0; " +
            "end;" + 
            // 如果当前读写锁的锁模式是写锁,直接返回0结束lua脚本的执行
            "if mode == 'write' then " + 
                "return 0;" + 
            "end; " +
        "end; " +
        // 当走到最后的操作,证明当前线程不但成功释放锁,并且释放后当前读写锁已经没有其他线程再持有锁了
        // 直接将读写锁对应的key直接删掉,并且往读写锁对应的channel中发布释放锁消息  
        // del rwLock
        "redis.call('del', KEYS[1]); " +
        // publish redisson_rwlock:{rwLock} 0
        "redis.call('publish', KEYS[2], ARGV[1]); " +
        "return 1; ",
    Arrays.asList(getRawName(), getChannelName(), timeoutPrefix, keyPrefix),
    LockPubSub.UNLOCK_MESSAGE, getLockName(threadId));
}

 参数说明

 KEYS = Arrays.asList(getRawName(), getChannelName(), timeoutPrefix, keyPrefix)

  • KEYS[1]:getRawName(),就是key的名称,也就是获取锁对象时设置的"rwLock"

  • KEYS[2]:getChannelName(),订阅消息的通道,redisson_rwlock:{rwLock}

  • KEYS[3]:timeoutPrefix,{rwLock}:UUID:threadId:rwlock_timeout

  • KEYS[4]:keyPrefix,{rwLock}

ARGV = LockPubSub.UNLOCK_MESSAGE, getLockName(threadId)

  • ARGV[1]:LockPubSub.UNLOCK_MESSAGE,Redis发布事件时的message,为 0

  • ARGV[2]:getLockName(threadId),UUID:ThreadId,UUID来唯一标识一个客户端

到这里,整个读锁的流程全部结束,但是有两个小小的疑问?

为什么给读锁扣减不需要先判断锁的模式?

  • 在锁map中记录加锁次数时,读锁的key是UUID:threadId,而写锁的key是UUID:threadId:write,那么就是说读锁的key和写锁的key是不一样的。所以解锁的时候,直接使用对应key来扣减持有锁次数即可。

  • 相同线程,如果获取了写锁后,还是可以继续获取读锁的。所以只需要判断锁map有读锁加锁次数记录即可,就可以判断当前线程是持有读锁的,并不需要关心当前锁的模式。

为什么锁map中的key都大于1了,证明肯定还有线程持有锁,那为什么还会存在 maxRemainTime 最后小于0的情况呢?

  • 有一个点我们还没学到,那就是其实读写锁中,如果是获取写锁,并不会新增一条写锁的超时记录,因为读写锁中,写锁和写锁是互斥的,写锁和读锁也是互斥的,即使支持当前线程先获取写锁再获取读锁,其实也不需要增加一条写锁的超时时间,因为读写锁 key 的超时时间就等于写锁的超时时间。