Redisson分布式锁解析
目录
- 分布式锁
- 分布式锁设计需解决的问题
- Redis
- 如何获取锁?
- 持有锁之后,如何保证其他线程不会获取锁或者释放锁?
- 如何释放锁?
- 释放锁时出现异常,如何保证不死锁?
- 获取不到锁时,应该做些什么才能等锁释放之后,尽快获取锁?
- 如何保证以上操作的线程安全问题,例如获取锁的时候同时设置锁超时时间,释放锁时释放的自己持有的锁?
- Redisson
- 环境
- 使用
- 获取锁
- tryLock
- tryAcquire - tryAcquireAsync - tryLockInnerAsync
- 释放锁
- unlock
- unlockInnerAsync
- 小结
- 问题
- 参考
分布式锁
关于分布式锁的解决方案,常用的有Zookeeper、Redis等。这些中间件的都有共同一个特点,有一定能力保证一致性
- Zookeeper:集群对外只有Leader节点提供服务,通过ZAB协议保证Leader与Follower一致
- Redis:文件事件分派器队列的单线程消费。单机Redis可以保证一致,集群Redis不能保证强一致
CAP:C(一致性),A(可用性),P(分区容错)
- redis单机:CP
- redis集群:AP
- redis是高并发性,采用异步通知的方式,当主机宕机时会发现锁丢失,比如:主节点没来的及把刚刚set进来这条数据给从节点,master就挂了,从机上位但从机上无该数据。可从代码层面解决。
- zookeeper集群:CP
- zookeeper是高一致性,当所有zk服务器都收到消息后,整个过程才算完成。
分布式锁设计需解决的问题
解决问题的前提是定义清楚问题
- 如何获取锁?
- 持有锁之后,如何保证其他人不会获取锁或者释放锁?
- 如何释放锁?
- 释放锁时出现异常,如何保证不死锁?
- 获取不到锁时,应该做些什么才能等锁释放的时候,尽快获取锁?
- 如何保证以上操作的线程安全问题,例如获取锁的时候同时设置锁超时时间?
Redis
如何获取锁?
- 利用Redis单线程特性
- 使用setNX(set if not exists),类似putIfAbsent。不存在就set,返回1;已存在就不set,返回0
setNX key value
持有锁之后,如何保证其他线程不会获取锁或者释放锁?
- 为锁加秘钥
- 使用hsetNX(hash set if not exists)
hsetNX key secretKey value
如何释放锁?
- 利用Redis单线程特性
- 使用del删除
del key
释放锁时出现异常,如何保证不死锁?
- 解决死锁的办法常规有1. 超时释放 2. 死锁检测 (3.重启大法...)
- 在hsetNX设置锁后,马上使用pexpire设置超时时间,组合成原子指令
hsetNX key secretKey value
pexpire key milliseconds
获取不到锁时,应该做些什么才能等锁释放之后,尽快获取锁?
- 订阅锁释放信号,异步通知
- 通过SUB订阅消息,待锁释放PUB消息通知其他线程获取锁
如何保证以上操作的线程安全问题,例如获取锁的时候同时设置锁超时时间,释放锁时释放的自己持有的锁?
- 脚本化执行,打包成原子操作
Redisson
通过Redisson分布式锁的原理来解决以上问题
环境
- redisson:3.3.2
- redis:5.0.7 - 哨兵
使用
//根据key得到锁实例
RLock lock = redissonClient.getLock(key);
//尝试加锁
lock.tryLock(1, TimeUnit.SECONDS);
//解锁
lock.unlock();
获取锁
tryLock
- 6个步骤,用分隔符分开
tryAcquire
尝试第一次获取锁,返回ttl
。ttl
为null
则获取锁成功返回true
;否则看获取锁是否超时,超时则获取锁失败返回false
,未超时继续subscribe
订阅锁释放信号- 循环
tryAcquire
尝试获取锁 semaphore
阻塞等待锁释放信号。锁超时时间 < 等待超时时间,则阻塞时间为锁超时时间 ;否则为等待超时时间。- 阻塞放开,再尝试获取锁
- 跳出循环,取消订阅锁释放信号
//org.redisson.RedissonLock#tryLock(long, long, java.util.concurrent.TimeUnit)
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
//`tryAcquire`尝试第一次获取锁,返回`ttl`。`ttl`为`null`则获取锁成功返回`true`;否则看获取锁是否超时,超时则获取锁失败返回`false`,未超时继续
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
final long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//`subscribe`订阅锁释放消息
current = System.currentTimeMillis();
final RFuture subscribeFuture = subscribe(threadId);
if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.addListener(new FutureListener() {
@Override
public void operationComplete(Future future) throws Exception {
if (subscribeFuture.isSuccess()) {
unsubscribe(subscribeFuture, threadId);
}
}
});
}
acquireFailed(threadId);
return false;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
try {
time -= (System.currentTimeMillis() - current);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
while (true) {
//循环`tryAcquire`尝试获取锁
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= (System.currentTimeMillis() - currentTime);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// waiting for message
//semaphore阻塞等待锁释放信号。锁超时时间 < 等待超时时间,则阻塞时间为锁超时时间 ;否则为等待超时时间。
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//阻塞放开,再尝试获取锁
time -= (System.currentTimeMillis() - currentTime);
if (time <= 0) {
acquireFailed(threadId);
return false;
}
}
} finally {
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
//跳出循环,取消订阅锁释放信号
unsubscribe(subscribeFuture, threadId);
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
tryAcquire - tryAcquireAsync - tryLockInnerAsync
- Lua脚本的原子操作
- 判断key是否存在。不存在,则hset秘钥和value,同时set锁超时时间,返回
- key存在, 并且秘钥验证通过,则认为是重入, 锁计数+1,刷新锁超时时间,类似于
synchronized
重入锁,返回 - key不存在或者秘钥验证不通过,返回锁超时时间
//org.redisson.RedissonLock#tryLockInnerAsync
RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
//判断key是否存在。不存在,则hset秘钥和value,同时set锁超时时间, 返回
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//key存在, 并且秘钥验证通过,则认为是重入,锁计数+1,刷新锁超时时间,类似于`synchronized`重入锁,返回
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
//key不存在或者秘钥验证不通过, 则返回锁超时时间
"return redis.call('pttl', KEYS[1]);",
Collections.
释放锁
unlock
unlockInnerAsync
获取释放锁状态- 停掉锁续期
@Override
public void unlock() {
Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
if (opStatus == null) {
throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + Thread.currentThread().getId());
}
if (opStatus) {
cancelExpirationRenewal();
}
// Future future = unlockAsync();
// future.awaitUninterruptibly();
// if (future.isSuccess()) {
// return;
// }
// if (future.cause() instanceof IllegalMonitorStateException) {
// throw (IllegalMonitorStateException)future.cause();
// }
// throw commandExecutor.convertException(future);
}
unlockInnerAsync
- Lua脚本的原子操作
- 如果key不存在了, 发布锁释放信号,返回1
- 如果秘钥不匹配或不存在, 返回nil, 锁不存在或不能释放自己未持有的锁,返回
null
- key存在,且是自己持有锁, 则锁计数-1
- 最后计数 > 0, 则自己线程还持有锁, 刷新锁超时时间,返回0
- 最后计数 <= 0, 则此时锁要释放, 发布锁释放信号,返回1
protected RFuture unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
//如果key不存在了, 发布锁释放信号,返回1
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
//如果秘钥不匹配或不存在, 返回nil, 锁不存在或不能释放自己未持有的锁
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
//key存在,且是自己持有锁, 则锁计数-1,
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
//计数 > 0, 则自己线程还持有锁, 刷新锁超时时间
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
//计数 <= 0, 则此时锁要释放, 发布锁释放信号,返回1
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.
小结
对照前面5个问题,回看redisson分布式锁
- 如何获取锁?
redisson
使用hset
命令来尝试获取锁
- 持有锁之后,如何保证其他线程不会获取锁或者释放锁?
redisson
使用UUID + threadId
作为secretKey
作为秘钥操作锁
- 如何释放锁?
redisson
使用del
命令来尝试获取锁
- 释放锁时出现异常,如何保证不死锁?
redisson
使用pexpire
命令设置锁超时时间
- 获取不到锁时,应该做些什么才能等锁释放之后,尽快获取锁?
redisson
使用PUBSUB
来做信号通知
- 如何保证以上操作的线程安全问题,例如获取锁的时候同时设置锁超时时间,释放锁时释放的是自己持有的锁?
redisson
通过Lua
脚本组合命令为原子操作- 获取锁的时候同时设置锁超时时间:
hset
+pexpire
原子操作 - 释放锁时释放的是自己持有的锁:
hexists
+del
先判断再删除原子操作
问题
- 使用
redis
来做分布式锁是非严格的,redis
并非强一致性,主备切换的时候可能会出现问题,概率较小,但redis
的优点主要是性能更好 zookeeper
是强一致,分布式锁相对而言更好,但性能上不及redis
- Trade-Off,选择强一致性还是选择性能根据实际业务而定
参考
- http://kaito-kidd.com/2021/06/08/is-redis-distributed-lock-really-safe/
- https://blog.csdn.net/tianyaleixiaowu/article/details/96112684
- https://mp.weixin.qq.com/s/y_Uw3P2Ll7wvk_j5Fdlusw
- https://zhuanlan.zhihu.com/p/73807097