"""
API call monitoring and alerting utilities.
"""
import time
from typing import Optional, Dict, Any, List
from collections import defaultdict
from datetime import datetime, timedelta

from app.core.config import settings
from app.utils.logger import logger


class APIMonitor:
    """In-memory monitor for API calls that raises alerts on anomalies.

    Call records are grouped by endpoint and kept for about one hour. After
    every recorded call the monitor checks, in order, the endpoint's error
    rate, the call's response time and a 5xx status code, and sends at most
    one alert per endpoint per ``settings.ALERT_COOLDOWN`` seconds.
    """

    def __init__(self):
        """Create empty call-history and alert-cooldown tables."""
        # endpoint -> call records, appended in arrival (chronological) order
        self.api_calls: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
        # endpoint -> time the most recent alert for that endpoint was sent
        self.alert_cooldown: Dict[str, datetime] = {}

    def record_call(
        self,
        endpoint: str,
        method: str,
        status_code: int,
        response_time: float,
        error: Optional[str] = None
    ) -> None:
        """
        Record one API call, evaluate the alert rules and prune old records.

        Args:
            endpoint: API endpoint that was called
            method: HTTP method
            status_code: HTTP status code of the response
            response_time: response time in milliseconds
            error: error message, if any
        """
        call_record = {
            "endpoint": endpoint,
            "method": method,
            "status_code": status_code,
            "response_time": response_time,
            "error": error,
            "timestamp": datetime.now()
        }

        self.api_calls[endpoint].append(call_record)

        self._check_alerts(endpoint, call_record)

        # Keep only the most recent hour of data
        self._cleanup_old_records()

    def _check_alerts(self, endpoint: str, call_record: Dict[str, Any]) -> None:
        """
        Send at most one alert for ``endpoint`` if an alert rule fires.

        Rules are checked in priority order (error rate, response time,
        5xx status); the first match sends an alert and starts the
        endpoint's cooldown. Nothing is sent while the cooldown is active
        or when ``settings.ALERT_TYPE`` is ``"none"``.

        Args:
            endpoint: API endpoint
            call_record: the record just added by ``record_call``
        """
        if settings.ALERT_TYPE == "none":
            return

        # Suppress repeated alerts for the same endpoint within the cooldown window
        last_alert = self.alert_cooldown.get(endpoint)
        if last_alert is not None and (
            datetime.now() - last_alert < timedelta(seconds=settings.ALERT_COOLDOWN)
        ):
            return

        # NOTE(review): with very few recorded calls (e.g. the first request to
        # an endpoint) a single 4xx/5xx yields a 100% error rate and fires this
        # alert; consider requiring a minimum sample size.
        error_rate = self._calculate_error_rate(endpoint)
        if error_rate > settings.ERROR_RATE_THRESHOLD:
            self._send_alert(
                alert_type="error_rate",
                endpoint=endpoint,
                message=f"API 错误率过高: {error_rate:.2%} (阈值: {settings.ERROR_RATE_THRESHOLD:.2%})",
                details={
                    "error_rate": error_rate,
                    "threshold": settings.ERROR_RATE_THRESHOLD,
                    # Last 10 calls as JSON-safe copies: the raw records hold
                    # datetime objects that the webhook's JSON encoder rejects.
                    "recent_calls": [
                        self._serialize_call(call)
                        for call in self.api_calls[endpoint][-10:]
                    ]
                }
            )
            self.alert_cooldown[endpoint] = datetime.now()
            return

        if call_record["response_time"] > settings.RESPONSE_TIME_THRESHOLD:
            self._send_alert(
                alert_type="response_time",
                endpoint=endpoint,
                message=f"API 响应时间过长: {call_record['response_time']:.0f}ms (阈值: {settings.RESPONSE_TIME_THRESHOLD}ms)",
                details={
                    "response_time": call_record["response_time"],
                    "threshold": settings.RESPONSE_TIME_THRESHOLD,
                    "endpoint": endpoint,
                    "method": call_record["method"]
                }
            )
            self.alert_cooldown[endpoint] = datetime.now()
            return

        if call_record["status_code"] >= 500:
            self._send_alert(
                alert_type="server_error",
                endpoint=endpoint,
                message=f"API 服务器错误: {call_record['status_code']}",
                details={
                    "status_code": call_record["status_code"],
                    "error": call_record.get("error"),
                    "endpoint": endpoint,
                    "method": call_record["method"]
                }
            )
            self.alert_cooldown[endpoint] = datetime.now()

    @staticmethod
    def _serialize_call(call: Dict[str, Any]) -> Dict[str, Any]:
        """Return a JSON-safe shallow copy of a call record (timestamp as ISO-8601)."""
        serialized = dict(call)
        timestamp = serialized.get("timestamp")
        if isinstance(timestamp, datetime):
            serialized["timestamp"] = timestamp.isoformat()
        return serialized

    def _calculate_error_rate(self, endpoint: str) -> float:
        """
        Compute the error rate over the endpoint's last 100 recorded calls.

        A call counts as an error when its status code is >= 400.

        Args:
            endpoint: API endpoint

        Returns:
            Error rate in [0, 1]; 0.0 when no calls are recorded.
        """
        calls = self.api_calls.get(endpoint, [])
        if not calls:
            return 0.0

        recent_calls = calls[-100:]
        error_count = sum(1 for call in recent_calls if call["status_code"] >= 400)
        return error_count / len(recent_calls)

    def _cleanup_old_records(self) -> None:
        """Drop records older than one hour and forget endpoints left empty.

        Records are appended in arrival order, so expired entries form a
        prefix of each list; only that prefix is scanned and removed rather
        than rebuilding every list on every recorded call.
        """
        cutoff_time = datetime.now() - timedelta(hours=1)
        for endpoint in list(self.api_calls.keys()):
            calls = self.api_calls[endpoint]
            expired = 0
            while expired < len(calls) and calls[expired]["timestamp"] <= cutoff_time:
                expired += 1
            if expired:
                del calls[:expired]
            if not calls:
                del self.api_calls[endpoint]

    def _send_alert(
        self,
        alert_type: str,
        endpoint: str,
        message: str,
        details: Dict[str, Any]
    ) -> None:
        """
        Log the alert and dispatch it through the configured channel.

        ``settings.ALERT_TYPE`` selects the channel: ``"email"`` or
        ``"webhook"``; any other value only logs the warning.

        Args:
            alert_type: alert category (e.g. "error_rate")
            endpoint: API endpoint
            message: human-readable alert message
            details: extra alert data
        """
        logger.warning(f"告警触发: {message}")

        if settings.ALERT_TYPE == "email":
            self._send_email_alert(alert_type, endpoint, message, details)
        elif settings.ALERT_TYPE == "webhook":
            self._send_webhook_alert(alert_type, endpoint, message, details)

    def _send_email_alert(
        self,
        alert_type: str,
        endpoint: str,
        message: str,
        details: Dict[str, Any]
    ) -> None:
        """
        Send the alert as a plain-text email over SMTP with STARTTLS.

        Best-effort: missing configuration or any sending failure is logged
        and never raised to the caller.

        Args:
            alert_type: alert category
            endpoint: API endpoint
            message: human-readable alert message
            details: extra alert data
        """
        try:
            import smtplib
            from email.mime.text import MIMEText
            from email.mime.multipart import MIMEMultipart

            if not all([settings.SMTP_HOST, settings.SMTP_USERNAME, settings.ALERT_FROM_EMAIL, settings.ALERT_TO_EMAIL]):
                logger.warning("邮件告警配置不完整,无法发送邮件")
                return

            msg = MIMEMultipart()
            msg['From'] = settings.ALERT_FROM_EMAIL
            msg['To'] = settings.ALERT_TO_EMAIL
            msg['Subject'] = f"[{settings.APP_NAME}] 告警: {alert_type}"

            body = f"""
告警类型: {alert_type}
API 端点: {endpoint}
告警消息: {message}
告警时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
{details}
"""
            # The body is line-oriented plain text; as 'html' its newlines
            # would collapse and the details repr would be rendered unescaped.
            msg.attach(MIMEText(body, 'plain', 'utf-8'))

            # Bounded timeout (same as the webhook client) so a hung SMTP
            # server cannot block the monitored request indefinitely.
            with smtplib.SMTP(settings.SMTP_HOST, settings.SMTP_PORT, timeout=10) as server:
                server.starttls()
                if settings.SMTP_PASSWORD:
                    server.login(settings.SMTP_USERNAME, settings.SMTP_PASSWORD)
                server.send_message(msg)

            logger.info(f"邮件告警发送成功: {alert_type}")

        except Exception as e:
            logger.error(f"发送邮件告警失败: {str(e)}")

    def _send_webhook_alert(
        self,
        alert_type: str,
        endpoint: str,
        message: str,
        details: Dict[str, Any]
    ) -> None:
        """
        POST the alert as JSON to ``settings.ALERT_WEBHOOK_URL``.

        Best-effort: a missing URL, a non-2xx response or any transport
        error is logged and never raised to the caller.

        Args:
            alert_type: alert category
            endpoint: API endpoint
            message: human-readable alert message
            details: extra alert data (must be JSON-serialisable)
        """
        try:
            import httpx

            if not settings.ALERT_WEBHOOK_URL:
                logger.warning("Webhook URL 未配置,无法发送告警")
                return

            alert_data = {
                "alert_type": alert_type,
                "endpoint": endpoint,
                "message": message,
                "timestamp": datetime.now().isoformat(),
                "app_name": settings.APP_NAME,
                "details": details
            }

            with httpx.Client(timeout=10) as client:
                response = client.post(
                    settings.ALERT_WEBHOOK_URL,
                    json=alert_data
                )
                response.raise_for_status()

            logger.info(f"Webhook 告警发送成功: {alert_type}")

        except Exception as e:
            logger.error(f"发送 Webhook 告警失败: {str(e)}")

    def get_stats(self, endpoint: Optional[str] = None) -> Dict[str, Any]:
        """
        Summarise the retained call records.

        Args:
            endpoint: restrict the summary to this endpoint; when omitted,
                all endpoints are aggregated together.

        Returns:
            Dict with ``total_calls``, ``error_rate`` (share of status
            codes >= 400), and average/max/min ``response_time`` in ms.
            All values are zero when no calls are recorded.
        """
        if endpoint:
            calls = self.api_calls.get(endpoint, [])
        else:
            # Snapshot the per-endpoint lists before flattening them
            calls = [
                record
                for endpoint_calls in list(self.api_calls.values())
                for record in endpoint_calls
            ]

        if not calls:
            return {
                "total_calls": 0,
                "error_rate": 0.0,
                "avg_response_time": 0.0,
                "max_response_time": 0.0,
                "min_response_time": 0.0
            }

        response_times = [call["response_time"] for call in calls]
        error_count = sum(1 for call in calls if call["status_code"] >= 400)

        return {
            "total_calls": len(calls),
            "error_rate": error_count / len(calls),
            "avg_response_time": sum(response_times) / len(response_times),
            "max_response_time": max(response_times),
            "min_response_time": min(response_times)
        }
# Module-level (global) monitor instance; APICallTimer records its timings here.
api_monitor: APIMonitor = APIMonitor()
class APICallTimer:
    """Context manager that times a block and records it as an API call.

    On exit, the elapsed time in milliseconds is reported to the
    module-level ``api_monitor`` via ``record_call``.

    The recorded status code is chosen as follows:

    * No exception and no explicit status: 200 is recorded.
    * An exception was raised: an explicitly set error status (>= 400)
      is kept, e.g. 503 from a failed upstream call. Otherwise 500 is
      recorded. The exception text is stored as the error.

    Exceptions from the managed block always propagate. Failures inside the
    monitor itself are logged and never raised.
    """

    def __init__(self, endpoint: str, method: str):
        """
        Prepare a timer for one call.

        Args:
            endpoint: API endpoint being called
            method: HTTP method
        """
        self.endpoint = endpoint
        self.method = method
        self.start_time: Optional[float] = None
        self.status_code: Optional[int] = None
        self.error: Optional[str] = None

    def __enter__(self):
        """Start timing and return the timer so callers can set a status."""
        # perf_counter is monotonic and high-resolution, unlike time.time(),
        # so clock adjustments cannot produce negative or skewed durations.
        self.start_time = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop timing and record the call; never suppresses exceptions."""
        if self.start_time is None:
            return None

        response_time = (time.perf_counter() - self.start_time) * 1000  # milliseconds

        if exc_type is not None:
            # Keep a more specific error status that was set explicitly;
            # otherwise classify the failure as a server error.
            if self.status_code is None or self.status_code < 400:
                self.status_code = 500
            self.error = str(exc_val)
        elif self.status_code is None:
            self.status_code = 200

        # Monitoring is best-effort: an error here must neither mask the
        # caller's exception nor turn a successful call into a failure.
        try:
            api_monitor.record_call(
                endpoint=self.endpoint,
                method=self.method,
                status_code=self.status_code,
                response_time=response_time,
                error=self.error
            )
        except Exception as e:
            logger.error(f"记录 API 调用失败: {str(e)}")

        return None

    def set_status_code(self, status_code: int) -> None:
        """
        Set the HTTP status code to record when the context exits.

        Args:
            status_code: HTTP status code
        """
        self.status_code = status_code