2026-01-11 07:48:19 +08:00

373 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
API 调用监控和告警工具
"""
import time
from typing import Optional, Dict, Any, List
from collections import defaultdict
from datetime import datetime, timedelta
from app.core.config import settings
from app.utils.logger import logger
class APIMonitor:
    """In-memory monitor that records API calls and raises alerts.

    Call records are grouped per endpoint and pruned to the most recent hour
    on every ``record_call``. After each recorded call the monitor checks, in
    order, the endpoint's error rate, the call's response time and the call's
    status code, and sends at most one alert per endpoint per cooldown window.

    NOTE: alerts are delivered inline from ``record_call`` using blocking
    SMTP / HTTP clients, so a slow alert channel delays the caller.
    """

    def __init__(self):
        """Create a monitor with no recorded calls and no active cooldowns."""
        # endpoint -> chronological list of call-record dicts
        self.api_calls: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
        # endpoint -> time the most recent alert for that endpoint was sent
        self.alert_cooldown: Dict[str, datetime] = {}

    def record_call(
        self,
        endpoint: str,
        method: str,
        status_code: int,
        response_time: float,
        error: Optional[str] = None
    ) -> None:
        """Record one API call, evaluate alert conditions and prune old data.

        Args:
            endpoint: API endpoint the call was made to.
            method: HTTP method.
            status_code: HTTP status code of the response.
            response_time: Response time in milliseconds.
            error: Error message, if any.
        """
        call_record = {
            "endpoint": endpoint,
            "method": method,
            "status_code": status_code,
            "response_time": response_time,
            "error": error,
            "timestamp": datetime.now()
        }

        self.api_calls[endpoint].append(call_record)
        self._check_alerts(endpoint, call_record)
        # Keep only the most recent hour of records.
        self._cleanup_old_records()

    @staticmethod
    def _serializable_call(call: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of *call* whose timestamp is an ISO-8601 string.

        Stored records hold ``datetime`` objects, which the JSON encoder used
        for webhook payloads cannot serialize. The stored record itself is
        left untouched.
        """
        serializable = dict(call)
        stamp = serializable.get("timestamp")
        if isinstance(stamp, datetime):
            serializable["timestamp"] = stamp.isoformat()
        return serializable

    def _check_alerts(self, endpoint: str, call_record: Dict[str, Any]) -> None:
        """Send an alert for *endpoint* if the latest call warrants one.

        Nothing is sent when alerting is disabled (``ALERT_TYPE == "none"``)
        or while the endpoint is still inside its cooldown window. Otherwise
        the first matching condition wins: error rate, then response time,
        then a 5xx status code.

        Args:
            endpoint: API endpoint the call was made to.
            call_record: The record that was just stored for this call.
        """
        if settings.ALERT_TYPE == "none":
            return

        # Respect the per-endpoint cooldown so one incident does not flood
        # the alert channel.
        last_alert = self.alert_cooldown.get(endpoint)
        if last_alert is not None and (
            datetime.now() - last_alert < timedelta(seconds=settings.ALERT_COOLDOWN)
        ):
            return

        # 1) Error rate over the endpoint's recent calls.
        error_rate = self._calculate_error_rate(endpoint)
        if error_rate > settings.ERROR_RATE_THRESHOLD:
            self._send_alert(
                alert_type="error_rate",
                endpoint=endpoint,
                message=f"API 错误率过高: {error_rate:.2%} (阈值: {settings.ERROR_RATE_THRESHOLD:.2%})",
                details={
                    "error_rate": error_rate,
                    "threshold": settings.ERROR_RATE_THRESHOLD,
                    # Last 10 calls, with timestamps converted to strings so
                    # the webhook payload can be JSON-encoded.
                    "recent_calls": [
                        self._serializable_call(call)
                        for call in self.api_calls[endpoint][-10:]
                    ]
                }
            )
            self.alert_cooldown[endpoint] = datetime.now()
            return

        # 2) Response time of this particular call.
        if call_record["response_time"] > settings.RESPONSE_TIME_THRESHOLD:
            self._send_alert(
                alert_type="response_time",
                endpoint=endpoint,
                message=f"API 响应时间过长: {call_record['response_time']:.0f}ms (阈值: {settings.RESPONSE_TIME_THRESHOLD}ms)",
                details={
                    "response_time": call_record["response_time"],
                    "threshold": settings.RESPONSE_TIME_THRESHOLD,
                    "endpoint": endpoint,
                    "method": call_record["method"]
                }
            )
            self.alert_cooldown[endpoint] = datetime.now()
            return

        # 3) Server-side error status of this particular call.
        if call_record["status_code"] >= 500:
            self._send_alert(
                alert_type="server_error",
                endpoint=endpoint,
                message=f"API 服务器错误: {call_record['status_code']}",
                details={
                    "status_code": call_record["status_code"],
                    "error": call_record.get("error"),
                    "endpoint": endpoint,
                    "method": call_record["method"]
                }
            )
            self.alert_cooldown[endpoint] = datetime.now()

    def _calculate_error_rate(self, endpoint: str) -> float:
        """Return the share of error responses among the endpoint's recent calls.

        Only the last 100 stored calls are considered; a call counts as an
        error when its status code is 400 or higher.

        Args:
            endpoint: API endpoint to evaluate.

        Returns:
            Error rate in the range 0-1 (0.0 when nothing is recorded).
        """
        calls = self.api_calls.get(endpoint, [])
        if not calls:
            return 0.0

        recent_calls = calls[-100:]
        error_count = sum(1 for call in recent_calls if call["status_code"] >= 400)
        return error_count / len(recent_calls)

    def _cleanup_old_records(self) -> None:
        """Drop call records older than one hour and forget empty endpoints."""
        cutoff_time = datetime.now() - timedelta(hours=1)
        # Iterate over a snapshot of the keys because entries may be deleted.
        for endpoint in list(self.api_calls.keys()):
            self.api_calls[endpoint] = [
                call for call in self.api_calls[endpoint]
                if call["timestamp"] > cutoff_time
            ]
            if not self.api_calls[endpoint]:
                del self.api_calls[endpoint]

    def _send_alert(
        self,
        alert_type: str,
        endpoint: str,
        message: str,
        details: Dict[str, Any]
    ) -> None:
        """Log the alert and dispatch it through the configured channel.

        Args:
            alert_type: Kind of alert.
            endpoint: API endpoint the alert is about.
            message: Human-readable alert message.
            details: Extra structured information about the alert.
        """
        logger.warning(f"告警触发: {message}")
        if settings.ALERT_TYPE == "email":
            self._send_email_alert(alert_type, endpoint, message, details)
        elif settings.ALERT_TYPE == "webhook":
            self._send_webhook_alert(alert_type, endpoint, message, details)

    def _send_email_alert(
        self,
        alert_type: str,
        endpoint: str,
        message: str,
        details: Dict[str, Any]
    ) -> None:
        """Send the alert as an HTML email over SMTP with STARTTLS.

        Failures are logged and never raised, so a broken mail setup cannot
        break the caller.

        Args:
            alert_type: Kind of alert.
            endpoint: API endpoint the alert is about.
            message: Human-readable alert message.
            details: Extra structured information about the alert.
        """
        try:
            import smtplib
            from email.mime.text import MIMEText
            from email.mime.multipart import MIMEMultipart
            from html import escape

            if not all([settings.SMTP_HOST, settings.SMTP_USERNAME,
                        settings.ALERT_FROM_EMAIL, settings.ALERT_TO_EMAIL]):
                logger.warning("邮件告警配置不完整,无法发送邮件")
                return

            msg = MIMEMultipart()
            msg['From'] = settings.ALERT_FROM_EMAIL
            msg['To'] = settings.ALERT_TO_EMAIL
            msg['Subject'] = f"[{settings.APP_NAME}] 告警: {alert_type}"

            # Endpoint, message and details may contain request paths or
            # exception text, so escape them before embedding them in HTML.
            sent_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
            body = f"""
            <html>
            <body>
            <h2>API 告警通知</h2>
            <p><strong>告警类型:</strong> {escape(str(alert_type))}</p>
            <p><strong>API 端点:</strong> {escape(str(endpoint))}</p>
            <p><strong>告警消息:</strong> {escape(str(message))}</p>
            <p><strong>告警时间:</strong> {sent_at}</p>
            <h3>详细信息:</h3>
            <pre>{escape(str(details))}</pre>
            </body>
            </html>
            """
            msg.attach(MIMEText(body, 'html'))

            with smtplib.SMTP(settings.SMTP_HOST, settings.SMTP_PORT) as server:
                server.starttls()
                # Authenticate only when a password is configured.
                if settings.SMTP_PASSWORD:
                    server.login(settings.SMTP_USERNAME, settings.SMTP_PASSWORD)
                server.send_message(msg)
            logger.info(f"邮件告警发送成功: {alert_type}")
        except Exception as e:
            logger.error(f"发送邮件告警失败: {str(e)}")

    def _send_webhook_alert(
        self,
        alert_type: str,
        endpoint: str,
        message: str,
        details: Dict[str, Any]
    ) -> None:
        """POST the alert as JSON to the configured webhook URL.

        Failures (including non-2xx responses) are logged and never raised.

        Args:
            alert_type: Kind of alert.
            endpoint: API endpoint the alert is about.
            message: Human-readable alert message.
            details: Extra structured information about the alert; it must be
                JSON-serializable.
        """
        try:
            import httpx

            if not settings.ALERT_WEBHOOK_URL:
                logger.warning("Webhook URL 未配置,无法发送告警")
                return

            alert_data = {
                "alert_type": alert_type,
                "endpoint": endpoint,
                "message": message,
                "timestamp": datetime.now().isoformat(),
                "app_name": settings.APP_NAME,
                "details": details
            }

            with httpx.Client(timeout=10) as client:
                response = client.post(
                    settings.ALERT_WEBHOOK_URL,
                    json=alert_data
                )
                response.raise_for_status()
            logger.info(f"Webhook 告警发送成功: {alert_type}")
        except Exception as e:
            logger.error(f"发送 Webhook 告警失败: {str(e)}")

    def get_stats(self, endpoint: Optional[str] = None) -> Dict[str, Any]:
        """Return aggregate statistics over the currently stored calls.

        Args:
            endpoint: API endpoint to report on; when omitted, the statistics
                cover all endpoints together.

        Returns:
            A dict with ``total_calls``, ``error_rate`` (share of calls with
            status >= 400), and ``avg``/``max``/``min_response_time``. All
            values are zero when there are no matching calls.
        """
        if endpoint:
            calls = self.api_calls.get(endpoint, [])
        else:
            calls = [call for calls in self.api_calls.values() for call in calls]

        if not calls:
            return {
                "total_calls": 0,
                "error_rate": 0.0,
                "avg_response_time": 0.0,
                "max_response_time": 0.0,
                "min_response_time": 0.0
            }

        response_times = [call["response_time"] for call in calls]
        error_count = sum(1 for call in calls if call["status_code"] >= 400)
        return {
            "total_calls": len(calls),
            "error_rate": error_count / len(calls),
            "avg_response_time": sum(response_times) / len(response_times),
            "max_response_time": max(response_times),
            "min_response_time": min(response_times)
        }
# Global monitor instance; APICallTimer reports every timed call through it.
api_monitor = APIMonitor()
class APICallTimer:
    """Context manager that times an API call and reports it to ``api_monitor``.

    On exit the elapsed time, status code and error (if any) are passed to
    ``api_monitor.record_call``. Exceptions raised inside the ``with`` body
    are recorded as status 500 and then propagate to the caller unchanged.
    """

    def __init__(self, endpoint: str, method: str):
        """Prepare a timer for one call.

        Args:
            endpoint: API endpoint being called.
            method: HTTP method.
        """
        self.endpoint = endpoint
        self.method = method
        self.start_time = None   # set on __enter__
        self.status_code = None  # set via set_status_code() or on exit
        self.error = None        # set on exit when an exception occurred

    def __enter__(self):
        """Start timing and return the timer itself."""
        # perf_counter is monotonic, so the measured duration cannot be
        # skewed by wall-clock adjustments while the call is in flight.
        self.start_time = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop timing and record the call; never suppresses exceptions."""
        if self.start_time is None:
            # The timer was never entered, so there is nothing to report.
            return

        response_time = (time.perf_counter() - self.start_time) * 1000  # ms

        if exc_type is not None:
            # An exception escaped the body: report it as a server error.
            self.status_code = 500
            self.error = str(exc_val)
        elif self.status_code is None:
            # No explicit status was set: assume success.
            self.status_code = 200

        api_monitor.record_call(
            endpoint=self.endpoint,
            method=self.method,
            status_code=self.status_code,
            response_time=response_time,
            error=self.error
        )

    def set_status_code(self, status_code: int) -> None:
        """Set the HTTP status code to report for this call.

        Args:
            status_code: HTTP status code.
        """
        self.status_code = status_code