# Redis 缓存穿透、击穿、雪崩解决方案

# 前言

在使用 Redis 作为缓存时,我们经常会遇到三个经典问题:缓存穿透、缓存击穿和缓存雪崩。这些问题会导致大量请求直接访问数据库,给数据库带来巨大压力,甚至可能导致系统崩溃。本文将详细分析这三个问题的成因、危害以及相应的解决方案。

# 1. 缓存穿透

# 1.1 什么是缓存穿透

缓存穿透是指查询一个根本不存在的数据,缓存层和存储层都没有命中。在这种情况下,每次请求都会直接打到数据库,导致数据库压力骤增。

# 1.2 缓存穿透的场景

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 典型的缓存穿透场景
public User getUserById(Long userId) {
// 1. 先查缓存
User user = redisTemplate.opsForValue().get("user:" + userId);
if (user != null) {
return user;
}

// 2. 缓存未命中,查数据库
user = userMapper.selectById(userId);
if (user != null) {
// 3. 写入缓存
redisTemplate.opsForValue().set("user:" + userId, user, 30, TimeUnit.MINUTES);
}
return user; // 如果userId不存在,返回null
}

当恶意用户请求大量不存在的 userId 时,每次都会穿透缓存直接访问数据库。

# 1.3 缓存穿透的解决方案

# 方案一:缓存空对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 缓存空对象解决方案
*/
public User getUserById(Long userId) {
// 1. 先查缓存
User user = redisTemplate.opsForValue().get("user:" + userId);
if (user != null) {
// 如果是空对象,直接返回null
if (user.getId() == -1L) {
return null;
}
return user;
}

// 2. 缓存未命中,查数据库
user = userMapper.selectById(userId);

// 3. 无论是否存在都写入缓存
if (user != null) {
redisTemplate.opsForValue().set("user:" + userId, user, 30, TimeUnit.MINUTES);
} else {
// 缓存空对象,设置较短过期时间
User emptyUser = new User();
emptyUser.setId(-1L); // 标识为空对象
redisTemplate.opsForValue().set("user:" + userId, emptyUser, 5, TimeUnit.MINUTES);
}

return user;
}

优点:

  • 实现简单,维护方便
  • 能够有效解决缓存穿透问题

缺点:

  • 需要额外的内存空间存储空值
  • 可能导致数据不一致(空对象过期前,数据库新增了该数据)

# 方案二:布隆过滤器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/**
* 布隆过滤器解决方案
*/
@Component
public class BloomFilterService {

private BloomFilter<Long> userIdBloomFilter;

@PostConstruct
public void init() {
// 初始化布隆过滤器
userIdBloomFilter = BloomFilter.create(
Funnels.longFunnel(),
1000000, // 预期插入数据量
0.01 // 误判率
);

// 预加载所有有效的userId到布隆过滤器
loadValidUserIds();
}

/**
* 检查userId是否可能存在
*/
public boolean mightContain(Long userId) {
return userIdBloomFilter.mightContain(userId);
}

/**
* 添加userId到布隆过滤器
*/
public void put(Long userId) {
userIdBloomFilter.put(userId);
}

/**
* 预加载有效userId
*/
private void loadValidUserIds() {
List<Long> userIds = userMapper.selectAllValidIds();
userIds.forEach(userIdBloomFilter::put);
}
}

/**
* 使用布隆过滤器的业务代码
*/
@Service
public class UserService {

@Autowired
private BloomFilterService bloomFilterService;

public User getUserById(Long userId) {
// 1. 先用布隆过滤器检查
if (!bloomFilterService.mightContain(userId)) {
// 肯定不存在,直接返回
return null;
}

// 2. 查缓存
User user = redisTemplate.opsForValue().get("user:" + userId);
if (user != null) {
return user;
}

// 3. 查数据库
user = userMapper.selectById(userId);
if (user != null) {
redisTemplate.opsForValue().set("user:" + userId, user, 30, TimeUnit.MINUTES);
}

return user;
}
}

优点:

  • 内存占用小,效率高
  • 能够有效拦截无效请求

缺点:

  • 存在误判率(但不会漏判)
  • 需要维护布隆过滤器
  • 不适合动态数据场景

# 2. 缓存击穿

# 2.1 什么是缓存击穿

缓存击穿是指一个热点 key 在失效的瞬间,大量并发请求同时访问这个 key,导致所有请求都直接打到数据库上。

# 2.2 缓存击穿的场景

1
2
3
4
5
6
7
8
9
10
11
// 热点商品查询场景
public Product getProductById(Long productId) {
// 假设某个热门商品缓存刚好过期
Product product = redisTemplate.opsForValue().get("product:" + productId);
if (product == null) {
// 大量请求同时进入这里,造成缓存击穿
product = productMapper.selectById(productId);
redisTemplate.opsForValue().set("product:" + productId, product, 30, TimeUnit.MINUTES);
}
return product;
}

# 2.3 缓存击穿的解决方案

# 方案一:互斥锁(分布式锁)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/**
* 使用分布式锁解决缓存击穿
*/
public Product getProductById(Long productId) {
// 1. 先查缓存
Product product = redisTemplate.opsForValue().get("product:" + productId);
if (product != null) {
return product;
}

// 2. 获取分布式锁
String lockKey = "lock:product:" + productId;
String lockValue = UUID.randomUUID().toString();

try {
// 尝试获取锁,最多等待10秒,锁自动释放时间30秒
Boolean locked = redisTemplate.opsForValue().setIfAbsent(
lockKey, lockValue, 30, TimeUnit.SECONDS);

if (locked) {
// 3. 获取锁成功,再次查缓存(双重检查)
product = redisTemplate.opsForValue().get("product:" + productId);
if (product == null) {
// 4. 查数据库并写入缓存
product = productMapper.selectById(productId);
if (product != null) {
redisTemplate.opsForValue().set("product:" + productId, product, 30, TimeUnit.MINUTES);
}
}
} else {
// 5. 获取锁失败,等待并重试
Thread.sleep(100);
return getProductById(productId); // 递归重试
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// 6. 释放锁(使用Lua脚本确保原子性)
releaseLock(lockKey, lockValue);
}

return product;
}

/**
* 原子性释放锁
*/
private void releaseLock(String lockKey, String lockValue) {
String luaScript =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
" return redis.call('del', KEYS[1]) " +
"else " +
" return 0 " +
"end";

redisTemplate.execute(
new DefaultRedisScript<>(luaScript, Long.class),
Collections.singletonList(lockKey),
lockValue
);
}

# 方案二:逻辑过期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/**
* 逻辑过期解决方案
*/
@Data
public class CacheData<T> {
private T data;
private LocalDateTime expireTime;
}

/**
* 使用逻辑过期解决缓存击穿
*/
public Product getProductById(Long productId) {
// 1. 查缓存
CacheData<Product> cacheData = redisTemplate.opsForValue().get("product:" + productId);

if (cacheData != null) {
// 2. 检查是否逻辑过期
if (cacheData.getExpireTime().isAfter(LocalDateTime.now())) {
return cacheData.getData(); // 未过期,直接返回
}

// 3. 逻辑过期,尝试获取锁进行缓存更新
String lockKey = "lock:product:" + productId;
String lockValue = UUID.randomUUID().toString();

Boolean locked = redisTemplate.opsForValue().setIfAbsent(
lockKey, lockValue, 10, TimeUnit.SECONDS);

if (locked) {
// 获取锁成功,开启新线程更新缓存
CompletableFuture.runAsync(() -> {
try {
Product product = productMapper.selectById(productId);
CacheData<Product> newCacheData = new CacheData<>();
newCacheData.setData(product);
newCacheData.setExpireTime(LocalDateTime.now().plusMinutes(30));

redisTemplate.opsForValue().set("product:" + productId, newCacheData);
} finally {
releaseLock(lockKey, lockValue);
}
});
}

// 4. 返回过期数据(保证可用性)
return cacheData.getData();
}

// 5. 缓存不存在,可能是首次访问
Product product = productMapper.selectById(productId);
if (product != null) {
CacheData<Product> newCacheData = new CacheData<>();
newCacheData.setData(product);
newCacheData.setExpireTime(LocalDateTime.now().plusMinutes(30));

redisTemplate.opsForValue().set("product:" + productId, newCacheData);
}

return product;
}

# 3. 缓存雪崩

# 3.1 什么是缓存雪崩

缓存雪崩是指大量 key 在同一时间集体失效,或者 Redis 服务宕机,导致大量请求直接访问数据库,造成数据库压力剧增。

# 3.2 缓存雪崩的场景

  1. 大量 key 同时过期:系统启动时批量设置了相同的过期时间
  2. Redis 服务宕机:服务器故障、网络问题等
  3. 缓存服务器重启:维护操作或系统重启

# 3.3 缓存雪崩的解决方案

# 方案一:过期时间随机化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 随机过期时间避免集体失效
*/
public void setProductCache(Long productId, Product product) {
// 基础过期时间30分钟
long baseExpireTime = 30 * 60;

// 添加随机偏移量(0-5分钟)
long randomOffset = (long) (Math.random() * 5 * 60);

long totalExpireTime = baseExpireTime + randomOffset;

redisTemplate.opsForValue().set("product:" + productId, product, totalExpireTime, TimeUnit.SECONDS);
}

/**
* 批量设置缓存时使用随机过期时间
*/
public void batchSetProductCache(Map<Long, Product> productMap) {
Map<String, Product> cacheMap = new HashMap<>();

productMap.forEach((productId, product) -> {
String key = "product:" + productId;
cacheMap.put(key, product);

// 为每个key设置不同的过期时间
long baseExpireTime = 30 * 60;
long randomOffset = (long) (Math.random() * 10 * 60); // 0-10分钟随机
long totalExpireTime = baseExpireTime + randomOffset;

redisTemplate.opsForValue().set(key, product, totalExpireTime, TimeUnit.SECONDS);
});
}

# 方案二:缓存预热

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/**
* 缓存预热服务
*/
@Component
public class CacheWarmupService {

@Autowired
private ProductService productService;

@Autowired
private RedisTemplate<String, Object> redisTemplate;

/**
* 系统启动时预热热点数据
*/
@PostConstruct
public void warmupCache() {
log.info("开始缓存预热...");

// 1. 预热热点商品
warmupHotProducts();

// 2. 预热热点用户
warmupHotUsers();

// 3. 预热配置数据
warmupConfigData();

log.info("缓存预热完成");
}

/**
* 预热热点商品
*/
private void warmupHotProducts() {
// 获取热门商品列表
List<Long> hotProductIds = productService.getHotProductIds();

hotProductIds.parallelStream().forEach(productId -> {
try {
Product product = productService.getProductById(productId);
if (product != null) {
// 设置较长的过期时间
redisTemplate.opsForValue().set(
"product:" + productId,
product,
2, TimeUnit.HOURS
);
}
} catch (Exception e) {
log.error("预热商品{}失败", productId, e);
}
});
}

/**
* 定时预热任务
*/
@Scheduled(fixedRate = 30 * 60 * 1000) // 每30分钟执行一次
public void scheduledWarmup() {
// 检查并预热即将过期的热点数据
warmupExpiringSoonData();
}

/**
* 预热即将过期的数据
*/
private void warmupExpiringSoonData() {
// 获取即将过期的热点key
Set<String> expiringKeys = redisTemplate.keys("product:*");

expiringKeys.parallelStream().forEach(key -> {
Long ttl = redisTemplate.getExpire(key, TimeUnit.SECONDS);
if (ttl != null && ttl < 300) { // 5分钟内过期
String productId = key.split(":")[1];
try {
Product product = productService.getProductById(Long.parseLong(productId));
if (product != null) {
// 重新设置缓存
redisTemplate.opsForValue().set(key, product, 2, TimeUnit.HOURS);
}
} catch (Exception e) {
log.error("重新预热{}失败", key, e);
}
}
});
}
}

# 方案三:高可用架构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
/**
* Redis高可用配置
*/
@Configuration
public class RedisConfig {

/**
* 主从配置
*/
@Bean
public LettuceConnectionFactory redisConnectionFactory() {
RedisSentinelConfiguration sentinelConfig = new RedisSentinelConfiguration()
.master("mymaster")
.sentinel("192.168.1.101", 26379)
.sentinel("192.168.1.102", 26379)
.sentinel("192.168.1.103", 26379)
.setPassword("yourpassword");

return new LettuceConnectionFactory(sentinelConfig);
}

/**
* 集群配置
*/
@Bean
public RedisClusterConfiguration redisClusterConfiguration() {
RedisClusterConfiguration clusterConfig = new RedisClusterConfiguration();
clusterConfig.clusterNode("192.168.1.101", 6379);
clusterConfig.clusterNode("192.168.1.102", 6379);
clusterConfig.clusterNode("192.168.1.103", 6379);
clusterConfig.setPassword("yourpassword");

return clusterConfig;
}
}

/**
* 多级缓存降级策略
*/
@Service
public class MultiLevelCacheService {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private CaffeineCache localCache;

/**
* 多级缓存查询
*/
public Product getProductById(Long productId) {
String key = "product:" + productId;

// 1. 本地缓存(Caffeine)
Product product = localCache.getIfPresent(key);
if (product != null) {
return product;
}

try {
// 2. Redis缓存
product = (Product) redisTemplate.opsForValue().get(key);
if (product != null) {
localCache.put(key, product);
return product;
}
} catch (Exception e) {
log.warn("Redis缓存异常,降级到数据库查询", e);
}

// 3. 数据库查询
product = productMapper.selectById(productId);
if (product != null) {
try {
// 尝试写入Redis
redisTemplate.opsForValue().set(key, product, 30, TimeUnit.MINUTES);
} catch (Exception e) {
log.warn("写入Redis缓存失败", e);
}
// 写入本地缓存
localCache.put(key, product);
}

return product;
}
}

# 4. 监控与告警

# 4.1 缓存监控指标

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
/**
* 缓存监控服务
*/
@Component
public class CacheMonitorService {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

/**
* 获取缓存命中率
*/
public double getCacheHitRate() {
// 通过Redis info命令获取统计信息
Properties info = redisTemplate.getConnectionFactory()
.getConnection()
.info();

long hits = Long.parseLong(info.getProperty("keyspace_hits", "0"));
long misses = Long.parseLong(info.getProperty("keyspace_misses", "0"));

return hits + misses == 0 ? 0.0 : (double) hits / (hits + misses);
}

/**
* 获取内存使用情况
*/
public Map<String, Object> getMemoryInfo() {
Properties info = redisTemplate.getConnectionFactory()
.getConnection()
.info("memory");

Map<String, Object> memoryInfo = new HashMap<>();
memoryInfo.put("used_memory", info.getProperty("used_memory"));
memoryInfo.put("used_memory_human", info.getProperty("used_memory_human"));
memoryInfo.put("used_memory_rss", info.getProperty("used_memory_rss"));
memoryInfo.put("used_memory_peak", info.getProperty("used_memory_peak"));
memoryInfo.put("maxmemory", info.getProperty("maxmemory"));

return memoryInfo;
}

/**
* 获取连接数
*/
public int getConnectedClients() {
Properties info = redisTemplate.getConnectionFactory()
.getConnection()
.info("clients");

return Integer.parseInt(info.getProperty("connected_clients", "0"));
}
}

/**
* 缓存告警服务
*/
@Component
public class CacheAlertService {

@Autowired
private CacheMonitorService monitorService;

/**
* 定时检查缓存状态
*/
@Scheduled(fixedRate = 60 * 1000) // 每分钟检查一次
public void checkCacheHealth() {
try {
// 检查命中率
double hitRate = monitorService.getCacheHitRate();
if (hitRate < 0.8) { // 命中率低于80%
sendAlert("缓存命中率过低: " + String.format("%.2f%%", hitRate * 100));
}

// 检查内存使用
Map<String, Object> memoryInfo = monitorService.getMemoryInfo();
long usedMemory = Long.parseLong(memoryInfo.get("used_memory").toString());
long maxMemory = Long.parseLong(memoryInfo.get("maxmemory").toString());

if (maxMemory > 0 && (double) usedMemory / maxMemory > 0.9) { // 内存使用超过90%
sendAlert("Redis内存使用率过高: " + String.format("%.2f%%", (double) usedMemory / maxMemory * 100));
}

// 检查连接数
int connectedClients = monitorService.getConnectedClients();
if (connectedClients > 1000) { // 连接数超过1000
sendAlert("Redis连接数过高: " + connectedClients);
}

} catch (Exception e) {
sendAlert("缓存监控异常: " + e.getMessage());
}
}

/**
* 发送告警
*/
private void sendAlert(String message) {
// 实现告警逻辑(邮件、短信、钉钉等)
log.error("缓存告警: {}", message);
// alertService.send(message);
}
}

# 5. 最佳实践总结

# 5.1 架构设计原则

  1. 分层缓存:本地缓存 + 分布式缓存
  2. 降级策略:缓存不可用时的备选方案
  3. 监控告警:实时监控缓存状态
  4. 容错处理:异常情况下的优雅降级

# 5.2 代码规范

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
* 统一的缓存服务封装
*/
@Service
public class CacheService {

@Autowired
private RedisTemplate<String, Object> redisTemplate;

@Autowired
private CacheMonitorService monitorService;

/**
* 安全的缓存操作
*/
public <T> T get(String key, Class<T> clazz, Supplier<T> dataLoader,
long expireTime, TimeUnit timeUnit) {
try {
// 1. 查缓存
T data = (T) redisTemplate.opsForValue().get(key);
if (data != null) {
return data;
}

// 2. 加载数据
T result = dataLoader.get();
if (result != null) {
// 3. 写入缓存
redisTemplate.opsForValue().set(key, result, expireTime, timeUnit);
}

return result;

} catch (Exception e) {
log.error("缓存操作异常,key: {}", key, e);
// 降级到直接加载数据
return dataLoader.get();
}
}

/**
* 批量缓存操作
*/
public <T> Map<String, T> multiGet(Collection<String> keys, Class<T> clazz) {
try {
List<Object> values = redisTemplate.opsForValue().multiGet(keys);
Map<String, T> result = new HashMap<>();

int index = 0;
for (String key : keys) {
Object value = values.get(index++);
if (value != null) {
result.put(key, (T) value);
}
}

return result;
} catch (Exception e) {
log.error("批量缓存查询异常", e);
return Collections.emptyMap();
}
}
}

# 5.3 性能优化

  1. 连接池配置:合理配置连接池参数
  2. 序列化优化:使用高效的序列化方式
  3. 批量操作:减少网络往返次数
  4. Pipeline:使用管道提高吞吐量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/**
* 性能优化配置
*/
@Configuration
public class RedisPerformanceConfig {

@Bean
public LettuceConnectionFactory lettuceConnectionFactory() {
GenericObjectPoolConfig<StatefulRedisConnection<String, String>> poolConfig =
new GenericObjectPoolConfig<>();
poolConfig.setMaxTotal(20); // 最大连接数
poolConfig.setMaxIdle(10); // 最大空闲连接
poolConfig.setMinIdle(5); // 最小空闲连接
poolConfig.setMaxWaitMillis(3000); // 最大等待时间

LettuceClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
.poolConfig(poolConfig)
.commandTimeout(Duration.ofSeconds(2))
.shutdownTimeout(Duration.ZERO)
.build();

return new LettuceConnectionFactory(
new RedisStandaloneConfiguration("localhost", 6379),
clientConfig);
}

/**
* 使用Pipeline批量操作
*/
public void batchSetData(Map<String, Object> dataMap, long expireTime, TimeUnit timeUnit) {
redisTemplate.executePipelined(new RedisCallback<Object>() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
dataMap.forEach((key, value) -> {
connection.set(key.getBytes(), SerializationUtils.serialize(value));
connection.expire(key.getBytes(), timeUnit.toSeconds(expireTime));
});
return null;
}
});
}
}

# 6. 总结

缓存穿透、击穿、雪崩是 Redis 使用过程中的常见问题,需要从架构设计、代码实现、监控告警等多个维度进行综合考虑:

# 6.1 问题对比

问题类型成因解决方案适用场景
缓存穿透查询不存在的数据缓存空对象、布隆过滤器防恶意攻击
缓存击穿热点 key 过期互斥锁、逻辑过期热点数据保护
缓存雪崩大量 key 同时失效随机过期、缓存预热、高可用系统稳定性

# 6.2 最佳实践

  1. 预防为主:通过合理的设计避免问题发生
  2. 监控告警:及时发现和处理异常情况
  3. 降级策略:确保系统在异常情况下的可用性
  4. 性能优化:持续优化缓存性能

通过以上措施,可以有效解决 Redis 缓存中的三大问题,构建稳定、高效的缓存系统。