Python缓存清理案例怎么编写?

wen python案例 3

本文目录导读:

  1. 使用 functools.lru_cache 清理缓存
  2. 手动实现简单缓存系统
  3. 使用 cachetools 库(需要安装)
  4. 文件缓存清理
  5. Redis缓存清理(需要安装redis)
  6. 装饰器实现缓存清理功能

我来给你编写几个Python缓存清理的实用案例:

使用 functools.lru_cache 清理缓存

from functools import lru_cache
import time
@lru_cache(maxsize=128)
def expensive_function(n):
    """模拟一个耗时计算函数"""
    print(f"计算 {n}...")
    time.sleep(1)
    return n * n
# 测试缓存
print("第一次调用:")
print(expensive_function(5))  # 计算
print("\n第二次调用(使用缓存):")
print(expensive_function(5))  # 直接从缓存返回
# 清理所有缓存
expensive_function.cache_clear()
print("\n缓存已清理")
print("\n第三次调用(重新计算):")
print(expensive_function(5))  # 重新计算
# 查看缓存信息
print(f"\n缓存信息: {expensive_function.cache_info()}")

手动实现简单缓存系统

import time
from datetime import datetime, timedelta
class SimpleCache:
    def __init__(self, timeout=60):
        self.cache = {}
        self.timeout = timeout  # 缓存过期时间(秒)
    def get(self, key):
        """获取缓存值"""
        if key in self.cache:
            value, timestamp = self.cache[key]
            # 检查是否过期
            if datetime.now() - timestamp < timedelta(seconds=self.timeout):
                print(f"命中缓存: {key}")
                return value
            else:
                print(f"缓存过期: {key}")
                self.delete(key)
        return None
    def set(self, key, value):
        """设置缓存"""
        self.cache[key] = (value, datetime.now())
        print(f"设置缓存: {key} = {value}")
    def delete(self, key):
        """删除单个缓存"""
        if key in self.cache:
            del self.cache[key]
            print(f"删除缓存: {key}")
    def clear(self):
        """清理所有缓存"""
        self.cache.clear()
        print("所有缓存已清理")
    def clean_expired(self):
        """清理过期缓存"""
        now = datetime.now()
        expired_keys = []
        for key, (value, timestamp) in self.cache.items():
            if now - timestamp > timedelta(seconds=self.timeout):
                expired_keys.append(key)
        for key in expired_keys:
            del self.cache[key]
        print(f"清理了 {len(expired_keys)} 个过期缓存")
# 使用示例
cache = SimpleCache(timeout=5)
# 设置缓存
cache.set("user_1", {"name": "Alice", "age": 30})
cache.set("user_2", {"name": "Bob", "age": 25})
# 获取缓存
print(cache.get("user_1"))  # 命中
print(cache.get("user_3"))  # 未命中
# 等缓存过期
print("\n等待6秒...")
time.sleep(6)
print(cache.get("user_1"))  # 过期,返回None
# 手动清理
cache.clear()

使用 cachetools 库(需要安装)

# 首先安装:pip install cachetools
from cachetools import cached, TTLCache, LRUCache
import time
# 基于时间的缓存(TTL)
@cached(cache=TTLCache(maxsize=100, ttl=5))
def get_user_info(user_id):
    """模拟获取用户信息"""
    print(f"从数据库获取用户 {user_id} 的信息...")
    time.sleep(1)
    return {
        "id": user_id,
        "name": f"User_{user_id}",
        "timestamp": time.time()
    }
# 基于LRU的缓存
lru_cache = LRUCache(maxsize=3)
@cached(cache=lru_cache)
def expensive_operation(x, y):
    print(f"执行耗时操作: {x} + {y}")
    time.sleep(0.5)
    return x + y
# 测试
print("=== TTL缓存测试 ===")
for i in range(3):
    print(get_user_info(1))  # 第一次会重新计算
print("\n等待6秒...")
time.sleep(6)
print(get_user_info(1))  # 缓存过期,重新计算
print("\n=== LRU缓存测试 ===")
for i in range(4):
    print(f"expensive_operation({i}, {i+1}): {expensive_operation(i, i+1)}")

文件缓存清理

import os
import json
import pickle
import time
from pathlib import Path
class FileCache:
    def __init__(self, cache_dir="cache"):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)
    def _get_cache_path(self, key):
        """获取缓存文件路径"""
        safe_key = str(hash(key))
        return self.cache_dir / f"{safe_key}.cache"
    def set(self, key, value, expiry=3600):
        """设置缓存到文件"""
        cache_path = self._get_cache_path(key)
        cache_data = {
            "value": value,
            "expiry": time.time() + expiry
        }
        with open(cache_path, 'wb') as f:
            pickle.dump(cache_data, f)
        print(f"缓存保存: {key}")
    def get(self, key):
        """从文件读取缓存"""
        cache_path = self._get_cache_path(key)
        if not cache_path.exists():
            return None
        try:
            with open(cache_path, 'rb') as f:
                cache_data = pickle.load(f)
            # 检查是否过期
            if time.time() > cache_data["expiry"]:
                print(f"缓存过期: {key}")
                self.delete(key)
                return None
            print(f"命中缓存: {key}")
            return cache_data["value"]
        except:
            return None
    def delete(self, key):
        """删除单个缓存文件"""
        cache_path = self._get_cache_path(key)
        if cache_path.exists():
            cache_path.unlink()
            print(f"删除缓存: {key}")
    def clear(self):
        """清理所有缓存文件"""
        for cache_file in self.cache_dir.glob("*.cache"):
            cache_file.unlink()
        print(f"清理了所有缓存文件")
    def clean_expired(self):
        """清理过期缓存文件"""
        count = 0
        for cache_file in self.cache_dir.glob("*.cache"):
            try:
                with open(cache_file, 'rb') as f:
                    cache_data = pickle.load(f)
                if time.time() > cache_data["expiry"]:
                    cache_file.unlink()
                    count += 1
            except:
                cache_file.unlink()
                count += 1
        print(f"清理了 {count} 个过期缓存文件")
# 使用示例
cache = FileCache()
# 设置缓存(5秒过期)
cache.set("user_data", {"name": "Alice", "score": 95}, expiry=5)
print(cache.get("user_data"))  # 命中
time.sleep(6)
print(cache.get("user_data"))  # 过期
# 清理所有缓存
cache.clear()

Redis缓存清理(需要安装redis)

# 安装:pip install redis
import redis
import json
class RedisCache:
    def __init__(self, host='localhost', port=6379, db=0):
        self.client = redis.Redis(host=host, port=port, db=db, decode_responses=True)
    def set(self, key, value, expiry=3600):
        """设置缓存"""
        self.client.setex(key, expiry, json.dumps(value))
        print(f"Redis缓存设置: {key}")
    def get(self, key):
        """获取缓存"""
        data = self.client.get(key)
        if data:
            print(f"命中Redis缓存: {key}")
            return json.loads(data)
        return None
    def delete(self, key):
        """删除单个缓存"""
        result = self.client.delete(key)
        if result:
            print(f"删除Redis缓存: {key}")
    def clear_pattern(self, pattern="*"):
        """按模式清理缓存"""
        cursor = 0
        count = 0
        while True:
            cursor, keys = self.client.scan(cursor=cursor, match=pattern, count=100)
            if keys:
                self.client.delete(*keys)
                count += len(keys)
            if cursor == 0:
                break
        print(f"清理了 {count} 个Redis缓存")
    def clear_all(self):
        """清理所有缓存"""
        self.client.flushdb()
        print("清理所有Redis缓存")
# 使用示例(需要先启动Redis服务)
try:
    cache = RedisCache()
    cache.set("user:1", {"name": "Alice", "age": 30})
    print(cache.get("user:1"))
    # 清理特定模式的缓存
    cache.clear_pattern("user:*")
except redis.ConnectionError:
    print("无法连接到Redis服务器,请确保Redis服务已启动")

装饰器实现缓存清理功能

import functools
import time
from collections import OrderedDict
def clear_cache_decorator(max_size=100, ttl=60):
    """缓存装饰器,支持自动清理"""
    cache = OrderedDict()
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # 生成缓存键
            key = str(args) + str(sorted(kwargs.items()))
            # 检查缓存是否存在且未过期
            if key in cache:
                result, timestamp = cache[key]
                if time.time() - timestamp < ttl:
                    print(f"命中缓存: {func.__name__}{args}")
                    return result
                else:
                    print(f"缓存过期: {func.__name__}{args}")
                    del cache[key]
            # 计算新结果
            result = func(*args, **kwargs)
            # 缓存结果
            cache[key] = (result, time.time())
            # 如果缓存超出最大大小,删除最旧的
            if len(cache) > max_size:
                cache.popitem(last=False)
                print("缓存超出大小,清理最旧缓存")
            return result
        # 添加清理方法
        def clear_cache():
            cache.clear()
            print(f"清理 {func.__name__} 的所有缓存")
        wrapper.clear_cache = clear_cache
        return wrapper
    return decorator
@clear_cache_decorator(max_size=3, ttl=3)
def compute_square(n):
    """计算平方数"""
    print(f"计算 {n} 的平方...")
    time.sleep(0.5)
    return n * n
# 测试
print("=== 缓存装饰器测试 ===")
for i in range(4):
    print(f"compute_square({i}): {compute_square(i)}")
print("\n重复调用(使用缓存):")
print(compute_square(1))
print(compute_square(2))
print("\n等待4秒...")
time.sleep(4)
print(compute_square(1))  # 缓存过期
print("\n手动清理缓存:")
compute_square.clear_cache()
print(compute_square(1))  # 重新计算

这些案例展示了不同场景下的缓存清理方案:

  1. lru_cache: 适合函数级别的简单缓存
  2. 手动实现: 灵活控制,适合学习原理
  3. cachetools: 专业缓存库,功能丰富
  4. 文件缓存: 持久化存储,适合重启后使用
  5. Redis缓存: 分布式场景,适合生产环境
  6. 装饰器: 优雅的实现方式,易于复用

选择哪种方案取决于你的具体需求:

  • 简单场景:用 lru_cache
  • 需要过期机制:用 cachetools 或手动实现
  • 分布式系统:用 Redis
  • 需要持久化:用文件缓存

标签: Python案例

抱歉,评论功能暂时关闭!