224 lines
6.7 KiB
Python
224 lines
6.7 KiB
Python
"""
|
||
Redis 缓存工具
|
||
"""
|
||
import json
|
||
import hashlib
|
||
from typing import Optional, Any
|
||
from app.core.config import settings
|
||
from app.utils.logger import logger
|
||
|
||
try:
|
||
import redis
|
||
REDIS_AVAILABLE = True
|
||
except ImportError:
|
||
REDIS_AVAILABLE = False
|
||
logger.warning("Redis 未安装,缓存功能将不可用")
|
||
|
||
|
||
class CacheManager:
|
||
"""Redis 缓存管理器"""
|
||
|
||
def __init__(self):
|
||
"""初始化缓存管理器"""
|
||
self._redis = None
|
||
|
||
if REDIS_AVAILABLE and settings.ENABLE_CACHE:
|
||
try:
|
||
self._redis = redis.Redis(
|
||
host=settings.REDIS_HOST or 'localhost',
|
||
port=settings.REDIS_PORT or 6379,
|
||
db=settings.REDIS_DB or 0,
|
||
password=settings.REDIS_PASSWORD,
|
||
decode_responses=True
|
||
)
|
||
logger.info(
|
||
f"Redis 缓存已启用 - {settings.REDIS_HOST}:{settings.REDIS_PORT}"
|
||
)
|
||
except Exception as e:
|
||
logger.error(f"Redis 连接失败: {str(e)}")
|
||
self._redis = None
|
||
else:
|
||
logger.info("Redis 缓存未启用")
|
||
|
||
def is_available(self) -> bool:
|
||
"""检查缓存是否可用"""
|
||
return self._redis is not None
|
||
|
||
def _generate_key(self, prefix: str, *args) -> str:
|
||
"""生成缓存键"""
|
||
key_parts = [settings.CACHE_PREFIX, prefix]
|
||
key_parts.extend(str(arg) for arg in args if arg is not None)
|
||
return ":".join(key_parts)
|
||
|
||
def _serialize(self, data: Any) -> str:
|
||
"""序列化数据"""
|
||
return json.dumps(data, ensure_ascii=False)
|
||
|
||
def _deserialize(self, data: str) -> Any:
|
||
"""反序列化数据"""
|
||
try:
|
||
return json.loads(data)
|
||
except json.JSONDecodeError:
|
||
return None
|
||
|
||
async def get(self, prefix: str, *args, default: Any = None) -> Optional[Any]:
|
||
"""
|
||
获取缓存数据
|
||
|
||
Args:
|
||
prefix: 缓存前缀
|
||
*args: 键的其他部分
|
||
default: 默认值(缓存不存在时返回)
|
||
|
||
Returns:
|
||
缓存的数据,如果缓存不存在或不可用则返回默认值
|
||
"""
|
||
if not self.is_available():
|
||
return default
|
||
|
||
try:
|
||
key = self._generate_key(prefix, *args)
|
||
data = self._redis.get(key)
|
||
|
||
if data is not None:
|
||
return self._deserialize(data)
|
||
|
||
logger.debug(f"缓存未命中: {key}")
|
||
return default
|
||
|
||
except Exception as e:
|
||
logger.error(f"Redis 获取失败: {str(e)}")
|
||
return default
|
||
|
||
async def set(self, prefix: str, *args, data: Any, ttl: Optional[int] = None) -> bool:
|
||
"""
|
||
设置缓存数据
|
||
|
||
Args:
|
||
prefix: 缓存前缀
|
||
*args: 键的其他部分
|
||
data: 要缓存的数据
|
||
ttl: 过期时间(秒),不传则使用默认值
|
||
|
||
Returns:
|
||
是否设置成功
|
||
"""
|
||
if not self.is_available():
|
||
logger.warning("Redis 不可用,缓存设置失败")
|
||
return False
|
||
|
||
try:
|
||
key = self._generate_key(prefix, *args)
|
||
serialized_data = self._serialize(data)
|
||
|
||
if ttl is None:
|
||
ttl = settings.CACHE_TTL
|
||
|
||
self._redis.setex(key, ttl, serialized_data)
|
||
logger.debug(f"缓存已设置: {key}, TTL: {ttl}秒")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"Redis 设置失败: {str(e)}")
|
||
return False
|
||
|
||
async def delete(self, prefix: str, *args) -> bool:
|
||
"""
|
||
删除缓存数据
|
||
|
||
Args:
|
||
prefix: 缓存前缀
|
||
*args: 键的其他部分
|
||
|
||
Returns:
|
||
是否删除成功
|
||
"""
|
||
if not self.is_available():
|
||
logger.warning("Redis 不可用,缓存删除失败")
|
||
return False
|
||
|
||
try:
|
||
key = self._generate_key(prefix, *args)
|
||
self._redis.delete(key)
|
||
logger.debug(f"缓存已删除: {key}")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"Redis 删除失败: {str(e)}")
|
||
return False
|
||
|
||
async def get_llm_response(self, prompt: str, model: str, temperature: float) -> Optional[str]:
|
||
"""
|
||
获取 LLM 响应缓存
|
||
|
||
Args:
|
||
prompt: 提示词
|
||
model: 模型名称
|
||
temperature: 温度参数
|
||
|
||
Returns:
|
||
缓存的响应,如果不存在则返回 None
|
||
"""
|
||
if not self.is_available():
|
||
return None
|
||
|
||
try:
|
||
# 生成唯一的缓存键(基于提示词的哈希)
|
||
prompt_hash = hashlib.md5(prompt.encode()).hexdigest()[:16]
|
||
key = self._generate_key("llm", model, str(temperature), prompt_hash)
|
||
|
||
cached = await self.get("llm", model, str(temperature), prompt_hash)
|
||
|
||
if cached:
|
||
logger.info(f"LLM 响应缓存命中: {key}")
|
||
return cached
|
||
|
||
return None
|
||
|
||
except Exception as e:
|
||
logger.error(f"LLM 缓存获取失败: {str(e)}")
|
||
return None
|
||
|
||
async def set_llm_response(self, prompt: str, model: str, temperature: float, response: str) -> bool:
|
||
"""
|
||
设置 LLM 响应缓存
|
||
|
||
Args:
|
||
prompt: 提示词
|
||
model: 模型名称
|
||
temperature: 温度参数
|
||
response: LLM 响应
|
||
|
||
Returns:
|
||
是否设置成功
|
||
"""
|
||
if not self.is_available():
|
||
logger.warning("Redis 不可用,LLM 缓存设置失败")
|
||
return False
|
||
|
||
try:
|
||
# 生成唯一的缓存键
|
||
prompt_hash = hashlib.md5(prompt.encode()).hexdigest()[:16]
|
||
key = self._generate_key("llm", model, str(temperature), prompt_hash)
|
||
|
||
# 设置缓存,TTL 为 1 小时
|
||
success = await self.set("llm", model, str(temperature), prompt_hash, response, ttl=3600)
|
||
|
||
if success:
|
||
logger.info(f"LLM 响应已缓存: {key}")
|
||
|
||
return success
|
||
|
||
except Exception as e:
|
||
logger.error(f"LLM 缓存设置失败: {str(e)}")
|
||
return False
|
||
|
||
|
||
# 全局缓存管理器实例
|
||
cache_manager = CacheManager()
|
||
|
||
|
||
def get_cache_manager() -> CacheManager:
|
||
"""获取缓存管理器实例"""
|
||
return cache_manager
|