摘要线上服务突然雪崩,排查发现是 Redis 缓存穿透惹的祸。本文分享 5 个实战方案:布隆过滤器、空值缓存、互斥锁、逻辑过期、分级缓存。亲测有效,帮你避开 90% 的缓存陷阱。一、开篇引入上周三凌晨 2 点,手机疯狂震动。线上服务挂了,CPU 飙到 100%,数据库连接池爆满。爬起来一看监控,好家伙,QPS 从 2000 瞬间掉到 50。排查半小时,定位到问题:Redis 缓存穿透。某个热点 key 过期后,大量请求直接打到数据库,数据库扛不住,雪崩连锁反应。说实话,这种坑我踩过不止一次。今天把压箱底的 5 个解决方案掏出来,帮你避开这些血泪教训。二、什么是缓存穿透?先统一一下认知。缓存穿透:查询一个根本不存在的数据,缓存层没有,存储层也没有。每次请求都穿透到数据库。缓存击穿:某个热点 key 过期瞬间,大量并发请求同时访问这个 key,直接打到数据库。缓存雪崩:大量 key 同时过期,或者 Redis 宕机,请求全部涌向数据库。这三种问题,穿透最隐蔽,击穿最常见,雪崩最致命。今天重点说穿透,顺带把击穿和雪崩的预防方案也讲了。方案一:布隆过滤器(推荐指数 ⭐⭐⭐⭐⭐)适用场景:判断数据是否存在,允许少量误判布隆过滤器的思路很简单:在缓存前面加一层拦截器。请求来了先问布隆过滤器:这个 key 存在吗?过滤器说不存在 直接返回,不查缓存也不查数据库过滤器说可能存在 继续查缓存,缓存没有再查数据库核心优势:内存占用极小,几百万数据也就几 MB查询速度 O(k),k 是哈希函数个数,基本算 O(1)误判率可控,一般控制在 1% 以内直接上代码:from redis import Redis from redisbloom.client import Client # 初始化 redis_client = Redis(host='localhost', port=6379) bf_client = Client(redis_client) # 创建布隆过滤器 bf_client.create('product_bf', errorRate=0.01, capacity=1000000) # 添加数据 def add_to_bloom(product_id): bf_client.add('product_bf', product_id) # 查询前检查 def get_product(product_id): # 先问布隆过滤器 ifnot bf_client.exists('product_bf', product_id): returnNone# 肯定不存在,直接返回 # 可能存在,查缓存 cached = redis_client.get(f'product:{product_id}') if cached: return cached # 缓存没有,查数据库 product = db.query_product(product_id) if product: redis_client.setex(f'product:{product_id}', 3600, product) return product returnNone踩过的坑:布隆过滤器不支持删除,数据删除了过滤器里还在误判率不是 0,会有少量假阳性capacity 要预估好,满了之后误判率会飙升我的建议:商品 ID、用户 ID 这种只增不减的场景,用布隆过滤器准没错。方案二:空值缓存(推荐指数 ⭐⭐⭐⭐)适用场景:数据不存在的情况也需要缓存这个方案更简单:数据库查不到,也把空结果缓存起来。def get_product(product_id): # 查缓存 cached = redis_client.get(f'product:{product_id}') if cached isnotNone: return cached if cached != '__NULL__'elseNone # 查数据库 product = db.query_product(product_id) if product: redis_client.setex(f'product:{product_id}', 3600, product) else: # 空值也缓存,过期时间设短一点 redis_client.setex(f'product:{product_id}', 300, '__NULL__') return product关键点:空值缓存的过期时间要短,一般 5-10 分钟用特殊标记(如__NULL__)区分真空和缓存未命中适合数据不频繁变化的场景这个方案很香,实现成本最低,90% 的场景够用。方案三:互斥锁(推荐指数 ⭐⭐⭐⭐)适用场景:缓存击穿防护,避免并发请求同时查数据库核心思路:同一时间只让一个请求去查数据库,其他请求等着。import time from redis.lock import Lock def get_product_with_lock(product_id): key = f'product:{product_id}' lock_key = f'lock:{product_id}' # 查缓存 cached = redis_client.get(key) if cached: return cached # 尝试获取锁 lock = Lock(redis_client, lock_key, timeout=10) if lock.acquire(blocking=False): try: # 双重检查,防止锁等待期间已有请求更新缓存 cached = redis_client.get(key) if cached: return cached # 查数据库 product = db.query_product(product_id) redis_client.setex(key, 3600, product or'__NULL__') return product finally: lock.release() else: # 没抢到锁,等一会儿再试 time.sleep(0.1) return get_product_with_lock(product_id)注意事项:锁的超时时间要合理,避免死锁一定要做双重检查,锁等待期间可能已有请求更新了缓存递归重试要加次数限制,防止无限循环坦白讲,这个方案在高并发场景下性能有损耗,但胜在稳妥。方案四:逻辑过期(推荐指数 ⭐⭐⭐⭐⭐)适用场景:对实时性要求不高,追求极致性能这个思路比较巧妙:缓存里不存真实的过期时间,而是存一个逻辑过期时间。import threading import time def get_product_logical_expire(product_id): key = f'product:{product_id}' cached = redis_client.get(key) ifnot cached: # 缓存没有,查数据库 product = db.query_product(product_id) # 存入缓存,带逻辑过期时间 data = { 'data': product, 'expire_at': time.time() + 3600# 1 小时后逻辑过期 } redis_client.setex(key, 7200, json.dumps(data)) # 物理过期时间设长 return product data = json.loads(cached) # 检查是否逻辑过期 if time.time() data['expire_at']: # 逻辑过期了,启动异步线程更新 threading.Thread(target=refresh_cache, args=(product_id,)).start() # 但先返回旧数据,保证性能 return data['data'] def refresh_cache(product_id): # 异步更新缓存 product = db.query_product(product_id) key = f'product:{product_id}' data = { 'data': product, 'expire_at': time.time() + 3600 } redis_client.setex(key, 7200, json.dumps(data))核心优势:用户请求永远不阻塞,直接返回缓存(哪怕是旧数据)后台异步更新,性能极致适合商品详情、配置信息这种对实时性不敏感的场景缺点:短暂的数据不一致实现复杂度稍高方案五:分级缓存(推荐指数 ⭐⭐⭐)适用场景:超大规模系统,单机扛不住思路:本地缓存 + 分布式缓存 + 数据库,三级防护。from functools import lru_cache # 本地缓存(进程内) @lru_cache(maxsize=1000) def get_product_local(product_id): # 先查 Redis cached = redis_client.get(f'product:{product_id}') if cached: return cached # Redis 没有,查数据库 product = db.query_product(product_id) if product: redis_client.setex(f'product:{product_id}', 3600, product) return product分级策略:L1 本地缓存:热点数据,过期时间 1-5 分钟L2 Redis 缓存:一般数据,过期时间 1 小时L3 数据库:兜底适用场景:日活百万级以上的应用读多写少的场景对性能要求极致说实话,小项目别搞这个,徒增复杂度。三、技术选型建议5 个方案,怎么选?看场景:场景推荐方案理由判断数据是否存在布隆过滤器内存小、速度快数据不频繁变化空值缓存实现最简单高并发热点数据互斥锁稳妥可靠对实时性不敏感逻辑过期性能极致超大规模系统分级缓存多层防护我的决策树:先问自己:数据是否存在? 是,用空值缓存;不确定,用布隆过滤器再问:并发量大吗? 大,加互斥锁最后问:能接受短暂不一致吗? 能,用逻辑过期四、踩坑经验总结干了这么多年,这些坑我替你踩过了:坑 1:缓存时间设太长数据改了缓存没改,用户看到旧数据建议:根据业务特性设置,一般 30 分钟到 2 小时坑 2:缓存时间设太短缓存频繁失效,等于没缓存建议:热点数据至少 1 小时起步坑 3:大 key 问题单个 key 超过 10KB,Redis 性能下降建议:大对象拆分存储,或用压缩坑 4:缓存穿透 + 雪崩一起发生最惨的情况,直接服务不可用建议:布隆过滤器 + 随机过期时间(基础时间 20%)坑 5:没做监控告警等用户投诉才知道挂了建议:缓存命中率、数据库 QPS、响应时间,这三个指标必须监控五、结尾互动缓存问题说难不难,说简单也不简单。核心就一句话:别让请求轻易穿透到数据库。今天分享的 5 个方案,我自己在生产环境都验证过。空值缓存 + 布隆过滤器这套组合拳,能解决 90% 的问题。最后留个思考题:你的系统里,缓存命中率一般是多少?低于 80% 就要警惕了。觉得有用,点个在看,分享给更多被缓存问题折磨的兄弟。下期聊聊Redis 集群方案选型:Codis vs Redis Cluster vs Twemproxy,想看的评论区扣 1。