述
上文说了redis的内存淘汰策略,下面再来看一下使用缓存的过程中一些常见的问题
我们在使用redis做缓存的时候,一般流程是这样的
- 请求进来时候首先查询redis判断是否存在缓存且缓存是否过期
- 若已经存在不过期的缓存则直接获取返回
- 若缓存不存在或已过期则重新查询数据库并将该数据存到redis中
用代码表示如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19@Autowired
private RedisTemplate redisTemplate;
public List<String> getValueBySql(String key){
System.out.println("这里模拟从数据库中获取数据");
return new ArrayList<>();
}
public List<String> getCache(String key){
List<String> resultList = (List<String>)redisTemplate.opsForValue().get(key);
if(resultList == null || CollectionUtils.isEmpty(resultList)){
//若缓存不存在则从数据库获取并设置时间
resultList = getValueBySql(key);
redisTemplate.opsForValue().set(key, resultList, 1000, TimeUnit.SECONDS);
return resultList;
}else{
return resultList;
}
}
缓存击穿
概念
如上面的经典缓存流程,在整个流程中我们需要先查询redis,在redis没有的时候再去查数据库最后再将数据库返回的数据存到redis中
如果有一些key被超高并发的访问,如果在某个时间点这些key过期了,恰好这个时间有对这个key的大量的并发请求过来,这些请求先redisTemplate.opsForValue().get(key);
,发现在缓存里面没有查到数据,这时候,所有的请求都会去访问DB,大并发的请求可能会瞬间把后端DB压垮
解决方案一
使用synchronized+双检查机制(适用于单机模式),代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17public List<String> getCacheSave(String key){
List<String> resultList = (List<String>)redisTemplate.opsForValue().get(key);
if(resultList == null || CollectionUtils.isEmpty(resultList)){
//采用synchronized保证一次只有一个请求进入到这个代码块
synchronized (this){
resultList = (List<String>)redisTemplate.opsForValue().get(key);
if(CollectionUtils.isEmpty(resultList)){
return resultList;
}
resultList = getValueBySql(key);
redisTemplate.opsForValue().set(key, resultList, 1000, TimeUnit.SECONDS);
return resultList;
}
}else{
return resultList;
}
}
- 上面代码第一个判断保证在缓存有数据时,让查询缓存的请求不必排队,减小了同步的粒度
- synchronized (this)保证查询数据库是同步操作,同一时刻只能有一个请求查询数据库
- 第二个判断保证所有在redis有缓存时,其他请求无需在查意思数据库.若没有这个判断,其他已经等待synchronized 解锁的请求会在请求一次数据库
解决方案二
使用互斥锁(适用于分布式模式),图,使用分布式锁保证只有一个线程查询数据库,其他线程采用重试的方式进行获取:
代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34public List<String> getCacheSave2(String key,int retryCount) throws InterruptedException {
List<String> resultList = (List<String>)redisTemplate.opsForValue().get(key);
if(CollectionUtils.isEmpty(resultList)){
final String mutexKey = key + "_lock";
boolean isLock = (Boolean) redisTemplate.execute(new RedisCallback() {
@Override
public Object doInRedis(RedisConnection connection) throws DataAccessException {
//只在键key不存在的情况下,将键key的值设置为value,若键key已经存在,则 SETNX 命令不做任何动作
//命令在设置成功时返回 1 , 设置失败时返回 0
return connection.setNX(mutexKey.getBytes(),"1".getBytes());
}
});
if(isLock){
//设置成1秒过期
redisTemplate.expire(mutexKey, 1000, TimeUnit.MILLISECONDS);
resultList = getValueBySql(key);
redisTemplate.opsForValue().set(key, resultList, 1000, TimeUnit.SECONDS);
redisTemplate.delete(mutexKey);
}else{
//线程休息50毫秒后重试
Thread.sleep(50);
retryCount--;
System.out.println("=====进行重试,当前次数:" + retryCount);
if(retryCount == 0){
System.out.println("====这里发邮件或者记录下获取不到数据的日志,并为key设置一个空置防止重复获取");
List<String> list = Lists.newArrayList("no find");
redisTemplate.opsForValue().set(key, list, 1000, TimeUnit.SECONDS);
return list;
}
return getCacheSave2(key,retryCount);
}
}
return resultList;
}
简单地来说,就是在缓存失效的时候(判断拿出来的值为空),不是立即去load db,而是先使用缓存工具的某些带成功操作返回值的操作(比如Redis的SETNX或者Memcache的ADD)去set一个mutex key,当操作返回成功时,再进行load db的操作并回设缓存,否则,就重试整个get缓存的方法
缓存穿透
概念
缓存穿透是指查询一个一定不存在的数据,按照规则,在缓存中查不到的话,就会去DB查询,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义.在流量大时,可能DB就挂掉了,要是有人利用不存在的key频繁攻击我们的应用,这就是漏洞.
解决方案
有很多种方法可以有效地解决缓存穿透问题,最常见的则是采用布隆过滤器,将所有可能存在的数据哈希到一个足够大的bitmap中,一个一定不存在的数据会被这个bitmap拦截掉,从而避免了对底层存储系统的查询压力.另外也有一个更为简单粗暴的方法,如果一个查询返回的数据为空(不管是数据不存在,还是系统故障),我们仍然把这个空结果进行缓存,但它的过期时间会很短,最长不超过五分钟.
缓存雪崩
概念
缓存雪崩是指在我们设置缓存时采用了相同的过期时间,导致缓存在某一时刻同时失效,请求全部转发到DB,DB瞬时压力过重雪崩.
解决方案
缓存失效时的雪崩效应对底层系统的冲击非常可怕.大多数系统设计者考虑用加锁或者队列的方式保证缓存的单线程(进程)写,从而避免失效时大量的并发请求落到底层存储系统上.这里分享一个简单方案就是将缓存失效时间分散开,比如我们可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件.