# Redis 分布式锁实现

# 前言

在分布式系统中,为了保证数据的一致性和并发控制,我们经常需要使用分布式锁。Redis 由于其高性能、原子性操作等特性,成为实现分布式锁的热门选择。本文将详细介绍 Redis 分布式锁的实现原理、多种实现方案以及最佳实践。

# 1. 分布式锁基础概念

# 1.1 什么是分布式锁

分布式锁是一种在分布式环境下控制多个进程 / 线程对共享资源访问的机制。它与本地锁(如 synchronized、ReentrantLock)的主要区别在于:

特性本地锁分布式锁
作用范围单个 JVM 进程多个 JVM 进程 / 多台机器
实现方式JVM 内置外部存储(Redis、Zookeeper 等)
性能相对较低(网络开销)
可靠性依赖单机依赖外部系统

# 1.2 分布式锁的核心要求

一个合格的分布式锁应该满足以下条件:

  1. 互斥性:在任意时刻,只有一个客户端能持有锁
  2. 避免死锁:即使持有锁的客户端崩溃或网络不可达,也能保证锁最终会被释放
  3. 容错性:只要大部分 Redis 节点正常运行,客户端就能够获取和释放锁
  4. 可重入性:同一个客户端在外层使用锁之后,在进入内层仍然可以获取锁

# 2. Redis 分布式锁的基本实现

# 2.1 最简单的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* 最基础的Redis分布式锁实现
*/
public class SimpleRedisLock {

private RedisTemplate<String, String> redisTemplate;
private String lockKey;
private String lockValue;
private long expireTime = 30; // 默认30秒过期

public SimpleRedisLock(RedisTemplate<String, String> redisTemplate, String lockKey) {
this.redisTemplate = redisTemplate;
this.lockKey = lockKey;
this.lockValue = UUID.randomUUID().toString();
}

/**
* 获取锁
*/
public boolean tryLock() {
Boolean result = redisTemplate.opsForValue()
.setIfAbsent(lockKey, lockValue, expireTime, TimeUnit.SECONDS);
return Boolean.TRUE.equals(result);
}

/**
* 释放锁
*/
public boolean unlock() {
// 简单的删除操作,存在误删风险
Boolean result = redisTemplate.delete(lockKey);
return Boolean.TRUE.equals(result);
}
}

问题分析:
这种实现存在严重问题:如果客户端 A 获取锁后,由于某种原因(GC 停顿、网络延迟等)导致锁过期,然后客户端 B 获取了锁,此时客户端 A 恢复执行并调用 unlock (),会错误地释放客户端 B 的锁。

# 2.2 改进版:使用 Lua 脚本保证原子性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
* 改进版Redis分布式锁
*/
public class ImprovedRedisLock {

private RedisTemplate<String, String> redisTemplate;
private String lockKey;
private String lockValue;
private long expireTime = 30;

public ImprovedRedisLock(RedisTemplate<String, String> redisTemplate, String lockKey) {
this.redisTemplate = redisTemplate;
this.lockKey = lockKey;
this.lockValue = UUID.randomUUID().toString();
}

/**
* 获取锁
*/
public boolean tryLock() {
Boolean result = redisTemplate.opsForValue()
.setIfAbsent(lockKey, lockValue, expireTime, TimeUnit.SECONDS);
return Boolean.TRUE.equals(result);
}

/**
* 安全释放锁(使用Lua脚本)
*/
public boolean unlock() {
String luaScript =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";

DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(luaScript, Long.class);
Long result = redisTemplate.execute(redisScript,
Collections.singletonList(lockKey), lockValue);

return Long.valueOf(1).equals(result);
}

/**
* 带超时时间的获取锁
*/
public boolean tryLock(long timeout, TimeUnit unit) {
long startTime = System.currentTimeMillis();
long timeoutMillis = unit.toMillis(timeout);

while (System.currentTimeMillis() - startTime < timeoutMillis) {
if (tryLock()) {
return true;
}

try {
Thread.sleep(50); // 休眠50ms重试
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}

return false;
}
}

# 3. RedLock 算法实现

# 3.1 RedLock 算法原理

RedLock 是 Redis 官方提出的分布式锁算法,通过在多个独立的 Redis 实例上获取锁来提高可靠性。算法步骤如下:

  1. 获取当前时间戳
  2. 依次向 N 个 Redis 实例请求获取锁
  3. 计算获取锁消耗的时间
  4. 如果成功获取锁的实例数量 >= N/2 + 1,且耗时 < 锁的有效时间,则认为获取锁成功
  5. 否则,向所有实例发送释放锁的请求

# 3.2 RedLock 实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
/**
* RedLock算法实现
*/
@Component
public class RedLock {

private List<RedisTemplate<String, String>> redisTemplates;
private int retryCount = 3;
private long retryDelay = 200; // 重试间隔200ms

public RedLock(List<RedisTemplate<String, String>> redisTemplates) {
this.redisTemplates = redisTemplates;
}

/**
* 获取分布式锁
*/
public boolean tryLock(String lockKey, String lockValue, long expireTime, TimeUnit unit) {
long startTime = System.currentTimeMillis();
long expireTimeMillis = unit.toMillis(expireTime);
int successCount = 0;

// 向所有Redis实例请求获取锁
for (RedisTemplate<String, String> redisTemplate : redisTemplates) {
if (tryLockSingle(redisTemplate, lockKey, lockValue, expireTime, unit)) {
successCount++;
}
}

long elapsedTime = System.currentTimeMillis() - startTime;
long remainingTime = expireTimeMillis - elapsedTime;

// 检查是否获取成功
if (successCount >= (redisTemplates.size() / 2 + 1) && remainingTime > 0) {
return true;
}

// 获取失败,释放已获取的锁
unlockAll(lockKey, lockValue);
return false;
}

/**
* 在单个Redis实例上获取锁
*/
private boolean tryLockSingle(RedisTemplate<String, String> redisTemplate,
String lockKey, String lockValue,
long expireTime, TimeUnit unit) {
for (int i = 0; i < retryCount; i++) {
try {
Boolean result = redisTemplate.opsForValue()
.setIfAbsent(lockKey, lockValue, expireTime, unit);
if (Boolean.TRUE.equals(result)) {
return true;
}
} catch (Exception e) {
// 记录异常,继续尝试下一个实例
log.warn("获取锁异常,Redis实例: {}", redisTemplate, e);
}

try {
Thread.sleep(retryDelay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}

return false;
}

/**
* 释放所有实例上的锁
*/
public void unlockAll(String lockKey, String lockValue) {
String luaScript =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";

DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(luaScript, Long.class);

for (RedisTemplate<String, String> redisTemplate : redisTemplates) {
try {
redisTemplate.execute(redisScript,
Collections.singletonList(lockKey), lockValue);
} catch (Exception e) {
log.warn("释放锁异常,Redis实例: {}", redisTemplate, e);
}
}
}
}

# 4. 可重入分布式锁实现

# 4.1 可重入锁原理

可重入锁允许同一个线程多次获取同一个锁。实现思路:

  1. 使用哈希结构存储锁信息,field 为线程标识,value 为重入次数
  2. 获取锁时,如果锁不存在则创建;如果存在且是同一线程则增加重入次数
  3. 释放锁时,减少重入次数,当次数为 0 时才真正释放锁

# 4.2 可重入锁实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
/**
* 可重入Redis分布式锁
*/
public class ReentrantRedisLock {

private RedisTemplate<String, String> redisTemplate;
private String lockKey;
private String lockValue; // 线程标识
private long expireTime = 30;

public ReentrantRedisLock(RedisTemplate<String, String> redisTemplate, String lockKey) {
this.redisTemplate = redisTemplate;
this.lockKey = lockKey;
this.lockValue = Thread.currentThread().getId() + ":" + UUID.randomUUID().toString();
}

/**
* 获取可重入锁
*/
public boolean lock() {
String luaScript =
"local lockKey = KEYS[1] " +
"local lockValue = ARGV[1] " +
"local expireTime = ARGV[2] " +
"local currentCount = redis.call('HGET', lockKey, lockValue) " +
"if currentCount then " +
" redis.call('HINCRBY', lockKey, lockValue, 1) " +
" redis.call('EXPIRE', lockKey, expireTime) " +
" return 1 " +
"else " +
" if redis.call('HSETNX', lockKey, lockValue, 1) == 1 then " +
" redis.call('EXPIRE', lockKey, expireTime) " +
" return 1 " +
" else " +
" return 0 " +
" end " +
"end";

DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(luaScript, Long.class);
Long result = redisTemplate.execute(redisScript,
Collections.singletonList(lockKey),
lockValue, String.valueOf(expireTime));

return Long.valueOf(1).equals(result);
}

/**
* 释放可重入锁
*/
public boolean unlock() {
String luaScript =
"local lockKey = KEYS[1] " +
"local lockValue = ARGV[1] " +
"local currentCount = redis.call('HGET', lockKey, lockValue) " +
"if currentCount then " +
" if tonumber(currentCount) > 1 then " +
" redis.call('HINCRBY', lockKey, lockValue, -1) " +
" return 1 " +
" else " +
" redis.call('HDEL', lockKey, lockValue) " +
" if redis.call('HLEN', lockKey) == 0 then " +
" redis.call('DEL', lockKey) " +
" end " +
" return 1 " +
" end " +
"else " +
" return 0 " +
"end";

DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(luaScript, Long.class);
Long result = redisTemplate.execute(redisScript,
Collections.singletonList(lockKey), lockValue);

return Long.valueOf(1).equals(result);
}

/**
* 获取重入次数
*/
public int getHoldCount() {
String count = redisTemplate.opsForHash().get(lockKey, lockValue).toString();
return count != null ? Integer.parseInt(count) : 0;
}
}

# 5. 看门狗机制

# 5.1 看门狗原理

看门狗机制用于解决锁的续期问题。当一个业务执行时间超过锁的过期时间时,看门狗会自动延长锁的有效期。

# 5.2 看门狗实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
/**
* 带看门狗机制的分布式锁
*/
public class WatchdogRedisLock {

private RedisTemplate<String, String> redisTemplate;
private String lockKey;
private String lockValue;
private long expireTime = 30;
private ScheduledExecutorService watchdogExecutor;
private ScheduledFuture<?> watchdogFuture;

public WatchdogRedisLock(RedisTemplate<String, String> redisTemplate, String lockKey) {
this.redisTemplate = redisTemplate;
this.lockKey = lockKey;
this.lockValue = UUID.randomUUID().toString();
this.watchdogExecutor = Executors.newSingleThreadScheduledExecutor(
r -> {
Thread thread = new Thread(r, "redis-lock-watchdog");
thread.setDaemon(true);
return thread;
});
}

/**
* 获取锁并启动看门狗
*/
public boolean lock() {
Boolean result = redisTemplate.opsForValue()
.setIfAbsent(lockKey, lockValue, expireTime, TimeUnit.SECONDS);

if (Boolean.TRUE.equals(result)) {
// 启动看门狗
startWatchdog();
return true;
}

return false;
}

/**
* 启动看门狗
*/
private void startWatchdog() {
// 锁过期时间的1/3时间后开始续期
long delay = expireTime / 3 * 1000;

watchdogFuture = watchdogExecutor.scheduleAtFixedRate(() -> {
try {
String luaScript =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('expire', KEYS[1], ARGV[2]) " +
"else " +
" return 0 " +
"end";

DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(luaScript, Long.class);
Long result = redisTemplate.execute(redisScript,
Collections.singletonList(lockKey),
lockValue, String.valueOf(expireTime));

if (!Long.valueOf(1).equals(result)) {
// 续期失败,停止看门狗
stopWatchdog();
}

} catch (Exception e) {
log.error("看门狗续期异常", e);
}
}, delay, delay, TimeUnit.MILLISECONDS);
}

/**
* 停止看门狗
*/
private void stopWatchdog() {
if (watchdogFuture != null && !watchdogFuture.isCancelled()) {
watchdogFuture.cancel(true);
}
}

/**
* 释放锁
*/
public boolean unlock() {
// 停止看门狗
stopWatchdog();

String luaScript =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";

DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(luaScript, Long.class);
Long result = redisTemplate.execute(redisScript,
Collections.singletonList(lockKey), lockValue);

return Long.valueOf(1).equals(result);
}
}

# 6. 分布式锁工具类封装

# 6.1 统一的锁接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 分布式锁接口
*/
public interface DistributedLock {

/**
* 获取锁
*/
boolean lock();

/**
* 带超时时间的获取锁
*/
boolean lock(long timeout, TimeUnit unit);

/**
* 释放锁
*/
boolean unlock();

/**
* 尝试获取锁
*/
boolean tryLock();

/**
* 带超时时间的尝试获取锁
*/
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit);
}

# 6.2 锁工厂实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* 分布式锁工厂
*/
@Component
public class DistributedLockFactory {

@Autowired
private RedisTemplate<String, String> redisTemplate;

@Autowired(required = false)
private List<RedisTemplate<String, String>> redisTemplates;

/**
* 创建简单锁
*/
public DistributedLock createSimpleLock(String lockKey) {
return new SimpleRedisLock(redisTemplate, lockKey);
}

/**
* 创建可重入锁
*/
public DistributedLock createReentrantLock(String lockKey) {
return new ReentrantRedisLock(redisTemplate, lockKey);
}

/**
* 创建带看门狗的锁
*/
public DistributedLock createWatchdogLock(String lockKey) {
return new WatchdogRedisLock(redisTemplate, lockKey);
}

/**
* 创建RedLock
*/
public DistributedLock createRedLock(String lockKey) {
if (redisTemplates == null || redisTemplates.size() < 3) {
throw new IllegalStateException("RedLock需要至少3个Redis实例");
}
return new RedLock(redisTemplates);
}
}

# 6.3 注解式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
/**
* 分布式锁注解
*/
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DistributedLockAnnotation {

/**
* 锁的key
*/
String key();

/**
* 等待时间
*/
long waitTime() default 0;

/**
* 锁持有时间
*/
long leaseTime() default 30;

/**
* 时间单位
*/
TimeUnit timeUnit() default TimeUnit.SECONDS;

/**
* 锁类型
*/
LockType lockType() default LockType.SIMPLE;

enum LockType {
SIMPLE, REENTRANT, WATCHDOG, REDLOCK
}
}

/**
* 分布式锁切面
*/
@Aspect
@Component
public class DistributedLockAspect {

@Autowired
private DistributedLockFactory lockFactory;

@Around("@annotation(distributedLock)")
public Object around(ProceedingJoinPoint joinPoint, DistributedLockAnnotation distributedLock)
throws Throwable {

String lockKey = parseLockKey(joinPoint, distributedLock.key());
DistributedLock lock = createLock(distributedLock.lockType(), lockKey);

boolean acquired = false;
try {
if (distributedLock.waitTime() > 0) {
acquired = lock.tryLock(distributedLock.waitTime(),
distributedLock.leaseTime(),
distributedLock.timeUnit());
} else {
acquired = lock.lock();
}

if (!acquired) {
throw new RuntimeException("获取分布式锁失败: " + lockKey);
}

return joinPoint.proceed();

} finally {
if (acquired) {
lock.unlock();
}
}
}

/**
* 解析锁的key
*/
private String parseLockKey(ProceedingJoinPoint joinPoint, String keyExpression) {
// 支持SpEL表达式解析
if (keyExpression.contains("#")) {
// 实现SpEL解析逻辑
return parseSpEL(keyExpression, joinPoint);
}
return keyExpression;
}

/**
* 创建锁实例
*/
private DistributedLock createLock(LockType lockType, String lockKey) {
switch (lockType) {
case REENTRANT:
return lockFactory.createReentrantLock(lockKey);
case WATCHDOG:
return lockFactory.createWatchdogLock(lockKey);
case REDLOCK:
return lockFactory.createRedLock(lockKey);
case SIMPLE:
default:
return lockFactory.createSimpleLock(lockKey);
}
}
}

# 7. 实际应用场景

# 7.1 库存扣减

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* 使用分布式锁实现库存扣减
*/
@Service
public class InventoryService {

@Autowired
private DistributedLockFactory lockFactory;

/**
* 扣减库存
*/
@DistributedLockAnnotation(key = "inventory:#{productId}", waitTime = 5, leaseTime = 10)
public boolean deductInventory(Long productId, int quantity) {
// 1. 查询当前库存
Integer currentStock = inventoryMapper.getStock(productId);

// 2. 检查库存是否充足
if (currentStock == null || currentStock < quantity) {
throw new RuntimeException("库存不足");
}

// 3. 扣减库存
int result = inventoryMapper.deductStock(productId, quantity);

return result > 0;
}

/**
* 手动使用分布式锁
*/
public boolean deductInventoryManual(Long productId, int quantity) {
String lockKey = "inventory:" + productId;
DistributedLock lock = lockFactory.createReentrantLock(lockKey);

try {
if (lock.lock(5, TimeUnit.SECONDS)) {
// 执行库存扣减逻辑
return deductInventory(productId, quantity);
} else {
throw new RuntimeException("获取锁超时");
}
} finally {
lock.unlock();
}
}
}

# 7.2 限流器实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
* 基于Redis分布式锁的限流器
*/
@Component
public class RateLimiter {

@Autowired
private RedisTemplate<String, String> redisTemplate;

/**
* 滑动窗口限流
*/
public boolean isAllowed(String key, int limit, int windowSize) {
long currentTime = System.currentTimeMillis();
long windowStart = currentTime - windowSize * 1000;

String luaScript =
"local key = KEYS[1] " +
"local windowStart = ARGV[1] " +
"local currentTime = ARGV[2] " +
"local limit = tonumber(ARGV[3]) " +
"redis.call('ZREMRANGEBYSCORE', key, 0, windowStart) " +
"local count = redis.call('ZCARD', key) " +
"if count < limit then " +
" redis.call('ZADD', key, currentTime, currentTime) " +
" redis.call('EXPIRE', key, " + windowSize + ") " +
" return 1 " +
"else " +
" return 0 " +
"end";

DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(luaScript, Long.class);
Long result = redisTemplate.execute(redisScript,
Collections.singletonList(key),
String.valueOf(windowStart),
String.valueOf(currentTime),
String.valueOf(limit));

return Long.valueOf(1).equals(result);
}
}

# 8. 性能优化与监控

# 8.1 性能优化建议

  1. 减少锁的粒度:尽量使用更精确的锁 key
  2. 缩短锁的持有时间:在锁内执行最少的操作
  3. 合理设置过期时间:避免时间过长或过短
  4. 使用连接池:减少 Redis 连接开销

# 8.2 监控指标

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
/**
* 分布式锁监控
*/
@Component
public class LockMonitor {

private final MeterRegistry meterRegistry;
private final Counter lockAcquisitionCounter;
private final Counter lockFailureCounter;
private final Timer lockHoldTimer;

public LockMonitor(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.lockAcquisitionCounter = Counter.builder("distributed.lock.acquisition")
.description("Lock acquisition count")
.register(meterRegistry);
this.lockFailureCounter = Counter.builder("distributed.lock.failure")
.description("Lock failure count")
.register(meterRegistry);
this.lockHoldTimer = Timer.builder("distributed.lock.hold.time")
.description("Lock hold time")
.register(meterRegistry);
}

/**
* 记录锁获取成功
*/
public void recordLockAcquisition(String lockKey) {
lockAcquisitionCounter.increment(Tags.of("lock.key", lockKey));
}

/**
* 记录锁获取失败
*/
public void recordLockFailure(String lockKey) {
lockFailureCounter.increment(Tags.of("lock.key", lockKey));
}

/**
* 记录锁持有时间
*/
public void recordLockHoldTime(String lockKey, long duration, TimeUnit unit) {
lockHoldTimer.record(duration, unit, Tags.of("lock.key", lockKey));
}
}

# 9. 常见问题与解决方案

# 9.1 锁误释放问题

问题:客户端 A 的锁被客户端 B 误释放
解决方案:使用唯一标识 + Lua 脚本保证原子性

# 9.2 锁续期问题

问题:业务执行时间超过锁的过期时间
解决方案:实现看门狗机制自动续期

# 9.3 单点故障问题

问题:Redis 单点故障导致锁服务不可用
解决方案:使用 RedLock 算法或 Redis 集群

# 9.4 性能问题

问题:高并发下锁竞争激烈,性能下降
解决方案

  • 减少锁的粒度
  • 使用分段锁
  • 优化业务逻辑,减少锁持有时间

# 10. 总结

Redis 分布式锁是分布式系统中的重要组件,本文详细介绍了从基础实现到高级特性的完整解决方案:

# 10.1 实现方案对比

方案优点缺点适用场景
简单锁实现简单存在误删风险简单场景
改进版避免误删不支持续期一般场景
可重入锁支持重入实现复杂需要重入的场景
看门狗锁自动续期资源消耗长时间业务
RedLock高可用实现复杂关键业务

# 10.2 最佳实践

  1. 选择合适的实现方案:根据业务需求选择合适的锁实现
  2. 合理设置过期时间:平衡安全性和性能
  3. 实现监控告警:及时发现锁异常
  4. 做好降级处理:锁服务异常时的备选方案
  5. 充分测试:在各种异常情况下验证锁的正确性

通过合理使用 Redis 分布式锁,可以有效解决分布式系统中的并发控制问题,保证数据的一致性和系统的稳定性。