Redisson分布式锁之加解锁详解

ID:980 / 打印

积累知识,胜过积蓄金银!毕竟在数据库开发的过程中,会遇到各种各样的问题,往往都是一些细节知识点还没有掌握好而导致的,因此基础知识点的积累是很重要的。下面本文《Redisson分布式锁之加解锁详解》,就带大家讲解一下锁、Redisson分布式、加解锁知识点,若是你对本文感兴趣,或者是想搞懂其中某个知识点,就请你继续往下看吧~

引言

2023的金三银四来的没想象中那么激烈,一个朋友前段时间投了几十家,多数石沉大海,好不容易等来面试机会,就恰好被问道项目中关于分布式锁的应用,后涉及Redisson实现分布式锁的原理,答不上来。

锁的可重入性

我们都知道,Java中synchronized和lock都支持可重入,synchronized的锁关联一个线程持有者和一个计数器。当一个线程请求成功后,JVM会记下持有锁的线程,并将计数器计为1。此时其他线程请求该锁,则必须等待。而该持有锁的线程如果再次请求这个锁,就可以再次拿到这个锁,同时计数器会递增。当线程退出一个synchronized方法/块时,计数器会递减,如果计数器为0则释放该锁;在ReentrantLock中,底层的 AQS 对应的state 同步状态值表示线程获取该锁的可重入次数,通过CAS方式进行设置,在默认情况下,state的值为0 表示当前锁没有被任何线程持有,原理类似。所以如果想要实现可重入性,可能须有一个计数器来控制重入次数,实际Redisson确实是这么做的。

好的我们通过Redisson客户端进行设置,并循环3次,模拟锁重入:000

for(int i = 0; i  

连接Redis客户端进行查看:

可以看到,我们设置的分布式锁是存在一个hash结构中,value看起来是循环的次数3,key就不怎么认识了,那这个key是怎么设置进去的呢,另外为什么要设置成为Hash类型呢?

加锁

我们先来看看普通的分布式锁的上锁流程:

说明:

  • 客户端在进行加锁时,会校验如果业务上没有设置持有锁时长leaseTime,会启动看门狗来每隔10s进行续命,否则就直接以leaseTime作为持有的时长;
  • 并发场景下,如果客户端1锁还未释放,客户端2尝试获取,加锁必然失败,然后会通过发布订阅模式来订阅Key的释放通知,并继续进入后续的抢锁流程。
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {       long time = unit.toMillis(waitTime);       long current = System.currentTimeMillis();       long threadId = Thread.currentThread().getId();       Long ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);       if (ttl == null) {          return true;       } else {          // 订阅分布式Key对应的消息,监听其它锁持有者释放,锁没有释放的时候则会等待,直到锁释放的时候会执行下面的while循环          CompletableFuture subscribeFuture = this.subscribe(threadId);          subscribeFuture.get(time, TimeUnit.MILLISECONDS);          try {             do {                // 尝试获取锁                ttl = this.tryAcquire(waitTime, leaseTime, unit, threadId);                // 竞争获取锁成功,退出循环,不再竞争。                if (ttl == null) {                   return true;                }                // 利用信号量机制阻塞当前线程相应时间,之后再重新获取锁                if (ttl >= 0L && ttl  0L);          } finally {             // 竞争锁成功后,取消订阅该线程Id事件             this.unsubscribe((RedissonLockEntry)this.commandExecutor.getNow(subscribeFuture), threadId);          }       }    } } 
RFuture tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {         // 如果设置了持有锁的时长,直接进行尝试加锁操作          if (leaseTime != -1L) {             return this.tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);         } else {             // 未设置加锁时长,在加锁成功后,启动续期任务,初始默认持有锁时间是30s             RFuture ttlRemainingFuture = this.tryLockInnerAsync(this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);             ttlRemainingFuture.addListener(new FutureListener() {                 public void operationComplete(Future future) throws Exception {                     if (future.isSuccess()) {                         Long ttlRemaining = (Long)future.getNow();                         if (ttlRemaining == null) {                             RedissonLock.this.scheduleExpirationRenewal(threadId);                         }                     }                 }             });             return ttlRemainingFuture;         }     } 

我们都知道Redis执行Lua脚本具有原子性,所以在尝试加锁的下层,Redis主要执行了一段复杂的lua脚本:

-- 不存在该key时 if (redis.call('exists', KEYS[1]) == 0) then       -- 新增该锁并且hash中该线程id对应的count置1 redis.call('hincrby', KEYS[1], ARGV[2], 1); -- 设置过期时间 redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; -- 存在该key 并且 hash中线程id的key也存在 if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then       -- 线程重入次数++ redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]); 

参数说明:

KEYS[1]:对应我们设置的分布式key,即:distributed:lock:distribute_key

ARGV[1]:业务自定义的加锁时长或者默认的30s;

ARGV[2]: 具体的客户端初始化连接UUID+线程ID: 9d8f0907-1165-47d2-8983-1e130b07ad0c:1

我们从上面的脚本中可以看出核心逻辑其实不难:

  • 如果分布式锁Key未被任何端持有,直接根据“客户端连接ID+线程ID” 进行初始化设置,并设置重入次数为1,并设置Key的过期时间;
  • 否则重入次数+1,并重置过期时间;

锁续命

接下来看看scheduleExpirationRenewal续命是怎么做的呢?

private void scheduleExpirationRenewal(final long threadId) {    if (!expirationRenewalMap.containsKey(this.getEntryName())) {       Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {          public void run(Timeout timeout) throws Exception {             // 执行续命操作             RFuture future = RedissonLock.this.renewExpirationAsync(threadId);             future.addListener(new FutureListener() {                public void operationComplete(Future future) throws Exception {                   RedissonLock.expirationRenewalMap.remove(RedissonLock.this.getEntryName());                           ...                   // 续命成功,继续                   if ((Boolean)future.getNow()) {                      RedissonLock.this.scheduleExpirationRenewal(threadId);                   }                }             });          }       }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);    } } 

Tip小知识点:

  • 续期是用的什么定时任务执行的?
    Redisson用netty的HashedWheelTimer做命令重试机制,原因在于一条redis命令的执行不论成功或者失败耗时都很短,而HashedWheelTimer是单线程的,系统性能开销小。

而在上面的renewExpirationAsync中续命操作的执行核心Lua脚本要做的事情也非常的简单,就是给这个Key的过期时间重新设置为指定的30s.

if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then     redis.call('pexpire', KEYS[1], ARGV[1]);     return 1; end; return 0; 

释放锁

释放锁主要是除了解锁本省,另外还要考虑到如果存在续期的情况,要将续期任务删除:

public RFuture unlockAsync(long threadId) {    // 解锁    RFuture future = this.unlockInnerAsync(threadId);    CompletionStage f = future.handle((opStatus, e) -> {       // 解除续期       this.cancelExpirationRenewal(threadId);       ...    });    return new CompletableFutureWrapper(f); } 

在unlockInnerAsync内部,Redisson释放锁其实核心也是执行了如下一段核心Lua脚本:

    // 校验是否存在     if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then       return nil;       end;     // 获取加锁次数,校验是否为重入锁     local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);     // 如果为重入锁,重置过期时间,锁本身不释放     if (counter > 0) then       redis.call('pexpire', KEYS[1], ARGV[2]);       return 0;    // 删除Key     else redis.call('del', KEYS[1]);       // 通知阻塞的客户端可以抢锁啦       redis.call('publish', KEYS[2], ARGV[1]);       return 1;       end;       return nil; 

其中:

KEYS[1]: 分布式锁
KEYS[2]: redisson_lock_channel:{分布式锁} 发布订阅消息的管道名称
ARGV[1]: 发布的消息内容
ARGV[2]: 锁的过期时间
ARGV[3]: 线程ID标识名称

其它问题

  • 红锁这么火,但真的靠谱么?
  • Redisson公平锁是什么情况?

今天关于《Redisson分布式锁之加解锁详解》的内容介绍就到此结束,如果有什么疑问或者建议,可以在the24.cn下多多回复交流;文中若有不正之处,也希望回复留言以告知!

上一篇: redis++的编译 安装 使用方案
下一篇: 详解如何清理Redis内存碎片

作者:admin @ 24资源网   2024-09-02

本站所有软件、源码、文章均有网友提供,如有侵权联系308410122@qq.com

与本文相关文章

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。