GeminiBalance / app /service /stats /stats_service.py
CatPtain's picture
Upload 77 files
76b9762 verified
# app/service/stats_service.py
import datetime
from typing import Union
from sqlalchemy import and_, case, func, or_, select
from app.database.connection import database
from app.database.models import RequestLog
from app.log.logger import get_stats_logger
# Logger obtained from get_stats_logger(); StatsService below uses it for its
# info and error messages.
logger = get_stats_logger()
class StatsService:
    """Service class for handling statistics related operations.

    Every query reads the ``RequestLog`` table through the shared ``database``
    connection. A call is a *success* when its status code is in the 2xx
    range; any other status code, or a missing (NULL) one, is a *failure*.
    """

    @staticmethod
    def _empty_counts() -> dict[str, int]:
        """Return a fresh all-zero total/success/failure mapping."""
        return {"total": 0, "success": 0, "failure": 0}

    async def _fetch_call_counts(self, since: datetime.datetime) -> dict[str, int]:
        """Count total, successful and failed calls logged at or after ``since``.

        Exceptions are deliberately not handled here so that each public
        caller can log its own, more specific, error message.

        Args:
            since: Lower bound (inclusive) compared against
                ``RequestLog.request_time``.

        Returns:
            A dict with the integer keys ``total``, ``success`` and ``failure``.
        """
        status = RequestLog.status_code
        is_success = and_(status >= 200, status < 300)
        # ``status.is_(None)`` renders SQL ``IS NULL``. A plain Python
        # ``status is None`` is evaluated immediately to ``False`` and never
        # reaches the database, so rows without a status code would be counted
        # in ``total`` but not in ``failure``.
        is_failure = or_(status.is_(None), status < 200, status >= 300)

        query = select(
            func.count(RequestLog.id).label("total"),
            func.sum(case((is_success, 1), else_=0)).label("success"),
            func.sum(case((is_failure, 1), else_=0)).label("failure"),
        ).where(RequestLog.request_time >= since)

        row = await database.fetch_one(query)
        if not row:
            return self._empty_counts()
        # SUM() over zero rows is NULL, hence ``or 0``. ``int()`` keeps the
        # declared return type even if the driver hands SUM() back as a
        # Decimal (NOTE(review): believed to happen on MySQL - confirm).
        return {
            name: int(row[name] or 0) for name in ("total", "success", "failure")
        }

    async def get_calls_in_last_seconds(self, seconds: int) -> dict[str, int]:
        """Return call counts (total, success, failure) for the last N seconds.

        Args:
            seconds: Size of the look-back window, in seconds.

        Returns:
            A dict with ``total``, ``success`` and ``failure``. On any error
            the failure is logged and an all-zero dict is returned.
        """
        try:
            cutoff_time = datetime.datetime.now() - datetime.timedelta(seconds=seconds)
            return await self._fetch_call_counts(cutoff_time)
        except Exception as e:
            logger.error(f"Failed to get calls in last {seconds} seconds: {e}")
            return self._empty_counts()

    async def get_calls_in_last_minutes(self, minutes: int) -> dict[str, int]:
        """Return call counts (total, success, failure) for the last N minutes."""
        return await self.get_calls_in_last_seconds(minutes * 60)

    async def get_calls_in_last_hours(self, hours: int) -> dict[str, int]:
        """Return call counts (total, success, failure) for the last N hours."""
        return await self.get_calls_in_last_seconds(hours * 3600)

    async def get_calls_in_current_month(self) -> dict[str, int]:
        """Return call counts (total, success, failure) for the current calendar month.

        The window starts at 00:00:00 on the first day of the month, taken
        from ``datetime.datetime.now()``.

        Returns:
            A dict with ``total``, ``success`` and ``failure``. On any error
            the failure is logged and an all-zero dict is returned.
        """
        try:
            now = datetime.datetime.now()
            start_of_month = now.replace(
                day=1, hour=0, minute=0, second=0, microsecond=0
            )
            return await self._fetch_call_counts(start_of_month)
        except Exception as e:
            logger.error(f"Failed to get calls in current month: {e}")
            return self._empty_counts()

    async def get_api_usage_stats(self) -> dict:
        """Collect the API usage statistics for all reported windows.

        Returns:
            A dict with the keys ``calls_1m``, ``calls_1h``, ``calls_24h`` and
            ``calls_month``; each value is a total/success/failure dict. On
            any error every window falls back to all zeros.
        """
        try:
            stats_1m = await self.get_calls_in_last_minutes(1)
            stats_1h = await self.get_calls_in_last_hours(1)
            stats_24h = await self.get_calls_in_last_hours(24)
            stats_month = await self.get_calls_in_current_month()
            return {
                "calls_1m": stats_1m,
                "calls_1h": stats_1h,
                "calls_24h": stats_24h,
                "calls_month": stats_month,
            }
        except Exception as e:
            logger.error(f"Failed to get API usage stats: {e}")
            # A separate dict per window so callers cannot mutate one window
            # through another.
            return {
                "calls_1m": self._empty_counts(),
                "calls_1h": self._empty_counts(),
                "calls_24h": self._empty_counts(),
                "calls_month": self._empty_counts(),
            }

    async def get_api_call_details(self, period: str) -> list[dict]:
        """Return the individual API calls logged within the given period.

        Args:
            period: Window identifier, one of ``'1m'``, ``'1h'`` or ``'24h'``.

        Returns:
            A list of dicts, newest first, each with ``timestamp`` (ISO 8601
            string), ``key``, ``model`` and ``status`` (``"success"`` or
            ``"failure"``).

        Raises:
            ValueError: If ``period`` is not a recognised identifier.
            Exception: Any query error is logged and re-raised.
        """
        now = datetime.datetime.now()
        if period == "1m":
            start_time = now - datetime.timedelta(minutes=1)
        elif period == "1h":
            start_time = now - datetime.timedelta(hours=1)
        elif period == "24h":
            start_time = now - datetime.timedelta(hours=24)
        else:
            raise ValueError(f"无效的时间段标识: {period}")

        try:
            query = (
                select(
                    RequestLog.request_time.label("timestamp"),
                    RequestLog.api_key.label("key"),
                    RequestLog.model_name.label("model"),
                    RequestLog.status_code,
                )
                .where(RequestLog.request_time >= start_time)
                .order_by(RequestLog.request_time.desc())
            )
            results = await database.fetch_all(query)

            details = []
            for row in results:
                code = row["status_code"]
                # A missing status code is treated as a failure.
                succeeded = code is not None and 200 <= code < 300
                details.append(
                    {
                        "timestamp": row["timestamp"].isoformat(),
                        "key": row["key"],
                        "model": row["model"],
                        "status": "success" if succeeded else "failure",
                    }
                )

            logger.info(
                f"Retrieved {len(details)} API call details for period '{period}'"
            )
            return details
        except Exception as e:
            logger.error(
                f"Failed to get API call details for period '{period}': {e}"
            )
            raise

    async def get_key_usage_details_last_24h(self, key: str) -> Union[dict, None]:
        """Return per-model call counts for one API key over the last 24 hours.

        Rows whose model name is NULL are ignored. Only the last four
        characters of the key are written to the log.

        Args:
            key: The API key to look up.

        Returns:
            A dict mapping model name to call count, built from rows ordered
            by call count descending, e.g.
            ``{"gemini-pro": 10, "gemini-1.5-pro-latest": 5}``.
            An empty dict is returned when no matching rows exist.

        Raises:
            Exception: Any query error is logged (with traceback) and
                re-raised.
        """
        key_tail = key[-4:]
        logger.info(
            f"Fetching usage details for key ending in ...{key_tail} for the last 24h."
        )
        cutoff_time = datetime.datetime.now() - datetime.timedelta(hours=24)

        try:
            query = (
                select(
                    RequestLog.model_name,
                    func.count(RequestLog.id).label("call_count"),
                )
                .where(
                    RequestLog.api_key == key,
                    RequestLog.request_time >= cutoff_time,
                    RequestLog.model_name.isnot(None),
                )
                .group_by(RequestLog.model_name)
                .order_by(func.count(RequestLog.id).desc())
            )
            results = await database.fetch_all(query)

            if not results:
                logger.info(
                    f"No usage details found for key ending in ...{key_tail} in the last 24h."
                )
                return {}

            usage_details = {row["model_name"]: row["call_count"] for row in results}
            logger.info(
                f"Successfully fetched usage details for key ending in ...{key_tail}: {usage_details}"
            )
            return usage_details
        except Exception as e:
            logger.error(
                f"Failed to get key usage details for key ending in ...{key_tail}: {e}",
                exc_info=True,
            )
            raise