Redisson加锁和解锁、WatchDog机制的原理


Redisson加锁和解锁、WatchDog机制的原理

一、加锁

默认加锁方法:RLock#lock()

Redisson 通过 Lua 脚本保证加锁操作的原子性;用客户端线程对应的唯一标识(getLockName(threadId))记录锁的持有者,保证锁不会被其他客户端或线程抢占,同时支持同一线程重入;用过期时间和 WatchDog 机制(可选)保证不会死锁。

/**
 * Acquires the lock for the current thread, blocking until it succeeds.
 *
 * <p>Tries once; if the lock is held by someone else, subscribes to the lock's release channel
 * and then loops: retry the acquire and, while it still fails, wait on the subscription entry's
 * latch for at most the returned TTL (or without a time limit when the TTL is negative).
 * The subscription is always released in {@code finally}.
 *
 * @param leaseTime     lock lease time; -1 means "no explicit lease", which enables the WatchDog
 * @param unit          unit of {@code leaseTime}
 * @param interruptibly whether an interrupt received while waiting is propagated to the caller
 * @throws InterruptedException if {@code interruptibly} is true and the thread is interrupted while waiting
 */
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    // Try to acquire; null means acquired, otherwise the lock's remaining TTL in ms
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }

    // Acquire failed: subscribe to the redisson_lock__channel:{lockName} channel so we learn
    // when the current holder releases the lock.
    // protected RFuture<RedissonLockEntry> subscribe(long threadId) {
    //    return pubSub.subscribe(getEntryName(), getChannelName());
    // }
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    commandExecutor.syncSubscription(future);

    try {
        // Keep retrying; between attempts park on the entry's latch
        // (presumably released by the pub/sub listener when an unlock message arrives)
        while (true) {
            ttl = tryAcquire(leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }

            // waiting for message
            if (ttl >= 0) {
                // Wait for a release notification, but no longer than the holder's remaining TTL
                try {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    // Non-interruptible mode: wait once more instead of propagating.
                    // NOTE(review): the thread's interrupt status is not restored here.
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                // Negative PTTL means the key has no expiry: wait for a release without a timeout
                if (interruptibly) {
                    getEntry(threadId).getLatch().acquire();
                } else {
                    getEntry(threadId).getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        // Unsubscribe whether or not the lock was acquired
        unsubscribe(future, threadId);
    }
}

/**
 * Blocking wrapper around {@code tryAcquireAsync}.
 *
 * @return null once the lock is acquired, otherwise the remaining TTL (ms) of the current holder's lock
 */
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    RFuture<Long> pending = tryAcquireAsync(leaseTime, unit, threadId);
    return get(pending);
}

/**
 * Asynchronously attempts to acquire the lock once.
 *
 * <p>With an explicit {@code leaseTime} the lock simply expires after that time and the
 * WatchDog is NOT started. With {@code leaseTime == -1} the lock is taken with the configured
 * lockWatchdogTimeout as its expiry and, on success, the WatchDog is started to keep renewing it.
 *
 * @return future completing with null if the lock was acquired (or re-entered), otherwise the
 *         lock's remaining TTL in ms; it completes exceptionally if the script fails
 */
private RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1) {
        // Caller chose an explicit expiry: do not use the WatchDog
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // No expiry given: use lockWatchdogTimeout as the initial expiry; the WatchDog extends it
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            // The failure is already visible to the caller through the returned future
            return;
        }

        // lock acquired
        if (ttlRemaining == null) {
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}

 /**
  * Runs the lock-acquire Lua script atomically for the holder id {@code getLockName(threadId)}.
  *
  * <p>Script arguments: KEYS[1] = lock name, ARGV[1] = lease in ms, ARGV[2] = holder id.
  * If the key is absent, it creates the hash with count 1. If this holder already owns the key,
  * it increments the count (re-entry). In both cases it resets the expiry and returns nil.
  * Otherwise it returns the key's PTTL.
  *
  * <p>Side effect: stores the lease (ms) in {@code internalLockLeaseTime}, which the WatchDog
  * and unlock reuse.
  */
 <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                                          "if (redis.call('exists', KEYS[1]) == 0) then " +
                                          "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                                          "return nil; " +
                                          "end; " +
                                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                                          "return nil; " +
                                          "end; " +
                                          "return redis.call('pttl', KEYS[1]);",
                                          Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

加锁对应的lua脚本

加锁成功返回null,失败返回锁的剩余时间。

--[[
Acquire (or re-enter) the lock.
KEYS[1]  lock name; stored as a hash whose field is the holder id and whose value is the re-entry count
ARGV[1]  expiry in milliseconds
ARGV[2]  unique id of the holder (getLockName(threadId))
Returns nil when acquired or re-entered, otherwise the lock's remaining TTL in ms (PTTL).
--]]

local lockKey = KEYS[1];
local leaseMs = ARGV[1];
local holder = ARGV[2];

-- Lock is free, or already owned by this holder: bump the re-entry count and reset the expiry.
-- On an absent key HINCRBY creates the hash with the field set to 1, just like HSET field 1.
if redis.call('exists', lockKey) == 0 or redis.call('hexists', lockKey, holder) == 1 then
    redis.call('hincrby', lockKey, holder, 1);
    redis.call('pexpire', lockKey, leaseMs);
    return nil;
end;

-- Held by someone else: report how long until it expires
return redis.call('pttl', lockKey);

二、WatchDog机制

WatchDog机制的关键源码在scheduleExpirationRenewal方法里。

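WatchDog 机制就是在后台开启一个定时任务(默认每 10 秒执行一次,即 lockWatchdogTimeout / 3),判断当前客户端线程是否仍持有锁;如果是,就把锁的过期时间重新设置为 lockWatchdogTimeout(默认 30 秒)。注意:只有加锁时没有指定 leaseTime(即 leaseTime == -1)才会启用 WatchDog。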

WatchDog源码

/**
 * Registers {@code threadId} in this lock's ExpirationEntry.
 *
 * <p>Only the call that installs the entry starts the WatchDog (renewExpiration()). Later calls
 * for the same entry name just add the thread to the existing entry. This includes re-entrant
 * acquisitions, since every successful acquire without a lease time lands here.
 */
private void scheduleExpirationRenewal(long threadId) {
    // NOTE(review): ExpirationEntry presumably maps holder thread id -> registration count; confirm
    ExpirationEntry candidate = new ExpirationEntry();
    ExpirationEntry current = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), candidate);
    if (current == null) {
        // We installed the entry: register the thread and kick off the WatchDog
        candidate.addThreadId(threadId);
        renewExpiration();
        return;
    }
    current.addThreadId(threadId);
}

/**
 * Schedules one WatchDog tick for this lock, {@code internalLockLeaseTime / 3} ms from now.
 *
 * <p>The tick renews the lock for the first thread in the ExpirationEntry, extending the expiry
 * to {@code internalLockLeaseTime}. While the lock is still held, the tick schedules the next one.
 * With lockWatchdogTimeout = 30s this means a renewal every 10s. The chain stops in three cases:
 * the entry has been removed (unlock), renewal fails with an error, or Redis reports that the
 * lock no longer belongs to this holder.
 */
private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }

    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                // The lock was released (entry removed) before this tick fired
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }

            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getName() + " expiration", e);
                    return;
                }

                if (Boolean.TRUE.equals(res)) {
                    // still held by this holder: reschedule itself
                    renewExpiration();
                } else {
                    // The lock no longer belongs to this holder (it expired or was taken over).
                    // Stop ticking and drop the stale entry. Otherwise a later successful lock() on this
                    // name would only join the existing entry in scheduleExpirationRenewal and
                    // never start a new WatchDog.
                    cancelExpirationRenewal(null);
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    // Keep the pending tick's handle so cancelExpirationRenewal can cancel it on unlock
    ee.setTimeout(task);
}

/**
 * Extends the lock's expiry to {@code internalLockLeaseTime} ms, but only if the holder id
 * {@code getLockName(threadId)} still owns the lock.
 *
 * <p>The script returns 1 (renewed) or 0 (not owned). These values are presumably mapped to
 * true/false by {@code EVAL_BOOLEAN}.
 *
 * @return future completing with true if the expiry was extended, false otherwise
 */
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                                          "return 1; " +
                                          "end; " +
                                          "return 0;",
                                          Collections.singletonList(getName()), 
                                          internalLockLeaseTime, getLockName(threadId));
}

续期锁对应的lua脚本

续期成功返回1,否则返回0。

--[[
Renew the lock's expiry if it is owned by the given holder.
KEYS[1]  lock name
ARGV[1]  new expiry in milliseconds
ARGV[2]  unique id of the holder (getLockName(threadId))
Returns 1 if the expiry was renewed, 0 if this holder does not own the lock.
--]]

if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    -- This holder owns the lock: reset its expiry
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return 1;
end;
return 0;

三、解锁

解锁方法:RLock#unlock

/**
 * Releases one hold of the lock owned by the calling thread, waiting on the async result via {@code get}.
 *
 * @throws IllegalMonitorStateException if the calling thread does not hold the lock
 *         (unwrapped from the RedisException that {@code get} raises)
 * @throws RedisException for any other failure
 */
public void unlock() {
    long callerId = Thread.currentThread().getId();
    try {
        get(unlockAsync(callerId));
    } catch (RedisException failure) {
        Throwable cause = failure.getCause();
        if (!(cause instanceof IllegalMonitorStateException)) {
            throw failure;
        }
        throw (IllegalMonitorStateException) cause;
    }
}

/**
 * Asynchronously releases one hold of the lock for {@code threadId}.
 *
 * <p>The returned future has three possible outcomes:
 * <ul>
 *   <li>succeeds when the hold was released (the lock may still be held if it was re-entered);</li>
 *   <li>fails with IllegalMonitorStateException when the thread does not own the lock;</li>
 *   <li>fails with the underlying error when the script fails.</li>
 * </ul>
 * In every case the thread's WatchDog registration is dropped first.
 */
public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise<Void>();
    // Release the lock (see unlockInnerAsync for the Lua script)
    RFuture<Boolean> future = unlockInnerAsync(threadId);

    future.onComplete((opStatus, e) -> {
        // Unregister this thread from the WatchDog before inspecting the result. If this were
        // skipped for opStatus == null (e.g. the lock had already expired), the thread would stay in
        // EXPIRATION_RENEWAL_MAP and its renewal task would never be cancelled.
        // NOTE(review): relies on removeThreadId being a no-op for a thread that was never
        // registered (e.g. a non-owner calling unlock) — matches upstream Redisson, confirm.
        cancelExpirationRenewal(threadId);

        if (e != null) {
            result.tryFailure(e);
            return;
        }

        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                                                                                  + id + " thread-id: " + threadId);
            result.tryFailure(cause);
            return;
        }

        result.trySuccess(null);
    });

    return result;
}

/**
 * Runs the unlock Lua script for the holder id {@code getLockName(threadId)}.
 *
 * <p>Script arguments: KEYS[1] = lock name, KEYS[2] = release channel, ARGV[1] = release
 * message, ARGV[2] = lease in ms (re-applied while still re-entered), ARGV[3] = holder id.
 * The script returns one of three values:
 * <ul>
 *   <li>nil: this holder does not own the lock;</li>
 *   <li>0: the lock is still held after decrementing the re-entry count;</li>
 *   <li>1: the key was deleted and the release message published.</li>
 * </ul>
 *
 * @return future completing with null, or the script's 0/1 result (presumably false/true via EVAL_BOOLEAN)
 */
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                                          "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                                          "return nil;" +
                                          "end; " +
                                          "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                                          "if (counter > 0) then " +
                                          "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                                          "return 0; " +
                                          "else " +
                                          "redis.call('del', KEYS[1]); " +
                                          "redis.call('publish', KEYS[2], ARGV[1]); " +
                                          "return 1; "+
                                          "end; " +
                                          "return nil;",
                                          Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

}

/**
 * Drops WatchDog registrations for this lock.
 *
 * <p>With a non-null {@code threadId}, removes that thread from the lock's ExpirationEntry. The
 * pending renewal tick is cancelled and the entry removed only once no threads remain. With
 * {@code null}, the tick is cancelled and the entry removed unconditionally.
 */
void cancelExpirationRenewal(Long threadId) {
    ExpirationEntry entry = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (entry == null) {
        return;
    }

    boolean cancelAll = threadId == null;
    if (!cancelAll) {
        entry.removeThreadId(threadId);
        cancelAll = entry.hasNoThreads();
    }

    if (cancelAll) {
        // Stop the scheduled renewal and forget this lock's entry
        entry.getTimeout().cancel();
        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
    }
}

解锁对应的lua脚本

当前客户端线程未持有锁(锁不存在或被他人持有),返回 null;重入次数减 1 后仍大于 0(仍持有锁),返回 0;锁被完全释放,返回 1。

--[[
Release one hold of the lock.
KEYS[1]  lock name
KEYS[2]  channel on which the release event is published
ARGV[1]  release message payload
ARGV[2]  expiry in milliseconds, re-applied while the lock is still re-entered
ARGV[3]  unique id of the holder (getLockName(threadId))
Returns nil if this holder does not own the lock, 0 if it still holds the lock after
decrementing the re-entry count, 1 if the lock was fully released.
--]]

if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
    -- This holder does not own the lock (key missing or held by someone else): return nil
    return nil;
end;
-- Owned by this holder: decrement the re-entry count
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
    -- Still re-entered, so the lock is still held: refresh the expiry and return 0
    redis.call('pexpire', KEYS[1], ARGV[2]);
    return 0;
else
    -- Count reached 0: delete the key, publish the release event for waiters, return 1
    redis.call('del', KEYS[1]);
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1;
end;
-- Unreachable (both branches above return); kept to mirror the script embedded in unlockInnerAsync
return nil;