"""
API 调用监控和告警工具
(API call monitoring and alerting utilities.)
"""

import html
import json
import time
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Optional, Dict, Any, List

from app.core.config import settings
from app.utils.logger import logger


class APIMonitor:
    """In-memory API call monitor that raises alerts on errors and slow calls.

    Call records are kept per endpoint for ``RETENTION_PERIOD`` and are used to
    compute error rates and summary statistics. Alerts are always logged and,
    depending on ``settings.ALERT_TYPE``, also delivered by email or webhook.
    At most one alert per endpoint is sent every ``settings.ALERT_COOLDOWN``
    seconds.
    """

    # How long call records are retained before being purged.
    RETENTION_PERIOD = timedelta(hours=1)
    # Number of most recent calls used to compute an endpoint's error rate.
    ERROR_RATE_WINDOW = 100
    # Number of most recent calls attached to an error-rate alert.
    ALERT_RECENT_CALLS = 10
    # Network timeout (seconds) for outgoing email / webhook alerts.
    ALERT_TIMEOUT_SECONDS = 10

    def __init__(self):
        """Create an empty monitor."""
        # endpoint -> list of call records, in the order they were recorded
        self.api_calls: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
        # endpoint -> time the last alert for that endpoint was sent
        self.alert_cooldown: Dict[str, datetime] = {}

    def record_call(
        self,
        endpoint: str,
        method: str,
        status_code: int,
        response_time: float,
        error: Optional[str] = None
    ) -> None:
        """Record one API call and evaluate alert conditions for its endpoint.

        Args:
            endpoint: API endpoint.
            method: HTTP method.
            status_code: HTTP status code of the response.
            response_time: Response time in milliseconds.
            error: Error message, if any.
        """
        call_record = {
            "endpoint": endpoint,
            "method": method,
            "status_code": status_code,
            "response_time": response_time,
            "error": error,
            "timestamp": datetime.now()
        }
        self.api_calls[endpoint].append(call_record)

        # Purge expired records *before* evaluating alerts so the error rate
        # only reflects calls inside the retention window.
        self._cleanup_old_records()

        self._check_alerts(endpoint, call_record)

    @staticmethod
    def _serialize_call(call: Dict[str, Any]) -> Dict[str, Any]:
        """Return a JSON-safe shallow copy of a call record.

        A ``datetime`` timestamp is converted to ISO-8601 text. The stored
        record itself is not modified.
        """
        serialized = dict(call)
        timestamp = serialized.get("timestamp")
        if isinstance(timestamp, datetime):
            serialized["timestamp"] = timestamp.isoformat()
        return serialized

    def _check_alerts(self, endpoint: str, call_record: Dict[str, Any]) -> None:
        """Send at most one alert for ``endpoint`` based on the latest call.

        Conditions are checked in priority order: error rate over the recent
        window, then this call's response time, then a 5xx status code. After
        an alert is sent, further alerts for the endpoint are suppressed for
        ``settings.ALERT_COOLDOWN`` seconds.

        Args:
            endpoint: API endpoint.
            call_record: The call record that was just stored.
        """
        if settings.ALERT_TYPE == "none":
            return

        # Respect the per-endpoint cooldown.
        last_alert = self.alert_cooldown.get(endpoint)
        if last_alert is not None and (
            datetime.now() - last_alert < timedelta(seconds=settings.ALERT_COOLDOWN)
        ):
            return

        # 1. Error rate over the most recent calls.
        error_rate = self._calculate_error_rate(endpoint)
        if error_rate > settings.ERROR_RATE_THRESHOLD:
            recent_calls = self.api_calls[endpoint][-self.ALERT_RECENT_CALLS:]
            self._send_alert(
                alert_type="error_rate",
                endpoint=endpoint,
                message=f"API 错误率过高: {error_rate:.2%} (阈值: {settings.ERROR_RATE_THRESHOLD:.2%})",
                details={
                    "error_rate": error_rate,
                    "threshold": settings.ERROR_RATE_THRESHOLD,
                    # Raw records hold datetime objects that a JSON encoder
                    # (the webhook payload) cannot serialize, so send copies.
                    "recent_calls": [self._serialize_call(call) for call in recent_calls]
                }
            )
            self.alert_cooldown[endpoint] = datetime.now()
            return

        # 2. Slow response for this call.
        if call_record["response_time"] > settings.RESPONSE_TIME_THRESHOLD:
            self._send_alert(
                alert_type="response_time",
                endpoint=endpoint,
                message=f"API 响应时间过长: {call_record['response_time']:.0f}ms (阈值: {settings.RESPONSE_TIME_THRESHOLD}ms)",
                details={
                    "response_time": call_record["response_time"],
                    "threshold": settings.RESPONSE_TIME_THRESHOLD,
                    "endpoint": endpoint,
                    "method": call_record["method"]
                }
            )
            self.alert_cooldown[endpoint] = datetime.now()
            return

        # 3. Server-side error status for this call.
        if call_record["status_code"] >= 500:
            self._send_alert(
                alert_type="server_error",
                endpoint=endpoint,
                message=f"API 服务器错误: {call_record['status_code']}",
                details={
                    "status_code": call_record["status_code"],
                    "error": call_record.get("error"),
                    "endpoint": endpoint,
                    "method": call_record["method"]
                }
            )
            self.alert_cooldown[endpoint] = datetime.now()

    def _calculate_error_rate(self, endpoint: str) -> float:
        """Return the fraction of recent calls with status >= 400.

        Only the last ``ERROR_RATE_WINDOW`` recorded calls are considered.

        Args:
            endpoint: API endpoint.

        Returns:
            Error rate in [0, 1]; 0.0 when the endpoint has no recorded calls.
        """
        calls = self.api_calls.get(endpoint, [])
        if not calls:
            return 0.0

        recent_calls = calls[-self.ERROR_RATE_WINDOW:]
        error_count = sum(1 for call in recent_calls if call["status_code"] >= 400)
        return error_count / len(recent_calls)

    def _cleanup_old_records(self) -> None:
        """Drop records older than ``RETENTION_PERIOD`` and forget empty endpoints."""
        cutoff_time = datetime.now() - self.RETENTION_PERIOD
        for endpoint in list(self.api_calls.keys()):
            self.api_calls[endpoint] = [
                call for call in self.api_calls[endpoint]
                if call["timestamp"] > cutoff_time
            ]
            if not self.api_calls[endpoint]:
                del self.api_calls[endpoint]

    def _send_alert(
        self,
        alert_type: str,
        endpoint: str,
        message: str,
        details: Dict[str, Any]
    ) -> None:
        """Log an alert and deliver it through the configured channel.

        ``settings.ALERT_TYPE`` selects delivery: ``"email"`` or ``"webhook"``.
        Any other value results in the alert only being logged.

        Args:
            alert_type: Alert category (e.g. ``"error_rate"``).
            endpoint: API endpoint.
            message: Human-readable alert message.
            details: Extra alert data.
        """
        logger.warning(f"告警触发: {message}")

        if settings.ALERT_TYPE == "email":
            self._send_email_alert(alert_type, endpoint, message, details)
        elif settings.ALERT_TYPE == "webhook":
            self._send_webhook_alert(alert_type, endpoint, message, details)

    def _send_email_alert(
        self,
        alert_type: str,
        endpoint: str,
        message: str,
        details: Dict[str, Any]
    ) -> None:
        """Send an alert as an HTML email over SMTP with STARTTLS.

        Best-effort: missing configuration and delivery failures are logged,
        never raised to the caller.

        Args:
            alert_type: Alert category.
            endpoint: API endpoint.
            message: Human-readable alert message.
            details: Extra alert data, rendered as pretty-printed JSON.
        """
        try:
            import smtplib
            from email.mime.text import MIMEText
            from email.mime.multipart import MIMEMultipart

            if not all([settings.SMTP_HOST, settings.SMTP_USERNAME,
                        settings.ALERT_FROM_EMAIL, settings.ALERT_TO_EMAIL]):
                logger.warning("邮件告警配置不完整,无法发送邮件")
                return

            msg = MIMEMultipart()
            msg['From'] = settings.ALERT_FROM_EMAIL
            msg['To'] = settings.ALERT_TO_EMAIL
            msg['Subject'] = f"[{settings.APP_NAME}] 告警: {alert_type}"

            # Endpoint, message and details may carry request-derived text
            # (paths, exception messages), so escape everything embedded in
            # the HTML body. default=str only affects this human-readable view.
            details_text = json.dumps(details, ensure_ascii=False, indent=2, default=str)
            body = f"""
<html>
<body>
<h2>API 告警通知</h2>
<p><strong>告警类型:</strong> {html.escape(alert_type)}</p>
<p><strong>API 端点:</strong> {html.escape(endpoint)}</p>
<p><strong>告警消息:</strong> {html.escape(message)}</p>
<p><strong>告警时间:</strong> {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
<h3>详细信息:</h3>
<pre>{html.escape(details_text)}</pre>
</body>
</html>
"""
            msg.attach(MIMEText(body, 'html'))

            # Alerts are sent synchronously from record_call(), so bound the
            # network wait instead of relying on the socket default (none).
            with smtplib.SMTP(settings.SMTP_HOST, settings.SMTP_PORT,
                              timeout=self.ALERT_TIMEOUT_SECONDS) as server:
                server.starttls()
                if settings.SMTP_PASSWORD:
                    server.login(settings.SMTP_USERNAME, settings.SMTP_PASSWORD)
                server.send_message(msg)

            logger.info(f"邮件告警发送成功: {alert_type}")

        except Exception as e:
            logger.error(f"发送邮件告警失败: {str(e)}")

    def _send_webhook_alert(
        self,
        alert_type: str,
        endpoint: str,
        message: str,
        details: Dict[str, Any]
    ) -> None:
        """POST the alert as JSON to ``settings.ALERT_WEBHOOK_URL``.

        Best-effort: a missing URL, HTTP error statuses and transport failures
        are logged, never raised to the caller.

        Args:
            alert_type: Alert category.
            endpoint: API endpoint.
            message: Human-readable alert message.
            details: Extra alert data; must be JSON-serializable.
        """
        try:
            import httpx

            if not settings.ALERT_WEBHOOK_URL:
                logger.warning("Webhook URL 未配置,无法发送告警")
                return

            alert_data = {
                "alert_type": alert_type,
                "endpoint": endpoint,
                "message": message,
                "timestamp": datetime.now().isoformat(),
                "app_name": settings.APP_NAME,
                "details": details
            }

            with httpx.Client(timeout=self.ALERT_TIMEOUT_SECONDS) as client:
                response = client.post(
                    settings.ALERT_WEBHOOK_URL,
                    json=alert_data
                )
                response.raise_for_status()

            logger.info(f"Webhook 告警发送成功: {alert_type}")

        except Exception as e:
            logger.error(f"发送 Webhook 告警失败: {str(e)}")

    def get_stats(self, endpoint: Optional[str] = None) -> Dict[str, Any]:
        """Summarize the currently retained calls.

        Args:
            endpoint: Endpoint to summarize. When falsy, calls from all
                endpoints are aggregated.

        Returns:
            Dict with ``total_calls``, ``error_rate`` (status >= 400) and the
            average / max / min response time in milliseconds. All values are
            zero when there are no calls.
        """
        if endpoint:
            calls = self.api_calls.get(endpoint, [])
        else:
            calls = [
                call
                for endpoint_calls in self.api_calls.values()
                for call in endpoint_calls
            ]

        if not calls:
            return {
                "total_calls": 0,
                "error_rate": 0.0,
                "avg_response_time": 0.0,
                "max_response_time": 0.0,
                "min_response_time": 0.0
            }

        response_times = [call["response_time"] for call in calls]
        error_count = sum(1 for call in calls if call["status_code"] >= 400)

        return {
            "total_calls": len(calls),
            "error_rate": error_count / len(calls),
            "avg_response_time": sum(response_times) / len(response_times),
            "max_response_time": max(response_times),
            "min_response_time": min(response_times)
        }


# Module-wide shared monitor; APICallTimer.__exit__ records into this instance.
api_monitor: APIMonitor = APIMonitor()


class APICallTimer:
    """Context manager that times a block and records it via ``api_monitor``.

    Example::

        with APICallTimer("/users", "GET") as timer:
            ...
            timer.set_status_code(404)

    If the block raises, the call is recorded with status 500 and the exception
    text as its error, and the exception still propagates. If the block
    finishes without ``set_status_code`` having been called, 200 is recorded.
    """

    def __init__(self, endpoint: str, method: str):
        """Create a timer for one call.

        Args:
            endpoint: API endpoint.
            method: HTTP method.
        """
        self.endpoint = endpoint
        self.method = method
        # time.perf_counter() reading taken in __enter__; None until entered.
        self.start_time: Optional[float] = None
        self.status_code: Optional[int] = None
        self.error: Optional[str] = None

    def __enter__(self):
        """Start timing and return this timer."""
        # perf_counter is monotonic: wall-clock adjustments (NTP, manual
        # changes) cannot produce negative or inflated durations, which
        # time.time() could, spuriously tripping response-time alerts.
        self.start_time = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop timing and record the call; never suppresses exceptions."""
        if self.start_time is None:
            # Never entered: nothing was timed, so nothing is recorded.
            return None

        response_time = (time.perf_counter() - self.start_time) * 1000  # ms

        if exc_type is not None:
            self.status_code = 500
            self.error = str(exc_val)
        elif self.status_code is None:
            self.status_code = 200

        api_monitor.record_call(
            endpoint=self.endpoint,
            method=self.method,
            status_code=self.status_code,
            response_time=response_time,
            error=self.error
        )
        return None

    def set_status_code(self, status_code: int) -> None:
        """Set the HTTP status code to record when the block exits normally.

        Args:
            status_code: HTTP status code.
        """
        self.status_code = status_code