Redisson分布式锁之公平锁原理
1、基本配置
配置信息与 配置一样,可自行查看
2、使用
与非公平锁不同的是,公平锁获取所对象时,使用的是 getFairLock 方法,返回的对象为 RedissonFairLock
RedissonFairLock 其实是 RedissonLock 的子类,它主要是基于 RedissonLock 做的扩展,主要扩展在于加锁和释放锁的地方,其他的逻辑都直接复用 RedissonLock,例如加锁前计算slot、watchdog机制等等。
// 1、获取key为"fairLock"的锁对象
RLock lock = redissonClient.getFairLock("fairLock");
// 2、加锁
lock.lock();
try {
// 进行具体的业务操作
...
} finally {
// 3、释放锁
lock.unlock();
}
3、加锁
首先看下 RLock lock = redissonClient.getFairLock("fairLock") 进行了哪些操作
public RLock getFairLock(String name) {
// 初始一个RedissonFairLock对象,构造方法中初始一些参数信息
return new RedissonFairLock(commandExecutor, name);
}
public RedissonFairLock(CommandAsyncExecutor commandExecutor, String name, long threadWaitTime) {
super(commandExecutor, name);
this.commandExecutor = commandExecutor;
// 线程等待时间 60000 * 5
this.threadWaitTime = threadWaitTime;
// redis list类型的key,用于保存获取锁的线程,此时为 redisson_lock_queue:{fairLock}
this.threadsQueueName = prefixName("redisson_lock_queue", name);
// redis zset类型的key,用于保存预计获取到锁的时间点,此时为 redisson_lock_timeout:{fairLock}
this.timeoutSetName = prefixName("redisson_lock_timeout", name);
}
后续的操作跟 一致
着重看一下 RedissonFairLock#tryLockInnerAsync 方法,里面有两段 lua 脚本,根据代码流程只需要关注第二段即可
由于lua基本过长,所以针对每一个分支进行分析
3.1 参数说明
KEYS = Arrays.asList(getRawName(), threadsQueueName, timeoutSetName)
- KEYS[1]:getRawName(),就是key的名称,也就是获取锁对象时设置的"fairLock"
- KEYS[2]:threadsQueueName,获取锁等待队列名称,redisson_lock_queue:{fairLock}
- KEYS[3]:timeoutSetName,等待队列中线程预计获取锁时间的 set 集合,按照时间戳存放,redisson_lock_timeout:{fairLock}
ARGV = unit.toMillis(leaseTime), getLockName(threadId), wait, currentTime
- ARGV[1]:unit.toMillis(leaseTime),锁过期时间,其实就是watchdog超时时间,默认 30*1000 ms
- ARGV[2]:getLockName(threadId),UUID:ThreadId,UUID来唯一标识一个客户端
- ARGV[3]:wait,也就是threadWaitTime,默认 5*60000 ms
- ARGV[4]:currentTime,当前时间戳
3.2 lua脚本分析
分支一:清理过期的等待线程
// remove stale threads
// 这个死循环主要作用是删除失效的线程
"while true do " +
// 获取等待队列第一个元素,就是第一个等待线程
// lindex redisson_lock_queue:{fairLock} 0 返回值其实就是 UUID:threadId
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
// 如果为空,则跳出循环
"if firstThreadId2 == false then " +
"break;" +
"end;" +
// 获取第一个线程的分值(也就是超时时间) zscore redisson_lock_timeout:{fairLock} UUID:threadId
"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
// 如果小于当期时间,说明已经失效,首先从超时集合中移除该节点,接着从等待队列中弹出第一个节点,否则退出循环
"if timeout <= tonumber(ARGV[4]) then " +
// remove the item from the queue and timeout set NOTE we do not alter any other timeout
// zrem redisson_lock_timeout:{fairLock} UUID:threadId
"redis.call('zrem', KEYS[3], firstThreadId2);" +
// lpop redisson_lock_queue:{fairLock}
"redis.call('lpop', KEYS[2]);" +
"else " +
"break;" +
"end;" +
"end;" +
场景:这个死循环的作用主要用于清理过期的等待线程,主要避免下面场景,避免无效客户端占用等待队列资源
-
获取锁失败,然后进入等待队列,但是网络出现问题,那么后续很有可能就不能继续正常获取锁了。
-
获取锁失败,然后进入等待队列,但是之后客户端所在服务器宕机了。
分支二:检查是否可成功获取锁
// check if the lock can be acquired now
// 如果当前锁为空,同时 (队列为空 或者 队列第一个线程是当前线程)
"if (redis.call('exists', KEYS[1]) == 0) " +
"and ((redis.call('exists', KEYS[2]) == 0) or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
// remove this thread from the queue and timeout set
// 条件成立,则从队列和集合中删除当前线程
// lpop redisson_lock_queue:{myLock}:弹出等待队列中的第一个元素,即当前线程
// zrem redisson_lock_timeout:{myLock} UUID:threadId:从超时集合中移除当前客户端当前线程
"redis.call('lpop', KEYS[2]);" +
"redis.call('zrem', KEYS[3], ARGV[2]);" +
// decrease timeouts for all waiting in the queue
// 获取并遍历集合中所有的线程,每个线程的超时时间都减去5*60000ms
// zrange redisson_lock_timeout:{fairLock} 0 -1 从超时集合中获取所有的元素
"local keys = redis.call('zrange', KEYS[3], 0, -1);" +
"for i = 1, #keys, 1 do " +
// zincrby redisson_lock_timeout:{fairLock} -5*60000ms keys[i]
"redis.call('zincrby', KEYS[3], -tonumber(ARGV[3]), keys[i]);" +
"end;" +
// acquire the lock and set the TTL for the lease
// 当前线程获取到锁,并设置过期时间,默认是30s
// hset fairLock UUID:threadId 1
"redis.call('hset', KEYS[1], ARGV[2], 1);" +
// pexpire fairLock 30*1000ms
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
场景:
-
其他客户端刚释放锁,并且等待队列为空
-
其他客户端刚释放锁,并且等待队列中的第一个元素就是当前客户端当前线程
TIPS:上述lua脚本中,有一段当某个线程获取到锁,为什么要扣减等待时间?
因为这里的客户端都是调用 lock()方法,就是等待直到最后获取到锁;所以某个客户端可以成功获取锁的时候,要帮其他等待的客户端刷新一下等待时间,不然在分支一的死循环中就被干掉了。
分支三:当前线程曾经获取锁,重复获取锁
// 如果当前线程已经获取到锁,则value+1(可重入)。同时重新设置过期时间
"if redis.call('hexists', KEYS[1], ARGV[2]) == 1 then " +
"redis.call('hincrby', KEYS[1], ARGV[2],1);" +
"redis.call('pexpire', KEYS[1], ARGV[1]);" +
"return nil;" +
"end;" +
分支四:当前线程本就在等待队列中,返回等待时间
// the lock cannot be acquired, check if the thread is already in the queue
// 获取不到锁,如果当前线程已经在队列里面,返回 超时时间 - 等待时间(5*60000ms) - 当前时间
"local timeout = redis.call('zscore', KEYS[3], ARGV[2]);" +
"if timeout ~= false then " +
// the real timeout is the timeout of the prior thread in the queue, but this is approximately correct, and
// avoids having to traverse the queue
"return timeout - tonumber(ARGV[3]) - tonumber(ARGV[4]);" +
"end;" +
分支五:当前线程首次尝试获取锁,将当前线程加入到超时集合中,同时放入等待队列中
// add the thread to the queue at the end, and set its timeout in the timeout set to the timeout of // the prior thread in the queue (or the timeout of the lock if the queue is empty) plus the threadWaitTime // 获取到队列最后一个线程 // lindex redisson_lock_queue:{fairLock} -1 "local lastThreadId = redis.call('lindex', KEYS[2], -1);" + "local ttl;" + // 如果最后一个线程存在,且不是当前线程的话,ttl就等于最后一个线程的超时时间 - 当前时间,否则为fairLock的剩余生存时间 "if lastThreadId ~= false and lastThreadId ~= ARGV[2] then " + "ttl = tonumber(redis.call('zscore', KEYS[3], lastThreadId)) - tonumber(ARGV[4]);" + "else " + "ttl = redis.call('pttl', KEYS[1]);" + "end;" + // 计算超时时间,并将当前线程放入超时集合和等待队列中 // timeout = ttl + 线程等待时间(5*60000ms) + 当前时间戳 "local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4]);" + // zadd redisson_lock_timeout:{fairLock} timeout UUID:threadId "if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " + // rpush redisson_lock_queue:{fairLock} UUID:theadId "redis.call('rpush', KEYS[2], ARGV[2]);" + "end;" + // 返回 ttl "return ttl;"
至此,整个获取锁的流程结束,从上述lua脚本可以看出,有一个hash结构的锁,一个存放获取锁线程的list队列redisson_lock_queue:{fairLock},一个存放等待队列中线程预计获取锁时间的zset集合redisson_lock_timeout:{fairLock},最终存在redis中的数据为:
Hash结构的锁 fairLock
List结构的队列 redisson_lock_queue:{fairLock}
Zset结构的集合 redisson_lock_timeout:{fairLock}
3.3 获取锁成功
获取锁成功后,后续操作锁的续期(看门狗线程),与非公平锁(RedissonLock) 的操作是一样的
3.4 获取锁失败
如果没有获取到锁的线程,与RedissonLock一致的操作,也是在redis中发布订阅消息,等待释放锁的线程发布通知,不过此时订阅的通道有所不同,RedissonFairLock 公平获取锁,即先到先得。所以每个等待线程订阅的channel时不同的:redisson_lock__channel:{fairLock}:UUID:threadId。当某个线程释放锁的时候,只会往等待队列中第一个线程对应订阅的channel发送消息,在释放锁的lua脚本中会向通道中发布消息
protected RFuturesubscribe(long threadId) { // 订阅的通道:redisson_lock__channel:{fairLock}:UUID:threadId return pubSub.subscribe(getEntryName() + ":" + threadId, getChannelName() + ":" + getLockName(threadId)); }
查看一下订阅的通道信息,可使用命令 PUBSUB CHANNELS
事件订阅完成后,同样会进入while (true)死循环中,进行阻塞获取锁
4、释放锁
释放锁是在 RedissonFairLock#unlockInnerAsync 中执行lua脚本,因为加入了公平获取锁的机制,所以lua脚本还是挺长的,依旧是针对每一个分支进行分析
4.1 参数说明
KEYS = Arrays.asList(getRawName(), threadsQueueName, timeoutSetName, getChannelName())
- KEYS[1]:getRawName(),就是key的名称,也就是获取锁对象时设置的"fairLock"
- KEYS[2]:threadsQueueName,获取锁等待队列名称,redisson_lock_queue:{fairLock}
- KEYS[3]:timeoutSetName,等待队列中线程预计获取锁时间的 set 集合,按照时间戳存放,redisson_lock_timeout:{fairLock}
- KEYS[4]:getChannelName(),当前锁对应的订阅channel,redisson_lock__channel:{fairLock}:UUID:threadId
ARGV = LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis()
- ARGV[1]:LockPubSub.UNLOCK_MESSAGE,Redis发布事件时的message,为0
- ARGV[2]:internalLockLeaseTime,watchdog的超时时间,30*1000 ms
- ARGV[3]:getLockName(threadId),UUID:ThreadId,UUID来唯一标识一个客户端
- ARGV[4]:System.currentTimeMillis(),当前时间戳
4.2 lua脚本分析
分支一:清理过期的等待线程
和获取锁的第一步一样,开个死循环清理过期的等待线程,避免无效客户端占用等待队列资源
// remove stale threads
// 这个死循环主要作用是删除失效的线程
"while true do " +
// 获取等待队列第一个元素,就是第一个等待线程
// lindex redisson_lock_queue:{fairLock} 0 返回值其实就是 UUID:threadId
"local firstThreadId2 = redis.call('lindex', KEYS[2], 0);" +
// 如果为空,则跳出循环
"if firstThreadId2 == false then " +
"break;" +
"end;" +
// 获取第一个线程的分值(也就是超时时间) zscore redisson_lock_timeout:{fairLock} UUID:threadId
"local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));" +
// 如果小于当期时间,说明已经失效,首先从超时集合中移除该节点,接着从等待队列中弹出第一个节点,否则退出循环
"if timeout <= tonumber(ARGV[4]) then " +
// remove the item from the queue and timeout set NOTE we do not alter any other timeout
// zrem redisson_lock_timeout:{fairLock} UUID:threadId
"redis.call('zrem', KEYS[3], firstThreadId2);" +
// lpop redisson_lock_queue:{fairLock}
"redis.call('lpop', KEYS[2]);" +
"else " +
"break;" +
"end;" +
"end;" +
分支二:锁已经被释放,通知等待队列中第一个线程
// 判断锁是否存在,exists fairLock
"if (redis.call('exists', KEYS[1]) == 0) then " +
// 如果不存在,则获取等待队列中的第一个元素,lindex redisson_lock_queue:{fairLock} 0
"local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
"if nextThreadId ~= false then " +
// 如果等待列表中第一个元素不为空,即还存在等待线程,往等待线程的订阅channel发送消息,通知其可以尝试获取锁了
// publish redisson_lock__channel:{fairLock}:UUID:threadId 0
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; " +
"end;" +
场景:
-
成功获取锁线程重复调用释放锁的方法,第二次释放时,锁已不存在,就去通知等待队列中的第一个元素
-
又或者一个极端场景:当前线程未能成功获取锁,但是调用了释放锁的方法,并且刚好此时锁被释放
分支三:加锁记录中的线程不是当前线程
// 判断加锁记录集合中,是否存在当前客户端当前线程
// hexists fairLock UUID:threadId
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
// 加锁记录不存在当前线程,返回nil
"return nil;" +
"end; " +
场景:
-
当前线程未能成功获取锁,但是调用了释放锁的方法
分支四:当前线程拥有锁,并且获取锁次数大于1
// 利用 hincrby 扣减当前线程的加锁次数
// hincrby fairLock UUID:threadId -1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// 如果扣减后次数还是大于0,证明是重入锁,所以此时只需要重新刷新锁的过期时间
"if (counter > 0) then " +
// expire fairLock 30000
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"end; " +
分支五:当前线程成功释放锁
// 利用 del 命令删除锁对应 redis key,del fairLock
"redis.call('del', KEYS[1]); " +
// 获取等待队列中的第一个元素,lindex redisson_lock_queue:{fairLock} 0
"local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
// 如果不为null,则发布个事件 redisson_lock__channel:{anyLock}:UUID:threadId
"if nextThreadId ~= false then " +
// publish redisson_lock__channel:{fairLock}:UUID:threadId 0
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; "
至此,如果释放锁成功,则后续的操作:例如停止watchdog运行(锁的续期),事件发布订阅操作等,都跟RedissonLock的操作保持一致