Redisson实现分布式锁


之前一篇介绍了使用setnx命令实现分布式锁,但是使用这种方式不是那么严谨,需要我们自行做一些额外操作(setnx + lua方式)来保证锁的健壮性。
redisson为此就做了一些封装,使得我们使用分布式锁时应用就可以简单许多。

1、Maven依赖

server:
  port: 30800

spring:
  redis:
    cluster:
      nodes:
        - 148.70.153.63:9426
        - 148.70.153.63:9427
        - 148.70.153.63:9428
        - 148.70.153.63:9429
        - 148.70.153.63:9430
        - 148.70.153.63:9431
    password: password
    timeout: 2000

这里沿用之前使用RedisTemplate时的配置方式。

2.2、构建RedissonClient

集群Cluster模式下配置

@Bean
public RedissonClient singleRedissonClient() {
    Config config = new Config();
    config.useSingleServer().setAddress("redis://ip:port")
            .setPassword("password")
            .setDatabase(0);
    RedissonClient redissonClient = Redisson.create(config);
    return redissonClient;
}

哨兵模式Sentinel

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class RedissonApplicationTests {
    @Autowired
    private RedissonClient redissonClient;

    @Test
    public void testDisLock() {
        // 5个线程并发去获取锁
        IntStream.range(0, 5).parallel().forEach(i -> tryLock());
    }

    @SneakyThrows
    private void tryLock() {
        RLock disLock = redissonClient.getLock("disLock");
        // 获取锁最多等待500ms,10s后key过期自动释放锁
        boolean tryLock = disLock.tryLock(500, 10000, TimeUnit.MILLISECONDS);
        if (tryLock) {
            // 获取到锁后开始执行对资源的操作
            try {
                log.info("当前线程:[{}]获得锁", Thread.currentThread().getName());
                // 操作资源...
            } finally {
                disLock.unlock();
            }
        } else {
            log.info("当前线程:[{}]没有获得锁", Thread.currentThread().getName());
        }
    }
}
  1. 获取RLock同时指定key。
  2. 尝试获取锁,同时指定获取锁的最大阻塞时间、锁过期时间。
  3. 获得锁的线程进行资源操作。
  4. 最后一定要释放锁。

结果

127.0.0.1:9426> hgetall disLock
af0cc1b2-7896-4eb4-ba2b-efe5bbcb403a:53
1

第一个元素:uuid:线程id。
第二个元素:当前线程持有锁的次数,即重入的次数。

3.2、watch dog看门狗机制

如果使用锁自动过期方式,假设客户端在拿到锁之后执行的业务时间比较长,在此期间锁被释放,其它线程依旧可以获取到锁,redisson提供一种watch dog看门狗的机制来解决这个问题。

@Test
@SneakyThrows
public void testTryLockAgain() {
    RLock disLock = redissonClient.getLock("disLock");
    // 获取锁最多等待500ms,10s后key过期自动释放锁
    boolean tryLock = disLock.tryLock(500, 10000, TimeUnit.MILLISECONDS);
    if (tryLock) {
        try {
            log.info("当前线程:[{}]获得锁,持有锁次数:[{}]", Thread.currentThread().getName(), disLock.getHoldCount());
            // 操作资源...

            // 测试可重入,锁过期时间会重新计时
            boolean tryLockAgain = disLock.tryLock(500, 10000, TimeUnit.MILLISECONDS);
            log.info("当前线程:[{}]是否再次拿到锁:[{}],持有锁次数:[{}]", Thread.currentThread().getName(), tryLockAgain, disLock.getHoldCount());
            // 再次操作资源...
        } finally {
            disLock.unlock();
            log.info("当前线程是否持有锁:[{}],持有锁次数:[{}]", disLock.isHeldByCurrentThread(), disLock.getHoldCount());
        }
    } else {
        log.info("当前线程:[{}]没有获得锁", Thread.currentThread().getName());
    }
}

结果

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // 如果分布式锁存在,但是value不匹配,表示锁已经被占用,那么直接返回
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            // 如果就是当前线程占有分布式锁,那么将重入次数减1
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            // 重入次数减1后的值如果大于0,表示分布式锁有重入过,那么只设置失效时间,还不能删除
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                // 重入次数减1后的值如果为0,表示分布式锁只获取过1次,那么删除这个KEY,并发布解锁消息
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

改进

  1. 每次重入锁后都执行一次释放锁的操作。
  2. 或者通过forceUnlock()函数强制释放当前线程持有的锁,只需要在最后释放一次即可。
finally {
    disLock.forceUnlock();
    log.info("当前线程是否持有锁:[{}],持有锁次数:[{}]", disLock.isHeldByCurrentThread(), disLock.getHoldCount());
}

5、总结

之前使用setnx命令实现分布式锁会有一些问题,比如不可重入、非阻塞、误解别的线程的锁、未执行完锁就失效、主从切换锁丢失;其中一些问题我们可以增加代码来解决,但是同样会增加业务代码的复杂度;

redisson则支持锁的可重入和等待获取锁,并在解锁时判断是否是当前线程持有的锁,以及有看门狗机制防止锁过期程序还未执行完的问题,对于这些功能redisson已经做好了封装,简化了业务代码。

但是依旧会有1个问题,主从切换导致的锁丢失,场景如下:

  • 在Redis的master节点上拿到了锁;
  • 但是这个加锁的key还没有同步到slave节点;
  • master故障,发生故障转移,slave节点升级为master节点;
  • 导致锁丢失。

对于这个问题就可以使用Redlock机制来解决,接下来的文章会介绍到Redlock
Redlock实现分布式锁

参考链接

  • Redisson实现Redis分布式锁的N种姿势
  • Redisson 分布式锁实战与 watch dog 机制解读

代码地址

  • github:https://github.com/senlinmu1008/redis-action/tree/master/redisson
  • gitee:https://gitee.com/ppbin/redis-action/tree/master/redisson
    11人点赞   Redis    



作者:砒霜拌辣椒
链接:https://www.jianshu.com/p/59ffff18e1ff
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。