Redisson分布式锁解析


目录
  • 分布式锁
    • 分布式锁设计需解决的问题
  • Redis
    • 如何获取锁?
    • 持有锁之后,如何保证其他线程不会获取锁或者释放锁?
    • 如何释放锁?
    • 释放锁时出现异常,如何保证不死锁?
    • 获取不到锁时,应该做些什么才能等锁释放之后,尽快获取锁?
    • 如何保证以上操作的线程安全问题,例如获取锁的时候同时设置锁超时时间,释放锁时释放的自己持有的锁?
  • Redisson
    • 环境
    • 使用
    • 获取锁
      • tryLock
      • tryAcquire - tryAcquireAsync - tryLockInnerAsync
    • 释放锁
      • unlock
      • unlockInnerAsync
    • 小结
  • 问题
  • 参考

分布式锁

关于分布式锁的解决方案,常用的有Zookeeper、Redis等。这些中间件的都有共同一个特点,有一定能力保证一致性

  • Zookeeper:集群对外只有Leader节点提供服务,通过ZAB协议保证LeaderFollower一致
  • Redis:文件事件分派器队列的单线程消费。单机Redis可以保证一致,集群Redis不能保证强一致

CAP:C(一致性),A(可用性),P(分区容错)

  • redis单机:CP
  • redis集群:AP
    • redis是高并发性,采用异步通知的方式,当主机宕机时会发现锁丢失,比如:主节点没来的及把刚刚set进来这条数据给从节点,master就挂了,从机上位但从机上无该数据。可从代码层面解决。
  • zookeeper集群:CP
    • zookeeper是高一致性,当所有zk服务器都收到消息后,整个过程才算完成。

分布式锁设计需解决的问题

解决问题的前提是定义清楚问题

  • 如何获取锁?
  • 持有锁之后,如何保证其他人不会获取锁或者释放锁?
  • 如何释放锁?
  • 释放锁时出现异常,如何保证不死锁?
  • 获取不到锁时,应该做些什么才能等锁释放的时候,尽快获取锁?
  • 如何保证以上操作的线程安全问题,例如获取锁的时候同时设置锁超时时间?

Redis

如何获取锁?

  • 利用Redis单线程特性
  • 使用setNX(set if not exists),类似putIfAbsent。不存在就set,返回1;已存在就不set,返回0
setNX key value

持有锁之后,如何保证其他线程不会获取锁或者释放锁?

  • 为锁加秘钥
  • 使用hsetNX(hash set if not exists)
hsetNX key secretKey value

如何释放锁?

  • 利用Redis单线程特性
  • 使用del删除
del key

释放锁时出现异常,如何保证不死锁?

  • 解决死锁的办法常规有1. 超时释放 2. 死锁检测 (3.重启大法...)
  • hsetNX设置锁后,马上使用pexpire设置超时时间,组合成原子指令
hsetNX key secretKey value
pexpire key milliseconds

获取不到锁时,应该做些什么才能等锁释放之后,尽快获取锁?

  • 订阅锁释放信号,异步通知
  • 通过SUB订阅消息,待锁释放PUB消息通知其他线程获取锁

如何保证以上操作的线程安全问题,例如获取锁的时候同时设置锁超时时间,释放锁时释放的自己持有的锁?

  • 脚本化执行,打包成原子操作

Redisson

通过Redisson分布式锁的原理来解决以上问题

环境

  • redisson:3.3.2
  • redis:5.0.7 - 哨兵

使用

//根据key得到锁实例
RLock lock = redissonClient.getLock(key);
//尝试加锁
lock.tryLock(1, TimeUnit.SECONDS);
//解锁
lock.unlock();

获取锁

tryLock

  • 6个步骤,用分隔符分开
    • tryAcquire尝试第一次获取锁,返回ttlttlnull则获取锁成功返回true;否则看获取锁是否超时,超时则获取锁失败返回false,未超时继续
    • subscribe订阅锁释放信号
    • 循环tryAcquire尝试获取锁
    • semaphore阻塞等待锁释放信号。锁超时时间 < 等待超时时间,则阻塞时间为锁超时时间 ;否则为等待超时时间。
    • 阻塞放开,再尝试获取锁
    • 跳出循环,取消订阅锁释放信号
//org.redisson.RedissonLock#tryLock(long, long, java.util.concurrent.TimeUnit)
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    //`tryAcquire`尝试第一次获取锁,返回`ttl`。`ttl`为`null`则获取锁成功返回`true`;否则看获取锁是否超时,超时则获取锁失败返回`false`,未超时继续
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    final long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return true;
    }

    time -= (System.currentTimeMillis() - current);
    if (time <= 0) {
        acquireFailed(threadId);
        return false;
    }
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
    //`subscribe`订阅锁释放消息
    current = System.currentTimeMillis();
    final RFuture subscribeFuture = subscribe(threadId);
    if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
        if (!subscribeFuture.cancel(false)) {
            subscribeFuture.addListener(new FutureListener() {
                @Override
                public void operationComplete(Future future) throws Exception {
                    if (subscribeFuture.isSuccess()) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                }
            });
        }
        acquireFailed(threadId);
        return false;
    }
    
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
    
    try {
        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
		
        while (true) {
            //循环`tryAcquire`尝试获取锁
            long currentTime = System.currentTimeMillis();
            ttl = tryAcquire(leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                return true;
            }

            time -= (System.currentTimeMillis() - currentTime);
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
			
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// 
            
            // waiting for message
            //semaphore阻塞等待锁释放信号。锁超时时间 < 等待超时时间,则阻塞时间为锁超时时间 ;否则为等待超时时间。
            currentTime = System.currentTimeMillis();
            if (ttl >= 0 && ttl < time) {
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            
            }         
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
            //阻塞放开,再尝试获取锁
            time -= (System.currentTimeMillis() - currentTime);
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
        }
    } finally {
       ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
        //跳出循环,取消订阅锁释放信号
        unsubscribe(subscribeFuture, threadId);
        
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
    }
    //        return get(tryLockAsync(waitTime, leaseTime, unit));
}

tryAcquire - tryAcquireAsync - tryLockInnerAsync

  • Lua脚本的原子操作
  • 判断key是否存在。不存在,则hset秘钥和value,同时set锁超时时间,返回
  • key存在, 并且秘钥验证通过,则认为是重入, 锁计数+1,刷新锁超时时间,类似于synchronized重入锁,返回
  • key不存在或者秘钥验证不通过,返回锁超时时间
//org.redisson.RedissonLock#tryLockInnerAsync
 RFuture tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand command) {
    internalLockLeaseTime = unit.toMillis(leaseTime);

    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                                          //判断key是否存在。不存在,则hset秘钥和value,同时set锁超时时间, 返回
                                          "if (redis.call('exists', KEYS[1]) == 0) then " +
                                          "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                                          "return nil; " +
                                          "end; " +
                                          
                                          //key存在, 并且秘钥验证通过,则认为是重入,锁计数+1,刷新锁超时时间,类似于`synchronized`重入锁,返回
                                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                                          "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                                          "return nil; " +
                                          "end; " +
                                          
                                          //key不存在或者秘钥验证不通过, 则返回锁超时时间
                                          "return redis.call('pttl', KEYS[1]);",
                                          Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

释放锁

unlock

  • unlockInnerAsync获取释放锁状态
  • 停掉锁续期
@Override
public void unlock() {
    Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
    if (opStatus == null) {
        throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                                               + id + " thread-id: " + Thread.currentThread().getId());
    }
    if (opStatus) {
        cancelExpirationRenewal();
    }

    //        Future future = unlockAsync();
    //        future.awaitUninterruptibly();
    //        if (future.isSuccess()) {
    //            return;
    //        }
    //        if (future.cause() instanceof IllegalMonitorStateException) {
    //            throw (IllegalMonitorStateException)future.cause();
    //        }
    //        throw commandExecutor.convertException(future);
}

unlockInnerAsync

  • Lua脚本的原子操作
  • 如果key不存在了, 发布锁释放信号,返回1
  • 如果秘钥不匹配或不存在, 返回nil, 锁不存在或不能释放自己未持有的锁,返回null
  • key存在,且是自己持有锁, 则锁计数-1
    • 最后计数 > 0, 则自己线程还持有锁, 刷新锁超时时间,返回0
    • 最后计数 <= 0, 则此时锁要释放, 发布锁释放信号,返回1
protected RFuture unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                //如果key不存在了, 发布锁释放信号,返回1
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                "end;" +
                                              
                //如果秘钥不匹配或不存在, 返回nil, 锁不存在或不能释放自己未持有的锁
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                                              
                //key存在,且是自己持有锁, 则锁计数-1, 
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                //计数 > 0, 则自己线程还持有锁, 刷新锁超时时间
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                //计数 <= 0, 则此时锁要释放, 发布锁释放信号,返回1
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}

小结

对照前面5个问题,回看redisson分布式锁

  • 如何获取锁?
    • redisson使用hset命令来尝试获取锁
  • 持有锁之后,如何保证其他线程不会获取锁或者释放锁?
    • redisson使用UUID + threadId作为secretKey作为秘钥操作锁
  • 如何释放锁?
    • redisson使用del命令来尝试获取锁
  • 释放锁时出现异常,如何保证不死锁?
    • redisson使用pexpire命令设置锁超时时间
  • 获取不到锁时,应该做些什么才能等锁释放之后,尽快获取锁?
    • redisson使用PUBSUB来做信号通知
  • 如何保证以上操作的线程安全问题,例如获取锁的时候同时设置锁超时时间,释放锁时释放的是自己持有的锁?
    • redisson通过Lua脚本组合命令为原子操作
    • 获取锁的时候同时设置锁超时时间:hset + pexpire原子操作
    • 释放锁时释放的是自己持有的锁:hexists + del先判断再删除原子操作

问题

  • 使用redis来做分布式锁是非严格的,redis并非强一致性,主备切换的时候可能会出现问题,概率较小,但redis的优点主要是性能更好
  • zookeeper是强一致,分布式锁相对而言更好,但性能上不及redis
  • Trade-Off,选择强一致性还是选择性能根据实际业务而定

参考

  • http://kaito-kidd.com/2021/06/08/is-redis-distributed-lock-really-safe/
  • https://blog.csdn.net/tianyaleixiaowu/article/details/96112684
  • https://mp.weixin.qq.com/s/y_Uw3P2Ll7wvk_j5Fdlusw
  • https://zhuanlan.zhihu.com/p/73807097