# Nora / app/main.py
# GitHub Action
# Deploy clean version of Nora
# 59bd45e
"""Main FastAPI application for Voice Text Processor.
This module initializes the FastAPI application, sets up configuration,
logging, and defines the application lifecycle.
Requirements: 10.1, 10.2, 10.3, 10.4, 10.5
"""
import logging
import uuid
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Optional
from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from app.config import init_config, get_config
from app.logging_config import setup_logging, set_request_id, clear_request_id
from app.models import ProcessResponse, RecordData, ParsedData
from app.storage import StorageService, StorageError
from app.asr_service import ASRService, ASRServiceError
from app.semantic_parser import SemanticParserService, SemanticParserError
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run startup and shutdown logic around the application's lifetime.

    On startup the configuration is loaded through ``init_config`` and the
    logging system is configured from it. Any startup failure is re-raised
    as ``RuntimeError`` so the application refuses to start. Shutdown only
    emits log messages.

    Requirements: 10.4 - Startup configuration validation

    Raises:
        RuntimeError: If ``init_config`` raises ``ValueError`` (reported as a
            configuration error) or any other exception occurs during
            startup.
    """
    # ---- Startup ----
    logger.info("Starting Voice Text Processor application...")

    try:
        # A ValueError from init_config() signals invalid configuration.
        settings = init_config()
        logger.info("Configuration loaded and validated successfully")

        # Logging is driven by the values found in the configuration.
        setup_logging(log_level=settings.log_level, log_file=settings.log_file)
        logger.info("Logging system configured")

        # Only non-sensitive settings are written to the log.
        logger.info(f"Data directory: {settings.data_dir}")
        logger.info(f"Max audio size: {settings.max_audio_size} bytes")
        logger.info(f"Log level: {settings.log_level}")
    except ValueError as config_error:
        # Invalid configuration: abort startup.
        logger.error(f"Configuration validation failed: {config_error}")
        logger.error("Application startup aborted due to configuration errors")
        raise RuntimeError(f"Configuration error: {config_error}") from config_error
    except Exception as startup_error:
        logger.error(
            f"Unexpected error during startup: {startup_error}",
            exc_info=True,
        )
        raise RuntimeError(f"Startup error: {startup_error}") from startup_error
    else:
        logger.info("Application startup complete")

    yield

    # ---- Shutdown ----
    logger.info("Shutting down Voice Text Processor application...")
    logger.info("Application shutdown complete")
# Create the FastAPI application; startup/shutdown is handled by `lifespan`.
app = FastAPI(
    title="Voice Text Processor",
    description="治愈系记录助手后端核心模块 - 语音和文本处理服务",
    version="1.0.0",
    lifespan=lifespan
)
# Add CORS middleware.
# NOTE(review): the origin list contains "*" together with
# allow_credentials=True. FastAPI's CORS documentation says wildcard origins
# should not be combined with credentials - confirm this is intended and
# restrict the origins before deploying to production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "http://localhost:5173",
        "http://localhost:3000",
        "http://172.18.16.245:5173",  # allow access via the computer's LAN IP
        "*"  # development: allow all origins (should be restricted in production)
    ],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Mount static files for generated images.
# The directory is created first (relative to the current working directory)
# before StaticFiles is mounted on it.
from pathlib import Path
from fastapi import Request
generated_images_dir = Path("generated_images")
generated_images_dir.mkdir(exist_ok=True)
app.mount("/generated_images", StaticFiles(directory="generated_images"), name="generated_images")
def get_base_url(request: Request) -> str:
    """Return the base URL (``scheme://host``) the client used for this request.

    The host is taken from the request's ``Host`` header so that generated
    links also work for clients on the local network; when the header is
    missing, ``localhost:8000`` is used.

    Args:
        request: The incoming request.

    Returns:
        The base URL without a trailing slash.
    """
    host_header = request.headers.get("host", "localhost:8000")
    return f"{request.url.scheme}://{host_header}"
@app.get("/api/status")
async def root():
    """Report the service name, its running status and the API version."""
    status_payload = {
        "service": "Voice Text Processor",
        "status": "running",
        "version": "1.0.0",
    }
    return status_payload
@app.get("/health")
async def health_check():
    """Report whether the application configuration is available.

    Returns:
        A dict with status ``healthy`` plus the data directory and maximum
        audio size from the configuration, or an HTTP 503 JSON response with
        status ``unhealthy`` and the error text when reading the
        configuration fails.
    """
    try:
        settings = get_config()
        healthy_payload = {
            "status": "healthy",
            "data_dir": str(settings.data_dir),
            "max_audio_size": settings.max_audio_size,
        }
        return healthy_payload
    except Exception as exc:
        logger.error(f"Health check failed: {exc}")
        unhealthy_payload = {"status": "unhealthy", "error": str(exc)}
        return JSONResponse(status_code=503, content=unhealthy_payload)
# Error type for rejected request input
class ValidationError(Exception):
    """Signal that request input failed validation.

    The human-readable reason is available both as ``message`` and through
    ``str(exc)``.

    Requirements: 1.3, 8.5, 9.1
    """

    def __init__(self, message: str):
        # Keep the text on the instance so handlers can read it directly.
        self.message = message
        super().__init__(message)
# Supported audio formats: lowercase file extensions, including the leading
# dot, that are accepted for uploaded audio files.
SUPPORTED_AUDIO_FORMATS = {".mp3", ".wav", ".m4a", ".webm"}
@app.post("/api/process", response_model=ProcessResponse)
async def process_input(
    audio: Optional[UploadFile] = File(None),
    text: Optional[str] = Form(None)
) -> ProcessResponse:
    """Process user input (audio or text) and extract structured data.

    Exactly one of ``audio`` or ``text`` must be supplied. Audio is first
    transcribed by the ASR service; the resulting (or supplied) text is then
    semantically parsed, and the record together with any mood,
    inspirations and todos is written to storage.

    Args:
        audio: Uploaded audio file (multipart/form-data). Its extension must
            be one of ``SUPPORTED_AUDIO_FORMATS`` and its size must not
            exceed ``config.max_audio_size``.
        text: Text content submitted as a form field; must not be empty.

    Returns:
        ProcessResponse containing record_id, timestamp, mood, inspirations
        and todos on success. On failure a JSONResponse is returned instead:
        HTTP 400 for validation errors, HTTP 500 for ASR, semantic parser,
        storage or unexpected errors.

    Requirements: 1.1, 1.2, 1.3, 7.7, 8.1, 8.2, 8.3, 8.4, 8.5, 8.6, 9.1, 9.2, 9.3, 9.4, 9.5
    """
    request_id = str(uuid.uuid4())
    timestamp = datetime.utcnow().isoformat() + "Z"

    # Attach the request id to the logging context for this request.
    set_request_id(request_id)
    logger.info(f"Processing request - audio: {audio is not None}, text: {text is not None}")

    try:
        # Input validation: exactly one kind of input is allowed.
        if audio is None and text is None:
            raise ValidationError("请提供音频文件或文本内容")
        if audio is not None and text is not None:
            raise ValidationError("请只提供音频文件或文本内容中的一种")

        config = get_config()

        storage_service = StorageService(str(config.data_dir))
        asr_service = ASRService(config.zhipu_api_key)
        parser_service = SemanticParserService(config.zhipu_api_key)

        original_text = ""
        input_type = "text"

        try:
            if audio is not None:
                input_type = "audio"

                # Validate the audio format by file extension.
                filename = audio.filename or "audio"
                file_ext = (
                    "." + filename.rsplit(".", 1)[-1].lower()
                    if "." in filename
                    else ""
                )
                if file_ext not in SUPPORTED_AUDIO_FORMATS:
                    # Sorted so the message is deterministic (set order is not).
                    raise ValidationError(
                        f"不支持的音频格式: {file_ext}. "
                        f"支持的格式: {', '.join(sorted(SUPPORTED_AUDIO_FORMATS))}"
                    )

                audio_content = await audio.read()

                # Validate the audio file size.
                if len(audio_content) > config.max_audio_size:
                    raise ValidationError(
                        f"音频文件过大: {len(audio_content)} bytes. "
                        f"最大允许: {config.max_audio_size} bytes"
                    )

                logger.info(
                    f"Audio file received: {filename}, "
                    f"size: {len(audio_content)} bytes"
                )

                # Transcribe audio to text. ASRServiceError is logged (with
                # traceback) by the handler at the bottom of this function.
                original_text = await asr_service.transcribe(audio_content, filename)
                logger.info(
                    f"ASR transcription successful. "
                    f"Text length: {len(original_text)}"
                )
            else:
                # Whitespace-only text is accepted; None or "" is rejected.
                if text is None or text == "":
                    raise ValidationError("文本内容不能为空")
                original_text = text
                logger.info(
                    f"Text input received. "
                    f"Length: {len(original_text)}"
                )

            # Semantic parsing; SemanticParserError is handled below.
            parsed_data = await parser_service.parse(original_text)
            logger.info(
                f"Semantic parsing successful. "
                f"Mood: {'present' if parsed_data.mood else 'none'}, "
                f"Inspirations: {len(parsed_data.inspirations)}, "
                f"Todos: {len(parsed_data.todos)}"
            )

            # Generate the record id and timestamp.
            record_id = str(uuid.uuid4())
            record_timestamp = datetime.utcnow().isoformat() + "Z"

            record = RecordData(
                record_id=record_id,
                timestamp=record_timestamp,
                input_type=input_type,
                original_text=original_text,
                parsed_data=parsed_data
            )

            # Persist the record and its parts; StorageError is handled below.
            storage_service.save_record(record)
            logger.info(f"Record saved: {record_id}")

            if parsed_data.mood:
                storage_service.append_mood(
                    parsed_data.mood,
                    record_id,
                    record_timestamp
                )
                logger.info("Mood data saved")

            if parsed_data.inspirations:
                storage_service.append_inspirations(
                    parsed_data.inspirations,
                    record_id,
                    record_timestamp
                )
                logger.info(
                    f"{len(parsed_data.inspirations)} "
                    f"inspiration(s) saved"
                )

            if parsed_data.todos:
                storage_service.append_todos(
                    parsed_data.todos,
                    record_id,
                    record_timestamp
                )
                logger.info(
                    f"{len(parsed_data.todos)} "
                    f"todo(s) saved"
                )

            response = ProcessResponse(
                record_id=record_id,
                timestamp=record_timestamp,
                mood=parsed_data.mood,
                inspirations=parsed_data.inspirations,
                todos=parsed_data.todos
            )
            logger.info("Request processed successfully")
            return response
        finally:
            # Close both services even if closing the first one fails.
            try:
                await asr_service.close()
            finally:
                await parser_service.close()

    except ValidationError as e:
        # Input validation error - HTTP 400
        logger.warning(
            f"Validation error: {e.message}",
            exc_info=True
        )
        return JSONResponse(
            status_code=400,
            content={
                "error": e.message,
                "timestamp": timestamp
            }
        )
    except ASRServiceError as e:
        # ASR service error - HTTP 500
        logger.error(
            f"ASR service unavailable: {e.message}",
            exc_info=True
        )
        return JSONResponse(
            status_code=500,
            content={
                "error": "语音识别服务不可用",
                "detail": e.message,
                "timestamp": timestamp
            }
        )
    except SemanticParserError as e:
        # Semantic parser error - HTTP 500
        logger.error(
            f"Semantic parser unavailable: {e.message}",
            exc_info=True
        )
        return JSONResponse(
            status_code=500,
            content={
                "error": "语义解析服务不可用",
                "detail": e.message,
                "timestamp": timestamp
            }
        )
    except StorageError as e:
        # Storage error - HTTP 500
        logger.error(
            f"Storage error: {str(e)}",
            exc_info=True
        )
        return JSONResponse(
            status_code=500,
            content={
                "error": "数据存储失败",
                "detail": str(e),
                "timestamp": timestamp
            }
        )
    except Exception as e:
        # Unexpected error - HTTP 500
        logger.error(
            f"Unexpected error: {str(e)}",
            exc_info=True
        )
        return JSONResponse(
            status_code=500,
            content={
                "error": "服务器内部错误",
                "detail": str(e),
                "timestamp": timestamp
            }
        )
    finally:
        # Clear the request id only after the handlers above have logged, so
        # error log lines still carry it.
        clear_request_id()
@app.get("/api/records")
async def get_records():
    """Return every stored record.

    Returns:
        ``{"records": [...]}`` with the contents of the records file, or an
        HTTP 500 JSON response carrying the error text on failure.
    """
    try:
        storage = StorageService(str(get_config().data_dir))
        return {"records": storage._read_json_file(storage.records_file)}
    except Exception as exc:
        logger.error(f"Failed to get records: {exc}")
        return JSONResponse(status_code=500, content={"error": str(exc)})
@app.get("/api/moods")
async def get_moods():
    """Return all moods, merged from the moods file and the records file.

    Entries are de-duplicated by ``record_id``; a mood derived from a record
    replaces the entry of the same id from the moods file. The result is
    sorted by ``timestamp``, newest first.

    Returns:
        ``{"moods": [...]}``, or an HTTP 500 JSON response carrying the
        error text on failure.
    """
    try:
        storage = StorageService(str(get_config().data_dir))

        # 1. Moods stored in moods.json.
        file_moods = storage._read_json_file(storage.moods_file)
        logger.info(f"Loaded {len(file_moods)} moods from moods.json")

        # 2. Moods embedded in the parsed data of records.json.
        record_moods = []
        for entry in storage._read_json_file(storage.records_file):
            mood_info = entry.get("parsed_data", {}).get("mood")
            if not (mood_info and mood_info.get("type")):
                continue
            record_moods.append({
                "record_id": entry["record_id"],
                "timestamp": entry["timestamp"],
                "type": mood_info.get("type"),
                "intensity": mood_info.get("intensity", 5),
                "keywords": mood_info.get("keywords", []),
                # Include the original text alongside the mood.
                "original_text": entry.get("original_text", ""),
            })
        logger.info(f"Extracted {len(record_moods)} moods from records.json")

        # 3. Merge both sources keyed by record_id. File entries go in first
        #    (given an empty original_text when they lack one); record-derived
        #    entries are inserted afterwards and therefore take precedence.
        merged = {}
        for item in file_moods:
            merged[item["record_id"]] = item
            item.setdefault("original_text", "")
        for item in record_moods:
            merged[item["record_id"]] = item

        # Newest first.
        all_moods = sorted(
            merged.values(),
            key=lambda item: item["timestamp"],
            reverse=True,
        )
        logger.info(f"Total unique moods: {len(all_moods)}")
        return {"moods": all_moods}
    except Exception as exc:
        logger.error(f"Failed to get moods: {exc}", exc_info=True)
        return JSONResponse(status_code=500, content={"error": str(exc)})
@app.get("/api/inspirations")
async def get_inspirations():
    """Return every stored inspiration.

    Returns:
        ``{"inspirations": [...]}`` with the contents of the inspirations
        file, or an HTTP 500 JSON response carrying the error text on
        failure.
    """
    try:
        storage = StorageService(str(get_config().data_dir))
        return {"inspirations": storage._read_json_file(storage.inspirations_file)}
    except Exception as exc:
        logger.error(f"Failed to get inspirations: {exc}")
        return JSONResponse(status_code=500, content={"error": str(exc)})
@app.get("/api/todos")
async def get_todos():
    """Return every stored todo.

    Returns:
        ``{"todos": [...]}`` with the contents of the todos file, or an
        HTTP 500 JSON response carrying the error text on failure.
    """
    try:
        storage = StorageService(str(get_config().data_dir))
        return {"todos": storage._read_json_file(storage.todos_file)}
    except Exception as exc:
        logger.error(f"Failed to get todos: {exc}")
        return JSONResponse(status_code=500, content={"error": str(exc)})
@app.patch("/api/todos/{todo_id}")
async def update_todo(todo_id: str, status: str = Form(...)):
    """Set the status of the first todo that matches ``todo_id``.

    A todo matches when its ``record_id`` equals ``todo_id`` or when
    ``str(hash(task))`` equals ``todo_id``.

    NOTE(review): Python salts ``hash()`` of strings per process unless
    PYTHONHASHSEED is fixed, so the hash-based id is presumably not stable
    across restarts - confirm how clients derive it.

    Args:
        todo_id: Identifier taken from the URL path.
        status: New status value (form field); stored as given.

    Returns:
        ``{"success": True}`` on success, an HTTP 404 JSON response when no
        todo matches, or an HTTP 500 JSON response carrying the error text
        on failure.
    """
    try:
        storage = StorageService(str(get_config().data_dir))
        todos = storage._read_json_file(storage.todos_file)

        # Only the first matching todo is updated.
        match = next(
            (
                item
                for item in todos
                if item.get("record_id") == todo_id
                or str(hash(item.get("task", ""))) == todo_id
            ),
            None,
        )
        if match is None:
            return JSONResponse(status_code=404, content={"error": "Todo not found"})

        match["status"] = status
        storage._write_json_file(storage.todos_file, todos)
        return {"success": True}
    except Exception as exc:
        logger.error(f"Failed to update todo: {exc}")
        return JSONResponse(status_code=500, content={"error": str(exc)})
@app.post("/api/chat")
async def chat_with_ai(text: str = Form(...)):
    """Chat with the AI assistant, using stored records as context.

    The last ten records (original text, mood, inspirations and todos) are
    rendered into the system prompt, and the user's message is sent to the
    chat completions API.

    Args:
        text: The user's chat message (form field).

    Returns:
        ``{"response": <reply>}``. On any failure (non-200 status, timeout,
        connection error or unexpected exception) the reply is a friendly
        fallback message instead of an error status.
    """
    try:
        import httpx

        config = get_config()
        storage_service = StorageService(str(config.data_dir))

        # Load the user's records as the knowledge base.
        records = storage_service._read_json_file(storage_service.records_file)

        # Build context from the most recent records (at most 10).
        context_parts = []
        for record in records[-10:]:
            original_text = record.get('original_text', '')
            timestamp = record.get('timestamp', '')

            # Tolerate records whose parsed_data is missing or null.
            parsed_data = record.get('parsed_data') or {}
            mood = parsed_data.get('mood')

            context_entry = f"[{timestamp}] 用户说: {original_text}"
            if mood:
                context_entry += f"\n情绪: {mood.get('type')} (强度: {mood.get('intensity')})"

            # Skip entries without text: str.join() raises TypeError on None,
            # which previously made the whole chat request fall back.
            ideas = [
                str(insp['core_idea'])
                for insp in parsed_data.get('inspirations') or []
                if insp.get('core_idea')
            ]
            if ideas:
                context_entry += f"\n灵感: {', '.join(ideas)}"

            tasks = [
                str(todo['task'])
                for todo in parsed_data.get('todos') or []
                if todo.get('task')
            ]
            if tasks:
                context_entry += f"\n待办: {', '.join(tasks)}"

            context_parts.append(context_entry)

        # Build the system prompt with the context.
        context_text = "\n\n".join(context_parts) if context_parts else "暂无历史记录"
        system_prompt = f"""你是一个温柔、善解人意的AI陪伴助手。你的名字叫小喵。
你会用温暖、治愈的语气和用户聊天,给予他们情感支持和陪伴。
回复要简短、自然、有温度。
你可以参考用户的历史记录来提供更贴心的回复:
{context_text}
请基于这些背景信息,用温暖、理解的语气回复用户。如果用户提到之前的事情,你可以自然地关联起来。"""

        try:
            # Extended timeout for the model call. NOTE: no retry logic is
            # implemented here.
            async with httpx.AsyncClient(timeout=60.0) as client:
                response = await client.post(
                    "https://open.bigmodel.cn/api/paas/v4/chat/completions",
                    headers={
                        "Authorization": f"Bearer {config.zhipu_api_key}",
                        "Content-Type": "application/json"
                    },
                    json={
                        "model": "glm-4-flash",
                        "messages": [
                            {
                                "role": "system",
                                "content": system_prompt
                            },
                            {
                                "role": "user",
                                "content": text
                            }
                        ],
                        "temperature": 0.8,
                        "top_p": 0.9
                    }
                )

            if response.status_code == 200:
                result = response.json()
                ai_response = result.get("choices", [{}])[0].get("message", {}).get("content", "")
                logger.info("AI chat successful with RAG context")
                return {"response": ai_response}

            logger.error(f"AI chat failed: {response.status_code} {response.text}")
            return {"response": "抱歉,我现在有点累了,稍后再聊好吗?"}
        except httpx.TimeoutException:
            logger.error("AI API timeout")
            return {"response": "抱歉,网络有点慢,请稍后再试~"}
        except httpx.ConnectError:
            logger.error("AI API connection error")
            return {"response": "抱歉,无法连接到AI服务,请检查网络连接~"}
        except Exception as e:
            logger.error(f"AI API call error: {e}")
            return {"response": "抱歉,我现在有点累了,稍后再聊好吗?"}
    except Exception as e:
        logger.error(f"Chat error: {e}")
        return {"response": "抱歉,我现在有点累了,稍后再聊好吗?"}
@app.get("/api/user/config")
async def get_user_config(request: Request):
    """Return the user configuration, including the character image URL.

    When no character image is stored yet, the default image
    (``generated_images/default_character.jpeg``) is saved as the current
    one; if it does not exist, the most recently modified
    ``character_*.jpeg`` file is used instead. A stored local path is
    converted into an HTTP URL under ``/generated_images`` in the returned
    data.

    Args:
        request: Incoming request, used to derive the base URL.

    Returns:
        The user configuration dict, or an HTTP 500 JSON response carrying
        the error text on failure.
    """
    try:
        from app.user_config import UserConfig
        from pathlib import Path

        user_config = UserConfig(str(get_config().data_dir))
        user_data = user_config.load_config()
        base_url = get_base_url(request)

        # No saved image: fall back to the default or the newest local image.
        if not user_data.get('character', {}).get('image_url'):
            images_dir = Path("generated_images")
            default_image = images_dir / "default_character.jpeg"

            if default_image.exists():
                # The default character takes priority.
                logger.info("Loading default character image")
                user_config.save_character_image(
                    image_url=str(default_image),
                    prompt="默认治愈系小猫形象",
                    preferences={
                        "color": "薰衣草紫",
                        "personality": "温柔",
                        "appearance": "无配饰",
                        "role": "陪伴式朋友"
                    }
                )
                user_data = user_config.load_config()
                logger.info("Default character image loaded successfully")
            elif images_dir.exists():
                candidates = list(images_dir.glob("character_*.jpeg"))
                if candidates:
                    # Newest by modification time.
                    newest = max(candidates, key=lambda p: p.stat().st_mtime)

                    # File name format: character_<color>_<personality>_<timestamp>.jpeg
                    name_parts = newest.stem.split('_')
                    if len(name_parts) >= 3:
                        color, personality = name_parts[1], name_parts[2]
                        user_config.save_character_image(
                            image_url=str(newest),
                            prompt=f"Character with {color} and {personality}",
                            preferences={
                                "color": color,
                                "personality": personality,
                                "appearance": "无配饰",
                                "role": "陪伴式朋友"
                            }
                        )
                        # Reload so the response reflects the saved image.
                        user_data = user_config.load_config()
                        logger.info(f"Loaded latest local image: {newest.name}")

        # A stored local path (Windows or Unix style) is exposed as a URL.
        stored = user_data.get('character', {}).get('image_url')
        if stored and not stored.startswith('http'):
            stored_path = Path(stored)
            if stored_path.exists():
                user_data['character']['image_url'] = f"{base_url}/generated_images/{stored_path.name}"
            else:
                # Path not found as stored: try the bare file name inside
                # the generated_images directory.
                filename = stored_path.name
                if (Path("generated_images") / filename).exists():
                    user_data['character']['image_url'] = f"{base_url}/generated_images/{filename}"
                    logger.info(f"Converted path to URL: {filename}")

        return user_data
    except Exception as exc:
        logger.error(f"Failed to get user config: {exc}")
        return JSONResponse(status_code=500, content={"error": str(exc)})
@app.post("/api/character/generate")
async def generate_character(
    request: Request,
    color: str = Form(...),
    personality: str = Form(...),
    appearance: str = Form(...),
    role: str = Form(...)
):
    """Generate an AI character image based on preferences.

    The generated image is downloaded into ``generated_images`` and saved as
    the user's current character. If the download fails, the remote URL
    returned by the image service is used instead.

    Args:
        request: Incoming request, used to derive the base URL.
        color: Color preference (form field).
        personality: Personality trait (form field).
        appearance: Appearance feature (form field).
        role: Character role (form field).

    Returns:
        JSON with success, image_url, prompt, preferences and task_id.
        HTTP 400 JSON when the MiniMax API key is not configured; HTTP 500
        JSON when image generation or any other step fails.
    """
    try:
        from app.image_service import ImageGenerationService, ImageGenerationError
        from app.user_config import UserConfig
        from datetime import datetime
        from pathlib import Path
        import httpx
        import re

        def _filename_part(value: str) -> str:
            """Make an untrusted form value safe to embed in the file name."""
            # Path separators, characters illegal on common filesystems,
            # whitespace and "_" (the field delimiter used when the file name
            # is parsed back) are collapsed to "-".
            cleaned = re.sub(r'[\\/:*?"<>|\s_\x00-\x1f]+', "-", value).strip(".-")
            return cleaned or "unknown"

        config = get_config()

        # Check whether the MiniMax API is configured.
        minimax_api_key = getattr(config, 'minimax_api_key', None)
        if not minimax_api_key:
            logger.warning("MiniMax API key not configured")
            return JSONResponse(
                status_code=400,
                content={
                    "error": "MiniMax API 未配置",
                    "detail": "请在 .env 文件中配置 MINIMAX_API_KEY。访问 https://platform.minimaxi.com/ 获取 API 密钥。"
                }
            )

        # Initialize services.
        image_service = ImageGenerationService(
            api_key=minimax_api_key,
            group_id=getattr(config, 'minimax_group_id', None)
        )
        user_config = UserConfig(str(config.data_dir))

        try:
            logger.info(
                f"Generating character image: "
                f"color={color}, personality={personality}, "
                f"appearance={appearance}, role={role}"
            )

            result = await image_service.generate_image(
                color=color,
                personality=personality,
                appearance=appearance,
                role=role,
                aspect_ratio="1:1",
                n=1
            )

            # Download the image to the local directory.
            generated_images_dir = Path("generated_images")
            generated_images_dir.mkdir(exist_ok=True)

            # File name: character_<color>_<personality>_<timestamp>.jpeg
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = (
                f"character_{_filename_part(color)}_"
                f"{_filename_part(personality)}_{timestamp}.jpeg"
            )
            local_path = generated_images_dir / filename
            logger.info(f"Downloading image to: {local_path}")

            try:
                async with httpx.AsyncClient(timeout=60.0) as client:
                    response = await client.get(result['url'])
                    if response.status_code == 200:
                        with open(local_path, 'wb') as f:
                            f.write(response.content)
                        logger.info(f"Image saved to: {local_path}")
                    else:
                        logger.error(f"Failed to download image: HTTP {response.status_code}")
                        # Download failed: keep using the remote URL.
                        local_path = None
            except httpx.HTTPError as download_error:
                # Network-level failure: also fall back to the remote URL.
                logger.error(f"Failed to download image: {download_error}")
                local_path = None

            # Save to the user configuration.
            preferences = {
                "color": color,
                "personality": personality,
                "appearance": appearance,
                "role": role
            }
            # Prefer the local path when the download succeeded.
            image_url = str(local_path) if local_path else result['url']
            user_config.save_character_image(
                image_url=image_url,
                prompt=result['prompt'],
                revised_prompt=result.get('metadata', {}).get('revised_prompt'),
                preferences=preferences
            )
            logger.info(f"Character image generated and saved: {image_url}")

            # Return an HTTP URL built from the request's base URL.
            base_url = get_base_url(request)
            if local_path:
                http_url = f"{base_url}/generated_images/{local_path.name}"
            else:
                http_url = image_url

            return {
                "success": True,
                "image_url": http_url,
                "prompt": result['prompt'],
                "preferences": preferences,
                "task_id": result.get('task_id')
            }
        finally:
            await image_service.close()

    except ImageGenerationError as e:
        logger.error(f"Image generation error: {e.message}")

        # Map known failure causes to friendlier messages.
        error_detail = e.message
        if "invalid api key" in e.message.lower():
            error_detail = "API 密钥无效,请检查 MINIMAX_API_KEY 配置是否正确"
        elif "quota" in e.message.lower() or "配额" in e.message:
            error_detail = "API 配额不足,请充值或等待配额恢复"
        elif "timeout" in e.message.lower() or "超时" in e.message:
            error_detail = "请求超时,请检查网络连接后重试"

        return JSONResponse(
            status_code=500,
            content={
                "error": "图像生成失败",
                "detail": error_detail
            }
        )
    except Exception as e:
        logger.error(f"Failed to generate character: {e}", exc_info=True)
        return JSONResponse(
            status_code=500,
            content={
                "error": "生成角色形象失败",
                "detail": str(e)
            }
        )
@app.get("/api/character/history")
async def get_character_history(request: Request):
    """List the generated character images found in ``generated_images``.

    Only files named ``character_<color>_<personality>_<timestamp>.jpeg``
    (at least four ``_``-separated parts) are listed, ordered by
    ``st_ctime``, newest first.

    Args:
        request: Incoming request, used to derive the base URL.

    Returns:
        ``{"images": [...]}`` where each entry carries filename, url, color,
        personality, timestamp, created_at and size.

    Raises:
        HTTPException: With status 500 when listing the images fails.
    """
    try:
        from pathlib import Path

        base_url = get_base_url(request)
        images_dir = Path("generated_images")
        if not images_dir.exists():
            return {"images": []}

        entries = []
        for image_file in images_dir.glob("character_*.jpeg"):
            name_parts = image_file.stem.split("_")
            if len(name_parts) < 4:
                # Name does not follow the expected pattern.
                continue

            file_stat = image_file.stat()
            entries.append({
                "filename": image_file.name,
                "url": f"{base_url}/generated_images/{image_file.name}",
                "color": name_parts[1],
                "personality": name_parts[2],
                # The timestamp itself contains "_", so re-join the rest.
                "timestamp": "_".join(name_parts[3:]),
                "created_at": file_stat.st_ctime,
                "size": file_stat.st_size,
            })

        # Newest first.
        entries.sort(key=lambda entry: entry["created_at"], reverse=True)
        logger.info(f"Found {len(entries)} historical character images")
        return {"images": entries}
    except Exception as exc:
        logger.error(f"Error getting character history: {exc}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(exc))
@app.post("/api/character/select")
async def select_character(
    request: Request,
    filename: str = Form(...)
):
    """Select a historical character image as the current one.

    Args:
        request: Incoming request, used to derive the base URL.
        filename: Bare file name of an image inside ``generated_images``.

    Returns:
        JSON with success, image_url, filename and preferences.

    Raises:
        HTTPException: 400 when ``filename`` is not a bare file name, 404
            when the image file does not exist, 500 on any other failure.
    """
    try:
        from app.user_config import UserConfig
        from pathlib import Path

        # `filename` is untrusted input: accept only a bare file name so the
        # path cannot escape the generated_images directory (no separators,
        # no "." / "..", not empty).
        if (
            not filename
            or filename in {".", ".."}
            or "/" in filename
            or "\\" in filename
        ):
            raise HTTPException(status_code=400, detail="无效的文件名")

        config = get_config()
        user_config = UserConfig(str(config.data_dir))

        # Verify that the image exists and is a regular file.
        image_path = Path("generated_images") / filename
        if not image_path.is_file():
            raise HTTPException(status_code=404, detail="图片文件不存在")

        # Derive preferences from the file name:
        # character_<color>_<personality>_<timestamp>.jpeg
        parts = filename.replace(".jpeg", "").split("_")
        if len(parts) >= 4:
            preferences = {
                "color": parts[1],
                "personality": parts[2],
                "appearance": "未知",
                "role": "未知"
            }
        else:
            preferences = {}

        # Update the user configuration with the local path.
        image_url = str(image_path)
        user_config.save_character_image(
            image_url=image_url,
            prompt=f"历史形象: {filename}",
            preferences=preferences
        )
        logger.info(f"Selected historical character: {filename}")

        # Return an HTTP URL built from the request's base URL.
        base_url = get_base_url(request)
        http_url = f"{base_url}/generated_images/{filename}"
        return {
            "success": True,
            "image_url": http_url,
            "filename": filename,
            "preferences": preferences
        }
    except HTTPException:
        # Pass deliberate HTTP errors through unchanged.
        raise
    except Exception as e:
        logger.error(f"Error selecting character: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/character/preferences")
async def update_character_preferences(
    color: Optional[str] = Form(None),
    personality: Optional[str] = Form(None),
    appearance: Optional[str] = Form(None),
    role: Optional[str] = Form(None)
):
    """Update character preferences without generating a new image.

    Args:
        color: Color preference (optional form field).
        personality: Personality trait (optional form field).
        appearance: Appearance feature (optional form field).
        role: Character role (optional form field).

    Returns:
        JSON with success and the preferences read back from the reloaded
        configuration, or an HTTP 500 JSON response carrying the error text
        on failure.
    """
    try:
        from app.user_config import UserConfig

        user_config = UserConfig(str(get_config().data_dir))
        user_config.update_character_preferences(
            color=color,
            personality=personality,
            appearance=appearance,
            role=role,
        )

        # Read the configuration back so the response shows what was stored.
        stored_preferences = user_config.load_config()['character']['preferences']
        return {"success": True, "preferences": stored_preferences}
    except Exception as exc:
        logger.error(f"Failed to update preferences: {exc}")
        return JSONResponse(status_code=500, content={"error": str(exc)})
if __name__ == "__main__":
    # Script entry point: load the configuration, configure logging and run
    # the uvicorn server with the configured host and port.
    import uvicorn

    try:
        # Load config for server settings.
        config = init_config()
        setup_logging(log_level=config.log_level, log_file=config.log_file)

        # Run the server.
        uvicorn.run(
            "app.main:app",
            host=config.host,
            port=config.port,
            reload=False,
            log_level=config.log_level.lower()
        )
    except Exception as e:
        print(f"Failed to start application: {e}")
        # `exit()` is a helper injected by the `site` module and is not
        # guaranteed to exist; SystemExit always works.
        raise SystemExit(1)