Spaces:
Runtime error
Runtime error
| from fastapi import APIRouter, Depends, HTTPException, Request | |
| from schemas.chat import QueryRequest, AnswerResponse, ChatHistoryResponse | |
| from schemas.user import UserOut | |
| from dependencies import get_current_user | |
| from services.chat_service import ask_question_service, stream_chat_generator | |
| from utils.utils import delete_chat_from_redis | |
| from dependencies import get_app_state | |
| import logging | |
| import uuid | |
| from redis.asyncio import Redis | |
| from datetime import datetime, timezone | |
| from db.mongoDB import mongo_db | |
| from fastapi.responses import StreamingResponse | |
| from schemas.chat import Message,ConversationResponse | |
| from typing import List | |
| # Thiết lập logger | |
| logger = logging.getLogger(__name__) | |
| router = APIRouter() | |
| async def create_chat( | |
| fastapi_request: Request, # Sử dụng Request từ FastAPI | |
| current_user: UserOut = Depends(get_current_user) # Sử dụng User model của bạn | |
| ): | |
| app_state = get_app_state(request=fastapi_request) | |
| redis_client: Redis = app_state.redis # Nên đặt tên rõ ràng là redis_client | |
| chat_id = str(uuid.uuid4()) | |
| current_utc_time = datetime.now(timezone.utc) # Sử dụng UTC | |
| # --- Lưu metadata vào Redis với key đã thống nhất --- | |
| meta_key = f"conversation_meta:{chat_id}" | |
| conversation_meta_data = { | |
| "user_id": current_user.email, # Sử dụng key 'user_id' cho nhất quán | |
| "created_at": current_utc_time.isoformat(), # Lưu dưới dạng ISO string | |
| "updated_at": current_utc_time.isoformat(), # Ban đầu giống created_at | |
| "message_count": 0 # Số lượng tin nhắn ban đầu | |
| } | |
| try: | |
| # Sử dụng await nếu redis_client là async | |
| if hasattr(redis_client, 'hmset_async'): # Kiểm tra phương thức async (ví dụ) | |
| await redis_client.hmset(meta_key, conversation_meta_data) | |
| await redis_client.expire(meta_key, 86400) # TTL: 24 giờ | |
| else: # Client đồng bộ | |
| await redis_client.hmset(meta_key, conversation_meta_data) | |
| await redis_client.expire(meta_key, 86400) # TTL: 24 giờ | |
| logger.info(f"Đã tạo metadata cho chat_id {chat_id} trong Redis với key {meta_key}.") | |
| except Exception as e: | |
| logger.error(f"Lỗi khi lưu metadata vào Redis cho chat {chat_id}: {e}", exc_info=True) | |
| raise HTTPException(status_code=500, detail="Lỗi khi tạo metadata cho cuộc hội thoại.") | |
| # --- Lưu hội thoại rỗng vào MongoDB --- | |
| # (Đảm bảo messages ban đầu là list rỗng cho key messages chính của bạn) | |
| messages_key_in_mongo = "messages" # Key lưu trữ danh sách tin nhắn trong MongoDB | |
| conversation_doc = { | |
| "user_id": current_user.email, | |
| "conversation_id": chat_id, | |
| messages_key_in_mongo: [], # Danh sách tin nhắn rỗng | |
| "created_at": current_utc_time, # Lưu kiểu datetime object | |
| "updated_at": current_utc_time # Lưu kiểu datetime object | |
| } | |
| try: | |
| await mongo_db.conversations.insert_one(conversation_doc) | |
| logger.info(f"Đã tạo hội thoại rỗng {chat_id} trong MongoDB cho user {current_user.email}.") | |
| except Exception as e: | |
| logger.error(f"Lỗi khi tạo hội thoại rỗng trong MongoDB cho chat {chat_id}: {e}", exc_info=True) | |
| # Cân nhắc xóa key meta trong Redis nếu MongoDB thất bại để tránh trạng thái không nhất quán | |
| try: | |
| if hasattr(redis_client, 'delete_async'): | |
| await redis_client.delete(meta_key) | |
| else: | |
| await redis_client.delete(meta_key) | |
| logger.info(f"Đã xóa meta key {meta_key} khỏi Redis do lỗi MongoDB.") | |
| except Exception as redis_del_err: | |
| logger.error(f"Lỗi khi xóa meta key {meta_key} khỏi Redis: {redis_del_err}") | |
| raise HTTPException(status_code=500, detail="Lỗi khi tạo cuộc hội thoại trong cơ sở dữ liệu.") | |
| return {"chat_id": chat_id} | |
| async def chat_message(request_body: QueryRequest,request: Request, user:UserOut=Depends(get_current_user)): | |
| app_state = get_app_state(request=request) | |
| result = await ask_question_service(app_state,request_body, user) | |
| if not result: | |
| raise HTTPException(status_code=500, detail="Error during QA Chain invocation") | |
| return result | |
| # Đổi thành GET | |
| async def stream_chat_endpoint( | |
| chat_id: str, # Lấy từ query param | |
| input: str, # Lấy từ query param (tên param này phải khớp với FE) | |
| request: Request, | |
| user: UserOut = Depends(get_current_user) # Sửa kiểu user | |
| ): | |
| app_state = get_app_state(request=request) | |
| user_email = getattr(user, 'email', str(user)) # Lấy email an toàn | |
| # Kiểm tra input cơ bản | |
| if not chat_id or not input: | |
| raise HTTPException(status_code=400, detail="chat_id and input are required.") | |
| # Sử dụng EventSourceResponse (từ sse-starlette, cài đặt: pip install sse-starlette) | |
| # Nó xử lý các chi tiết của SSE tốt hơn StreamingResponse thô. | |
| # return EventSourceResponse(stream_chat_generator(app_state, chat_id, input, user_email)) | |
| # Hoặc dùng StreamingResponse trực tiếp (đơn giản hơn nhưng ít tính năng SSE hơn) | |
| return StreamingResponse( | |
| stream_chat_generator(app_state, chat_id, input, user_email), | |
| media_type="text/event-stream" | |
| ) | |
| async def delete_chat(chat_id: str, request: Request, user: UserOut = Depends(get_current_user)): | |
| app_state = get_app_state(request=request) | |
| redis = app_state.redis | |
| meta_key = f"conversation_meta:{chat_id}" | |
| # Kiểm tra quyền trước khi xóa | |
| user_in_chat = await redis.hget(meta_key, "user_id") | |
| if user_in_chat is None: | |
| raise HTTPException(status_code=404, detail="Chat not found") | |
| if user_in_chat != user.email: | |
| raise HTTPException(status_code=403, detail="Unauthorized") | |
| # Xóa chat | |
| await delete_chat_from_redis(redis, chat_id) | |
| # Xóa hội thoại trong MongoDB | |
| result =await mongo_db.conversations.delete_one({"conversation_id": chat_id, "user_id": user.email}) | |
| if result.deleted_count == 0: | |
| raise HTTPException(status_code=404, detail="Chat not found in MongoDB") | |
| return {"message": "Chat deleted successfully"} | |
| async def get_conversations(user: UserOut = Depends(get_current_user)): | |
| try: | |
| logger.info(f"Attempting to get conversations for user: {user.email}") | |
| db_conversations_cursor = mongo_db.conversations.find({"user_id": user.email}) | |
| response_list = [] | |
| async for conv_doc in db_conversations_cursor: | |
| logger.debug(f"Processing conversation doc: {conv_doc}") | |
| all_messages = conv_doc.get("messages", []) | |
| logger.debug(f"Messages for this conversation: {all_messages}") | |
| response_list.append({ | |
| "conversation_id": conv_doc["conversation_id"], | |
| "created_at": conv_doc["created_at"], | |
| "updated_at": conv_doc["updated_at"], | |
| "messages": all_messages | |
| }) | |
| logger.info(f"Successfully processed {len(response_list)} conversations.") | |
| return response_list | |
| except Exception as e: | |
| logger.error(f"Error in get_conversations for user {user.email}: {e}", exc_info=True) # exc_info=True sẽ log cả traceback | |
| raise HTTPException(status_code=500, detail="An error occurred while fetching conversations.") | |
| async def load_conversation_and_sync_redis( | |
| fastapi_request: Request, # Đổi tên biến request | |
| chat_id: str, # Lấy trực tiếp từ path param | |
| current_user: UserOut = Depends(get_current_user) # Sử dụng User model | |
| ): | |
| app_state = get_app_state(request=fastapi_request) | |
| redis_client = app_state.redis # Nên là client async nếu có thể | |
| # 1. Kiểm tra hội thoại trong MongoDB | |
| try: | |
| conversation_doc = await mongo_db.conversations.find_one( | |
| {"conversation_id": chat_id, "user_id": current_user.email} | |
| ) | |
| if not conversation_doc: | |
| logger.warning(f"Hội thoại {chat_id} không tồn tại hoặc không thuộc user {current_user.email}") | |
| raise HTTPException( | |
| status_code=404, | |
| detail="Hội thoại không tồn tại hoặc bạn không có quyền truy cập" | |
| ) | |
| except Exception as e: | |
| logger.error(f"Lỗi MongoDB khi kiểm tra hội thoại {chat_id}: {e}", exc_info=True) | |
| raise HTTPException(status_code=500, detail="Lỗi kết nối cơ sở dữ liệu MongoDB") | |
| # 2. Chuẩn bị lịch sử tin nhắn từ MongoDB để trả về và nạp vào Redis | |
| raw_messages_from_db = conversation_doc.get("messages", []) | |
| validated_history_for_response: List[Message] = [] | |
| for msg_data in raw_messages_from_db: | |
| try: | |
| # Validate và chuyển đổi timestamp nếu cần (Pydantic sẽ tự làm nếu input là datetime obj) | |
| validated_history_for_response.append(Message(**msg_data)) | |
| except Exception as p_err: | |
| logger.warning(f"Bỏ qua message không hợp lệ trong chat {chat_id} từ DB: {msg_data}. Lỗi: {p_err}") | |
| # 3. Nạp/Đồng bộ tin nhắn và metadata vào Redis (Sử dụng key thống nhất) | |
| messages_redis_key = f"conversation_messages:{chat_id}" | |
| meta_redis_key = f"conversation_meta:{chat_id}" | |
| try: | |
| # Sử dụng pipeline cho hiệu quả | |
| # Giả sử redis_client là async | |
| with redis_client.pipeline() as pipe: | |
| pipe.delete(messages_redis_key) # Xóa messages cũ để nạp lại toàn bộ | |
| if validated_history_for_response: | |
| for msg_model in validated_history_for_response: | |
| # Đảm bảo lưu trữ theo cấu trúc Pydantic `Message` | |
| pipe.rpush(messages_redis_key, msg_model.model_dump_json()) # Pydantic V2 | |
| # hoặc .json() cho Pydantic V1 | |
| pipe.expire(messages_redis_key, 86400) # TTL: 24 giờ | |
| # Cập nhật/Tạo mới metadata | |
| # Lấy created_at, updated_at từ document MongoDB | |
| created_at_iso = conversation_doc["created_at"].isoformat() | |
| updated_at_iso = conversation_doc["updated_at"].isoformat() | |
| conversation_meta_data = { | |
| "user_id": current_user.email, | |
| "created_at": created_at_iso, | |
| "updated_at": updated_at_iso, | |
| "message_count": len(validated_history_for_response) | |
| } | |
| # Xóa meta cũ và đặt lại, hoặc dùng hmset để cập nhật | |
| pipe.delete(meta_redis_key) | |
| pipe.hset(meta_redis_key, conversation_meta_data) | |
| pipe.expire(meta_redis_key, 86400) | |
| pipe.execute() | |
| logger.info(f"Đã nạp và đồng bộ hội thoại {chat_id} vào Redis với {len(validated_history_for_response)} tin nhắn.") | |
| except Exception as e: | |
| logger.error(f"Lỗi khi nạp hội thoại {chat_id} vào Redis: {e}", exc_info=True) | |
| # 4. Trả về response | |
| return ChatHistoryResponse( | |
| chat_id=chat_id, | |
| history=validated_history_for_response, | |
| created_at=conversation_doc["created_at"], # Lấy từ doc MongoDB | |
| updated_at=conversation_doc["updated_at"], # Lấy từ doc MongoDB | |
| user_id=current_user.email | |
| ) |