Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
# logging_utils.py | |
# 日志相关工具 | |
import os | |
import json | |
from datetime import datetime,timedelta | |
from collections import defaultdict | |
LOG_DIR = "/opt/nav-fronted/logs" | |
ACCESS_LOG = os.path.join(LOG_DIR, "access.log") | |
SUBMISSION_LOG = os.path.join(LOG_DIR, "submissions.log") | |
os.makedirs(LOG_DIR, exist_ok=True) | |
IP_REQUEST_RECORDS = defaultdict(list) | |
IP_LIMIT = 5 | |
def is_request_allowed(ip: str) -> bool: | |
now = datetime.now() | |
IP_REQUEST_RECORDS[ip] = [t for t in IP_REQUEST_RECORDS[ip] if now - t < timedelta(minutes=1)] | |
if len(IP_REQUEST_RECORDS[ip]) < IP_LIMIT: | |
IP_REQUEST_RECORDS[ip].append(now) | |
return True | |
return False | |
def log_access(user_ip: str = None, user_agent: str = None): | |
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
log_entry = { | |
"timestamp": timestamp, | |
"type": "access", | |
"user_ip": user_ip or "unknown", | |
"user_agent": user_agent or "unknown" | |
} | |
with open(ACCESS_LOG, "a") as f: | |
f.write(json.dumps(log_entry) + "\n") | |
def log_submission(scene: str, prompt: str, model: str, user: str = "anonymous", res: str = "unknown"): | |
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
log_entry = { | |
"timestamp": timestamp, | |
"type": "submission", | |
"user": user, | |
"scene": scene, | |
"prompt": prompt, | |
"model": model, | |
"res": res | |
} | |
with open(SUBMISSION_LOG, "a") as f: | |
f.write(json.dumps(log_entry) + "\n") | |
def read_logs(log_type: str = "all", max_entries: int = 50) -> list: | |
logs = [] | |
if log_type in ["all", "access"]: | |
try: | |
with open(ACCESS_LOG, "r") as f: | |
for line in f: | |
logs.append(json.loads(line.strip())) | |
except FileNotFoundError: | |
pass | |
if log_type in ["all", "submission"]: | |
try: | |
with open(SUBMISSION_LOG, "r") as f: | |
for line in f: | |
logs.append(json.loads(line.strip())) | |
except FileNotFoundError: | |
pass | |
logs.sort(key=lambda x: x["timestamp"], reverse=True) | |
return logs[:max_entries] | |
def format_logs_for_display(logs: list) -> str: | |
if not logs: | |
return "No log record" | |
markdown = "### System Access Log\n\n" | |
markdown += "| Time | Type | User/IP | Details |\n" | |
markdown += "|------|------|---------|----------|\n" | |
for log in logs: | |
timestamp = log.get("timestamp", "unknown") | |
log_type = "Access" if log.get("type") == "access" else "Submission" | |
if log_type == "Access": | |
user = log.get("user_ip", "unknown") | |
details = f"User-Agent: {log.get('user_agent', 'unknown')}" | |
else: | |
user = log.get("user", "anonymous") | |
result = log.get('res', 'unknown') | |
if result != "success": | |
if len(result) > 40: | |
result = f"{result[:20]}...{result[-20:]}" | |
details = f"Scene: {log.get('scene', 'unknown')}, Prompt: {log.get('prompt', '')}, Model: {log.get('model', 'unknown')}, result: {result}" | |
markdown += f"| {timestamp} | {log_type} | {user} | {details} |\n" | |
return markdown | |