Redisson加锁和解锁、WatchDog机制的原理
Redisson加锁和解锁、WatchDog机制的原理
一、加锁
默认加锁方法:RLock#lock()
redisson通过lua脚本来保证加锁的原子性,用客户端对应的线程的唯一标识来保证加锁的用户不被抢占,用过期时间和WatchDog机制(可选)保证不死锁。
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
// 尝试加锁,并返回锁的剩余时间
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
// 返回null,表示加锁成功
return;
}
// 加锁失败,订阅redisson_lock__channel:{锁名称}频道来获取锁的释放消息
// protected RFuture subscribe(long threadId) {
// return pubSub.subscribe(getEntryName(), getChannelName());
// }
RFuture future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
// 收到锁的释放消息,自旋获取锁
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
try {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
getEntry(threadId).getLatch().acquire();
} else {
getEntry(threadId).getLatch().acquireUninterruptibly();
}
}
}
} finally {
// 无论是否获取锁成功,都取消订阅
unsubscribe(future, threadId);
}
}
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));
}
private RFuture tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
// 如果客户端自定义过期时间,则不适用WatchDog机制
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
// WatchDog机制看下面分析
RFuture ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.
加锁对应的lua脚本
加锁成功返回null,失败返回锁的剩余时间。
--[[
参数:
KEYS[1] 锁名称
ARGV[1] 锁的过期时间
ARGV[2] 对应锁的唯一标识
--]]
if (redis.call('exists', KEYS[1]) == 0) then
-- 如果锁不存在,则设置锁的重入次数为1和过期时间,返回null
redis.call('hset', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
-- 如果锁存在,且客户端的唯一标识与当前锁的唯一标识相同,则增加锁的重入次数和设置过期时间,返回null
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;
-- 获取锁失败,返回锁的剩余时间
return redis.call('pttl', KEYS[1]);
二、WatchDog机制
WatchDog机制的关键源码在
scheduleExpirationRenewal
方法里。WatchDog机制就是在后台开启一个定时任务(默认每次10秒一次),去判断当前客户端是否持有锁,如果是就给锁续期。
private void scheduleExpirationRenewal(long threadId) {
// ExpirationEntry是一个key为线程id,value为Integer的map
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
renewExpiration();
}
}
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
// 在客户端对应的连接上开启一个定时任务,每隔 internalLockLeaseTime / 3 秒就续期锁
// internalLockLeaseTime == lockWatchdogTimeout == 30s,默认每隔10秒续期一次锁
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
// reschedule itself
renewExpiration();
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
// 设置ExpirationEntry的超时时间,当释放锁时,会撤销WatchDog任务后再移除ExpirationEntry
ee.setTimeout(task);
}
protected RFuture renewExpirationAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.
续期锁对应的lua脚本
续期成功返回1,否则返回0。
--[[
参数:
KEYS[1] 锁名称
ARGV[1] 锁的过期时间
ARGV[2] 对应锁的唯一标识
--]]
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
-- 如果当前客户端持有锁,则重新设置锁的过期时间
redis.call('pexpire', KEYS[1], ARGV[1]); "
return 1;
end;
return 0;
三、解锁
解锁方法:RLock#unlock
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}
public RFuture unlockAsync(long threadId) {
RPromise result = new RedissonPromise();
// 释放锁
RFuture future = unlockInnerAsync(threadId);
// 开启异步任务去撤销此锁上的定时任务
future.onComplete((opStatus, e) -> {
if (e != null) {
cancelExpirationRenewal(threadId);
result.tryFailure(e);
return;
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
cancelExpirationRenewal(threadId);
result.trySuccess(null);
});
return result;
}
protected RFuture unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.
解锁对应的lua脚本
锁不存在,直接返回null;锁释放,返回0,否则返回1。
--[[
参数:
KEYS[1] 锁名称
KEYS[2] 锁释放事件的频道名称
ARGV[1] 锁释放消息的标志
ARGV[2] 锁的过期时间
ARGV[3] 对应锁的唯一标识
--]]
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
-- 如果锁不存在,则返回null
return nil;
end;
-- 锁存在,则锁的重入次数-1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
-- 客户端重入次数大于0,表示还持有锁,则重新设置过期时间,返回0
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
-- 客户端重入次数等于于0,表示释放锁,则删除锁对应的key并推送锁释放事件,返回1
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;