深入理解Redis高并发分布式锁
完善高并发场景下的锁竞争
在分布式系统中,资源竞争问题比单体应用复杂得多。当多个服务实例同时操作共享资源时,如何保证数据一致性就成了一个关键挑战。本文将结合代码实例,梳理 Redis 分布式锁从简单实现到完善方案的演进过程。
1. 单体应用的解决方案:同步锁
在单体应用中,我们可以简单地使用synchronized
或Lock
等同步机制解决资源竞争问题:
// 单体应用中的同步方法
public synchronized String deductStock() {// 库存扣减逻辑
}
但这种方案只适用于单体项目实例,在分布式集群环境下完全失效。
2. 分布式锁的初步实现:Redis 基础加锁
当应用部署在多台服务器上时,我们需要一个跨 JVM 的锁机制,Redis 的setIfAbsent
方法(即SETNX
命令)为我们提供了这种能力:
// 基本的Redis分布式锁实现
public String deductStock() {String lockKey = "lock:product_101";// 尝试获取锁,setIfAbsent等价于SETNX命令Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "zhuge");if (!result) {return "error_code"; // 获取锁失败}try {// 库存扣减业务逻辑int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", realStock + "");System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}} finally {// 释放锁stringRedisTemplate.delete(lockKey);}return "end";
}
核心改进:使用 Redis 的setIfAbsent
实现分布式锁,确保同一时间只有一个服务实例能执行临界区代码。
3. 解决异常导致的锁无法释放问题
如果业务逻辑执行过程中抛出异常,可能导致delete
方法无法执行,造成锁永久占用。解决方案是将释放锁的操作放在finally
块中:
try {// 业务逻辑
} finally {// 确保锁一定会被释放stringRedisTemplate.delete(lockKey);
}
核心改进:通过finally
块保证锁的释放,防止异常导致的死锁。
4. 解决系统宕机导致的锁无法释放
如果在执行finally
块之前系统宕机,锁依然会永久存在。解决办法是给锁设置过期时间:
// 设置锁并指定过期时间
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "zhuge");
// 设置过期时间
stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
但上面的代码存在原子性问题,设置锁和设置过期时间是两个操作。可以合并为一个原子操作:
// 原子操作:获取锁并设置过期时间
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "zhuge", 30, TimeUnit.SECONDS);
核心改进:为锁设置过期时间,并通过原子操作确保锁的获取和过期时间设置的一致性。
5. 解决锁误释放问题
上述方案存在一个隐患:如果一个线程的业务执行时间过长,锁过期自动释放后,该线程可能会释放其他线程持有的锁。解决方案是给每个线程分配唯一标识,并在释放锁时验证:
// 生成唯一客户端ID
String clientId = UUID.randomUUID().toString();
// 使用客户端ID作为锁的值
Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);// 释放锁时验证
finally {if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {stringRedisTemplate.delete(lockKey);}
}
核心改进:通过唯一标识确保线程只能释放自己持有的锁,防止误释放。
6. 解决锁验证与释放的原子性问题
即使使用了唯一标识,在get
和delete
操作之间仍可能出现时间窗口:判断锁是自己的之后,锁过期被其他线程获取,这时删除的还是其他线程的锁。
解决这个问题需要使用 Lua 脚本保证验证和删除的原子性,但这增加了实现复杂度。
Lua脚本是单线程执行的,无论脚本中包含多少redis指令,都会一次性执行,其他客户端指令无法插队。这使得lua脚本天然就具备了原子性
7. 解决业务超时导致的锁提前释放
如果业务执行时间超过锁的过期时间,锁会提前释放,导致并发问题。理想的解决方案是 "锁续命" 机制:
- 启动一个后台线程,定期检查主线程是否还在执行
- 如果仍在执行,自动延长锁的过期时间
- 当主线程执行完毕,终止后台线程
这一机制实现复杂,需要处理线程协作、异常情况等。
8. 成熟解决方案:Redisson 框架
Redisson 是一个成熟的 Redis 客户端,内置了分布式锁的完整实现,包括:
- 自动过期时间设置
- 锁续命机制(Watch Dog)
- 原子性的锁操作
- 多种锁类型支持(可重入锁、公平锁等)
使用 Redisson 的最终实现:
public String deductStock() {String lockKey = "lock:product_101";// 获取锁对象RLock redissonLock = redisson.getLock(lockKey);// 加分布式锁redissonLock.lock(); // 内部自动处理过期和续命try {// 库存扣减业务逻辑int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));if (stock > 0) {int realStock = stock - 1;stringRedisTemplate.opsForValue().set("stock", realStock + "");System.out.println("扣减成功,剩余库存:" + realStock);} else {System.out.println("扣减失败,库存不足");}} finally {// 解锁redissonLock.unlock();}return "end";
}
核心改进:通过成熟框架解决分布式锁的各种边界问题,简化开发并提高可靠性。
总结
分布式锁的演进过程体现了软件设计中 "逐步完善" 的思想:
- 从单体到分布式,问题复杂度显著提升
- 每个解决方案可能引入新的问题
- 简单场景可以使用基础实现,复杂场景应选择成熟框架
- 分布式系统中,原子性、超时处理、异常情况都是需要重点考虑的因素
在实际开发中,除非有特殊需求,否则建议直接使用 Redisson 等成熟框架的分布式锁实现,避免重复造轮子和潜在的 bug。
Redisson 的底层是如何实现的呢
开始加锁
redissonLock.lock(); --》lockInterruptibly --》tryAcquire--》tryAcquireAsync
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {if (leaseTime != -1) {return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);}RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);ttlRemainingFuture.addListener(new FutureListener<Long>() {@Overridepublic void operationComplete(Future<Long> future) throws Exception {if (!future.isSuccess()) {return;}Long ttlRemaining = future.getNow();// lock acquiredif (ttlRemaining == null) {scheduleExpirationRenewal(threadId);}}});return ttlRemainingFuture;
}
//手动指定释放锁
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {internalLockLeaseTime = unit.toMillis(leaseTime);return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,"if (redis.call('exists', KEYS[1]) == 0) then " +"redis.call('hset', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return nil; " +"end; " +"return redis.call('pttl', KEYS[1]);",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
//不指定锁释放时间,默认的看门口实现
private void scheduleExpirationRenewal(final long threadId) {if (expirationRenewalMap.containsKey(getEntryName())) {return;}Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {@Overridepublic void run(Timeout timeout) throws Exception {RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +"redis.call('pexpire', KEYS[1], ARGV[1]); " +"return 1; " +"end; " +"return 0;",Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));future.addListener(new FutureListener<Boolean>() {@Overridepublic void operationComplete(Future<Boolean> future) throws Exception {expirationRenewalMap.remove(getEntryName());if (!future.isSuccess()) {log.error("Can't update lock " + getName() + " expiration", future.cause());return;}if (future.getNow()) {// reschedule itselfscheduleExpirationRenewal(threadId);}}});}}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {task.cancel();}
}
我们来看看tryLockInnerAsync方法内的lua脚本做的三个判断:
- 不存在这个键,加锁成功,并设置有效期为
internalLockLeaseTime
- 键已存在,并且是当前线程加的锁(重入锁逻辑),则重入次数加1,并重置超时时间
- 获取锁失败,锁被其他线程持有。返回锁剩余的有效时间
涉及到的关键参数
- KEYS [1]:锁的名称(通过 getName () 获取)
- ARGV [1]:锁的过期时间(毫秒)
- ARGV [2]:线程唯一标识(通过 getLockName (threadId) 生成)
在实际业务中,绝大部分时候都不会走重入锁的逻辑。除非加锁的业务方法发生了递归调用。
Redission获取锁出来默认的无参方法lock(),还包含有参方法,可以指定释放锁的时间如public void lock(long leaseTime, TimeUnit unit)
tryAcquireAsync方法中,采用无参方法加锁的话,leaseTime值默认为-1。根据加锁方法不同,选择不同的续命逻辑。
scheduleExpirationRenewal的lua脚本作用也是重置时间,不过会延迟三分之一的时间(internalLockLeaseTime / 3
)
获取锁成功或失败如何
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {long threadId = Thread.currentThread().getId();Long ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {return;}RFuture<RedissonLockEntry> future = subscribe(threadId);commandExecutor.syncSubscription(future);try {while (true) {ttl = tryAcquire(leaseTime, unit, threadId);// lock acquiredif (ttl == null) {break;}// waiting for messageif (ttl >= 0) {getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);} else {getEntry(threadId).getLatch().acquire();}}} finally {unsubscribe(future, threadId);}
// get(lockAsync(leaseTime, unit));
}
- 锁获取成功的话,ttl为null,直接返回继续执行主线程业务。主线程没执行完成之前,子线程会一直检查为其续命。
- 锁获取失败的话,返回剩余的存活时间,通过
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
获取信号量Semaphore
,并阻塞(时间为ttl)。阻塞时间到了,会再次去尝试获取锁。
获取锁失败的线程会通过redis的发布订阅机制,订阅一个共同的channel。若主线程执行完毕,删除锁的同时,会通过触发信号量提前及时地唤醒等待的线程,而不需要等待ttl之后了。
获取锁失败后是自我阻塞的,因此不会占用系统的cpu资源,也就是此图中为什么说是间隙性加锁的原因所在啦