我来用一个实际的电商库存管理案例,演示Python缓存过期的各种场景和解决方案。
场景:电商商品库存缓存系统
基础缓存过期实现
import time
import threading
from datetime import datetime, timedelta
from typing import Any, Optional, Dict
class SimpleCache:
"""简单的内存缓存,支持过期时间"""
def __init__(self):
self._cache: Dict[str, tuple[Any, Optional[float]]] = {}
self._lock = threading.Lock()
def set(self, key: str, value: Any, ttl: Optional[int] = None):
"""
设置缓存
:param key: 缓存键
:param value: 缓存值
:param ttl: 过期时间(秒),None表示永不过期
"""
expire_time = time.time() + ttl if ttl else None
with self._lock:
self._cache[key] = (value, expire_time)
print(f"[缓存] 设置 {key} = {value}, 过期时间: "
f"{datetime.fromtimestamp(expire_time) if ttl else '永久'}")
def get(self, key: str) -> Optional[Any]:
"""获取缓存"""
with self._lock:
if key not in self._cache:
return None
value, expire_time = self._cache[key]
# 检查是否过期
if expire_time and time.time() > expire_time:
del self._cache[key]
print(f"[缓存] {key} 已过期,自动删除")
return None
return value
def clear_expired(self):
"""清理过期缓存"""
with self._lock:
now = time.time()
expired_keys = [
key for key, (_, expire_time) in self._cache.items()
if expire_time and now > expire_time
]
for key in expired_keys:
del self._cache[key]
if expired_keys:
print(f"[缓存] 清理了 {len(expired_keys)} 个过期项")
# 使用示例
cache = SimpleCache()
# 模拟商品库存数据
def get_product_stock(product_id: str) -> int:
"""模拟从数据库获取库存(通常很慢)"""
print(f"[数据库] 查询商品 {product_id} 库存...")
time.sleep(1) # 模拟数据库查询延迟
return 100 # 模拟返回库存数量
def get_product_stock_with_cache(product_id: str) -> int:
"""带缓存的库存查询"""
cache_key = f"stock:{product_id}"
# 尝试从缓存获取
stock = cache.get(cache_key)
if stock is not None:
print(f"[缓存] 命中!商品 {product_id} 库存: {stock}")
return stock
# 缓存未命中,从数据库获取
print(f"[缓存] 未命中,查询数据库...")
stock = get_product_stock(product_id)
# 设置缓存,过期时间30秒
cache.set(cache_key, stock, ttl=30)
return stock
# 运行测试
print("=== 缓存过期测试 ===")
# 第一次查询 - 缓存未命中
print("\n--- 第一次查询 ---")
stock1 = get_product_stock_with_cache("P001")
# 第二次查询 - 缓存命中
print("\n--- 第二次查询(立即) ---")
stock2 = get_product_stock_with_cache("P001")
# 等待缓存过期
print(f"\n--- 等待40秒后查询 ---")
time.sleep(40)
stock3 = get_product_stock_with_cache("P001")
基于TTL的主动过期策略
import redis
from functools import wraps
import hashlib
import json
class RedisCacheExample:
"""使用Redis实现缓存过期(实际生产环境)"""
def __init__(self, host='localhost', port=6379, db=0):
self.redis_client = redis.Redis(
host=host,
port=port,
db=db,
decode_responses=True
)
def set_cache(self, key: str, value: Any, ttl: int = 300):
"""设置缓存,支持各种数据类型"""
# 将Python对象序列化为JSON
if not isinstance(value, (str, bytes)):
value = json.dumps(value, ensure_ascii=False)
self.redis_client.set(key, value, ex=ttl)
print(f"[Redis缓存] 设置 {key}, TTL={ttl}秒")
def get_cache(self, key: str) -> Optional[Any]:
"""获取缓存"""
value = self.redis_client.get(key)
if value:
try:
# 尝试解析JSON
return json.loads(value)
except (json.JSONDecodeError, TypeError):
return value
return None
def get_or_set(self, key: str, func, ttl: int = 300):
"""先尝试获取缓存,如果没有则调用函数并缓存结果"""
# 检查缓存
cached_value = self.get_cache(key)
if cached_value is not None:
print(f"[Redis缓存] 命中: {key}")
return cached_value
# 执行函数获取数据
result = func()
# 缓存结果
self.set_cache(key, result, ttl)
return result
# 使用装饰器实现自动缓存
def cache_with_ttl(ttl: int = 300):
"""缓存装饰器"""
redis_cache = RedisCacheExample()
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 生成缓存键
key = f"{func.__name__}:{hashlib.md5(
str(args).encode() + str(kwargs).encode()
).hexdigest()}"
return redis_cache.get_or_set(key, lambda: func(*args, **kwargs), ttl)
return wrapper
return decorator
# 使用示例
@cache_with_ttl(ttl=60) # 缓存60秒
def get_product_price(product_id: str) -> dict:
"""获取商品价格信息"""
print(f"[数据库] 查询商品 {product_id} 价格...")
time.sleep(0.5)
return {
"product_id": product_id,
"price": 99.99,
"currency": "CNY",
"last_updated": datetime.now().isoformat()
}
# 测试
print("\n=== Redis缓存测试 ===")
result1 = get_product_price("P001")
print(f"结果1: {result1}")
result2 = get_product_price("P001")
print(f"结果2: {result2}")
print("\n等待缓存过期...")
time.sleep(65)
result3 = get_product_price("P001")
print(f"结果3: {result3}")
多级缓存策略(本地+远端)
from functools import partial
import weakref
class MultiLevelCache:
"""多级缓存:内存 -> Redis -> 数据库"""
def __init__(self, memory_ttl=60, redis_ttl=300):
self.memory_cache = SimpleCache()
self.redis_cache = RedisCacheExample()
self.memory_ttl = memory_ttl
self.redis_ttl = redis_ttl
def get(self, key: str, db_func):
"""
多级缓存获取
Level 1: 内存缓存(最快,但容量小)
Level 2: Redis缓存(快速,容量大)
Level 3: 数据库(最慢)
"""
# Level 1: 尝试内存缓存
value = self.memory_cache.get(key)
if value is not None:
print(f"[L1] 内存缓存命中: {key}")
return value
# Level 2: 尝试Redis缓存
value = self.redis_cache.get_cache(key)
if value is not None:
print(f"[L2] Redis缓存命中: {key}")
# 将数据回填到内存缓存(但TTL设短一些,防止不一致)
self.memory_cache.set(key, value, ttl=self.memory_ttl)
return value
# Level 3: 查询数据库
print(f"[L3] 查询数据库: {key}")
value = db_func()
# 写入Redis和内存缓存
self.redis_cache.set_cache(key, value, ttl=self.redis_ttl)
self.memory_cache.set(key, value, ttl=self.memory_ttl)
return value
def invalidate(self, key: str):
"""使缓存失效(数据更新时调用)"""
self.memory_cache.get(key) # 触发检查并删除
self.redis_cache.redis_client.delete(key)
print(f"[缓存] 使 {key} 失效")
# 模拟商品更新
class ProductService:
def __init__(self):
self.cache = MultiLevelCache()
def get_product(self, product_id: str):
"""获取商品信息"""
key = f"product:{product_id}"
def fetch_from_db():
"""模拟数据库查询"""
print(f"[数据库] 查询商品 {product_id}...")
time.sleep(0.5)
return {
"id": product_id,
"name": f"商品{product_id}",
"stock": 100,
"updated_at": datetime.now().isoformat()
}
return self.cache.get(key, fetch_from_db)
def update_product_stock(self, product_id: str, stock: int):
"""更新商品库存"""
# 更新数据库(模拟)
print(f"[数据库] 更新商品 {product_id} 库存为 {stock}")
# 使缓存失效
key = f"product:{product_id}"
self.cache.invalidate(key)
# 测试多级缓存
print("\n=== 多级缓存测试 ===")
service = ProductService()
# 第一次查询
print("\n--- 第一次查询 ---")
start = time.time()
product = service.get_product("P001")
print(f"结果: {product}, 耗时: {time.time()-start:.2f}秒")
# 第二次查询(应该从内存缓存)
print("\n--- 第二次查询(立即) ---")
start = time.time()
product = service.get_product("P001")
print(f"结果: {product}, 耗时: {time.time()-start:.2f}秒")
# 更新库存
print("\n--- 更新库存 ---")
service.update_product_stock("P001", 80)
# 再次查询(缓存失效,从数据库重新加载)
print("\n--- 更新后查询 ---")
start = time.time()
product = service.get_product("P001")
print(f"结果: {product}, 耗时: {time.time()-start:.2f}秒")
高级缓存策略:防缓存击穿
import asyncio
from functools import lru_cache
import random
class CachePrevention:
"""防止缓存击穿、雪崩和穿透的高级缓存"""
def __init__(self):
self.cache = SimpleCache()
def get_with_fallback(self, key: str, db_func, ttl: int = 300):
"""
带降级策略的缓存获取
防止缓存击穿:当缓存失效时,保护数据库不被高并发击穿
"""
# 尝试获取缓存
value = self.cache.get(key)
if value is not None:
return value
# 使用互斥锁防止缓存击穿(实际生产用分布式锁)
# 这里简化处理,用随机延迟模拟
delay = random.uniform(0, 0.1)
time.sleep(delay)
# 再次检查缓存(可能其他线程已经加载了)
value = self.cache.get(key)
if value is not None:
return value
# 查询数据库
print(f"[数据库] 查询数据: {key}")
value = db_func()
# 设置缓存(添加随机过期时间防止雪崩)
actual_ttl = ttl + random.randint(-60, 60) # 随机偏移
self.cache.set(key, value, ttl=max(1, actual_ttl))
return value
def get_with_stale(self, key: str, db_func, ttl: int = 300, stale_ttl: int = 600):
"""
使用陈旧数据策略
当缓存过期但数据库不可用时,可以返回旧数据
"""
# 尝试获取缓存(包括过期但未删除的)
with self.cache._lock:
if key in self.cache._cache:
value, expire_time = self.cache._cache[key]
if expire_time and time.time() <= expire_time:
return value # 未过期,直接返回
# 缓存已过期,但我们可以保留旧值
if expire_time and time.time() <= expire_time + stale_ttl:
# 异步刷新缓存(不阻塞当前请求)
print(f"[缓存] 返回陈旧数据: {key}")
self._async_refresh(key, db_func, ttl)
return value # 返回旧数据
# 缓存不存在,同步加载
value = db_func()
self.cache.set(key, value, ttl=ttl)
return value
def _async_refresh(self, key: str, db_func, ttl: int):
"""异步刷新缓存"""
try:
value = db_func()
self.cache.set(key, value, ttl=ttl)
print(f"[缓存] 异步刷新成功: {key}")
except Exception as e:
print(f"[缓存] 异步刷新失败: {e}")
# 测试缓存击穿防护
print("\n=== 缓存击穿防护测试 ===")
prevention = CachePrevention()
def simulate_high_concurrency():
"""模拟高并发场景"""
def query_db():
"""模拟数据库查询(可能很慢或失败)"""
time.sleep(0.5)
return f"数据_{random.randint(1, 100)}"
# 模拟多个并发请求
results = []
for i in range(10):
result = prevention.get_with_stale(
"hot_data",
query_db,
ttl=10, # 短过期时间
stale_ttl=30 # 允许使用过期数据的窗口
)
results.append(result)
print(f"请求 {i+1}: 获取到 {result}")
return results
# 运行并发测试
results = simulate_high_concurrency()
实际应用:商品详情页面缓存
class ProductDetailCache:
"""商品详情页缓存管理系统"""
def __init__(self):
self.cache = SimpleCache()
self.stale_cache = SimpleCache() # 存储陈旧的缓存
self.redis = RedisCacheExample()
def build_product_detail(self, product_id: str) -> dict:
"""
构建完整的商品详情
包含多个数据源的组合和缓存
"""
cached = self.cache.get(f"detail:{product_id}")
if cached:
return cached
# 复杂的数据聚合
detail = {
"basic_info": self._get_basic_info(product_id),
"inventory": self._get_inventory(product_id),
"price": self._get_price(product_id),
"reviews": self._get_reviews(product_id),
"recommendations": self._get_recommendations(product_id),
"generated_at": datetime.now().isoformat()
}
# 按不同数据源的更新频率设置不同的TTL
# 库存数据更新最快,设为30秒
# 基础信息最稳定,设为5分钟
self.cache.set(f"detail:{product_id}", detail, ttl=120)
return detail
def _get_basic_info(self, product_id: str) -> dict:
"""商品基本信息(缓存时间较长)"""
cache_key = f"basic:{product_id}"
cached = self.cache.get(cache_key)
if cached:
return cached
# 模拟数据库查询
info = {
"name": f"商品{product_id}",
"description": "这是一个很棒的商品",
"category": "电子产品",
"brand": "知名品牌"
}
self.cache.set(cache_key, info, ttl=300) # 5分钟
return info
def _get_inventory(self, product_id: str) -> dict:
"""库存信息(缓存时间较短)"""
cache_key = f"inventory:{product_id}"
cached = self.cache.get(cache_key)
if cached:
return cached
# 模拟实时库存查询
import random
inventory = {
"total": random.randint(10, 1000),
"available": random.randint(5, 500),
"warehouse": "北京1号库"
}
self.cache.set(cache_key, inventory, ttl=30) # 30秒
return inventory
def _get_price(self, product_id: str) -> dict:
"""价格信息(使用Redis缓存,支持分布式)"""
cache_key = f"price:{product_id}"
cached = self.redis.get_cache(cache_key)
if cached:
return cached
price = {
"current": 99.99,
"original": 129.99,
"promotion": "夏季特惠",
"valid_until": (datetime.now() + timedelta(days=7)).isoformat()
}
self.redis.set_cache(cache_key, price, ttl=60) # 1分钟
return price
def _get_reviews(self, product_id: str) -> list:
"""评价信息"""
cache_key = f"reviews:{product_id}"
cached = self.cache.get(cache_key)
if cached:
return cached
reviews = [
{"user": "用户A", "rating": 5, "content": "非常好用"},
{"user": "用户B", "rating": 4, "content": "性价比高"},
]
self.cache.set(cache_key, reviews, ttl=180) # 3分钟
return reviews
def _get_recommendations(self, product_id: str) -> list:
"""推荐商品(缓存时间长,因为计算复杂)"""
# 使用lru_cache装饰器(Python内置)
return self._compute_recommendations(product_id)
@lru_cache(maxsize=100)
def _compute_recommendations(self, product_id: str) -> list:
"""计算推荐商品(使用LRU缓存)"""
# 模拟复杂的推荐算法
time.sleep(0.2)
return [
{"id": "P002", "name": "配套商品"},
{"id": "P003", "name": "热门商品"},
]
# 完整测试
print("\n=== 商品详情缓存系统测试 ===")
product_cache = ProductDetailCache()
# 第一次请求
print("\n--- 第一次请求 ---")
start = time.time()
detail = product_cache.build_product_detail("P001")
print(f"详情页加载完成,耗时: {time.time()-start:.2f}秒")
print(f"商品名称: {detail['basic_info']['name']}")
print(f"可用库存: {detail['inventory']['available']}")
# 第二次请求(全缓存命中)
print("\n--- 第二次请求 ---")
start = time.time()
detail = product_cache.build_product_detail("P001")
print(f"详情页加载完成,耗时: {time.time()-start:.2f}秒")
# 等待库存缓存过期
print("\n--- 等待35秒(库存缓存过期) ---")
time.sleep(35)
start = time.time()
detail = product_cache.build_product_detail("P001")
print(f"详情页加载完成,耗时: {time.time()-start:.2f}秒")
print(f"更新后的库存: {detail['inventory']['available']}")
- TTL(Time To Live):设置缓存过期时间,避免使用过时数据
- 主动过期:在访问时检查是否过期,过期则删除
- 被动过期:后台线程定期清理过期数据
- 缓存击穿:高并发下同一缓存失效导致大量查询数据库
- 缓存雪崩:大量缓存同时过期导致数据库压力暴增
- 缓存穿透:查询不存在的数据,每次都会查数据库
这个实战案例涵盖了Python中缓存过期的各种场景和解决方案,适合直接应用到实际项目中。
标签: 缓存过期