# modules/knowledge_base/routes.py
"""Flask routes for creating, listing, extending and deleting RAG knowledge bases."""
import logging
import os
import threading
import time
import uuid

from flask import Blueprint, request, jsonify
from werkzeug.utils import secure_filename  # kept for compatibility; not used below

# Import existing components
from modules.knowledge_base.processor import DocumentProcessor
from modules.knowledge_base.vector_store import VectorStore
from modules.knowledge_base.retriever import Retriever
from modules.knowledge_base.reranker import Reranker

logger = logging.getLogger(__name__)

knowledge_bp = Blueprint('knowledge', __name__)

# Shared component instances used by every route in this blueprint.
doc_processor = DocumentProcessor()
vector_store = VectorStore()
retriever = Retriever()
reranker = Reranker()

# Progress of background document-processing jobs, keyed by task ID.
# NOTE(review): entries are never removed, so this grows for the life of the process.
processing_tasks = {}

# Directory where uploaded files are stored before/while they are processed.
UPLOAD_FOLDER = "uploads"
os.makedirs(UPLOAD_FOLDER, exist_ok=True)

# Prefix that turns a knowledge-base display name into its index ID.
INDEX_PREFIX = "rag_"


def _display_name(index_id):
    """Return *index_id* with the ``rag_`` prefix removed, if it has one."""
    if index_id.startswith(INDEX_PREFIX):
        return index_id[len(INDEX_PREFIX):]
    return index_id


def _error_response(exc):
    """Log the exception currently being handled and build a 500 JSON response.

    Must be called from inside an ``except`` block so the traceback is logged.
    """
    logger.exception("Knowledge base request failed")
    return jsonify({"success": False, "message": str(exc)}), 500


def _get_uploaded_file():
    """Fetch the ``file`` part of the current request.

    Returns:
        ``(file, None)`` on success, or ``(None, response)`` where *response* is a
        ready-to-return 400 error tuple.
    """
    if 'file' not in request.files:
        return None, (jsonify({"success": False, "message": "No file uploaded"}), 400)
    file = request.files['file']
    # The filename may be '' or None when the client submits an empty file field;
    # a None filename would otherwise crash os.path.splitext later.
    if not file.filename:
        return None, (jsonify({"success": False, "message": "No file selected"}), 400)
    return file, None


def _save_upload(file):
    """Save *file* into UPLOAD_FOLDER under a random, collision-free name.

    The extension is taken from the client's original filename so that non-ASCII
    (e.g. Chinese) filenames still map to the right file type.

    Returns:
        ``(file_path, original_filename)``.
    """
    original_filename = file.filename
    file_ext = os.path.splitext(original_filename)[1].lower()
    # Only the extension comes from the client; the stem is a fresh UUID.
    file_path = os.path.join(UPLOAD_FOLDER, f"{uuid.uuid4().hex}{file_ext}")
    file.save(file_path)
    return file_path, original_filename


def _start_processing_task(index_id, kb_name, file_path, original_filename):
    """Register a progress entry and process an uploaded file in a background thread.

    The file at *file_path* is run through ``doc_processor.process``. The resulting
    documents are stored into the vector-store index *index_id*.

    Args:
        index_id: Target index (``rag_``-prefixed).
        kb_name: Display name recorded in the task entry as ``index_name``.
        file_path: Path of the saved upload.
        original_filename: Client-supplied filename, forwarded to the processor.

    Returns:
        The new task ID, whose state is readable from ``processing_tasks``.
    """
    # The random suffix keeps IDs unique even for two uploads to the same
    # knowledge base within the same second.
    task_id = f"task_{int(time.time())}_{kb_name}_{uuid.uuid4().hex[:8]}"
    task = {
        "progress": 0,
        "status": "Starting document processing...",
        "index_name": kb_name,
        "file_path": file_path,
        "original_filename": original_filename,
        "error": False,
        "docCount": 0,
    }
    processing_tasks[task_id] = task

    def update_progress(progress, status):
        # Capped at 95: the remaining step (storing vectors) sets 95 -> 100 itself.
        task["progress"] = min(95, progress)
        task["status"] = status

    def process_in_thread():
        try:
            task["progress"] = 10
            task["status"] = "Loading document..."
            processed_docs = doc_processor.process(
                file_path,
                progress_callback=update_progress,
                original_filename=original_filename,
            )
            task["progress"] = 95
            task["status"] = "Creating vector store..."
            task["docCount"] = len(processed_docs)
            vector_store.store(processed_docs, index_id)
            task["progress"] = 100
            task["status"] = "Processing complete"
        except Exception as e:
            # Failures are reported through the task entry, polled via get_progress.
            task["error"] = True
            task["status"] = f"Processing failed: {str(e)}"
            logger.exception("Processing task %s failed", task_id)

    threading.Thread(target=process_in_thread).start()
    return task_id


@knowledge_bp.route('/', methods=['GET'])
def get_all_knowledge():
    """Get all knowledge base information.

    Each entry has ``id`` (index ID), ``name`` (ID without the ``rag_`` prefix),
    ``files`` (as returned by ``vector_store.get_files_in_index``) and ``fileCount``.
    """
    try:
        result = []
        for index in retriever.get_all_indices():
            files = vector_store.get_files_in_index(index)
            result.append({
                "id": index,
                "name": _display_name(index),
                "files": files,
                "fileCount": len(files),
            })
        return jsonify({"success": True, "data": result})
    except Exception as e:
        return _error_response(e)


@knowledge_bp.route('/', methods=['POST'])
def create_knowledge():
    """Create a new knowledge base from form field ``name`` and uploaded ``file``.

    Processing runs in the background. Responds 202 with a ``task_id``, or 400
    when the name is empty, already taken, or no file was uploaded.
    """
    try:
        # Whitespace-only names are treated as empty.
        name = (request.form.get('name') or '').strip()
        if not name:
            return jsonify({"success": False, "message": "Knowledge base name cannot be empty"}), 400

        index_id = f"{INDEX_PREFIX}{name}"
        # NOTE(review): check-then-create is not atomic; concurrent requests with the
        # same name can both pass this check.
        if index_id in retriever.get_all_indices():
            return jsonify({"success": False, "message": f"Knowledge base '{name}' already exists"}), 400

        file, error = _get_uploaded_file()
        if error is not None:
            return error

        file_path, original_filename = _save_upload(file)
        task_id = _start_processing_task(index_id, name, file_path, original_filename)
        return jsonify({
            "success": True,
            "message": "Started processing document",
            "task_id": task_id,
        }), 202
    except Exception as e:
        return _error_response(e)


@knowledge_bp.route('/progress/<task_id>', methods=['GET'])
def get_progress(task_id):
    """Get document processing progress.

    Unknown task IDs yield ``success: True`` with an ``error: True`` placeholder entry.
    """
    try:
        task_data = processing_tasks.get(task_id, {
            "progress": 0,
            "status": "Task not found",
            "error": True,
        })
        return jsonify({"success": True, "data": task_data})
    except Exception as e:
        return _error_response(e)


@knowledge_bp.route('/<index_id>/documents', methods=['POST'])
def add_documents(index_id):
    """Add an uploaded ``file`` to the existing knowledge base *index_id*.

    Processing runs in the background. Responds 202 with a ``task_id``, 404 when
    the index does not exist, or 400 when no file was uploaded.
    """
    try:
        if index_id not in retriever.get_all_indices():
            return jsonify({"success": False, "message": "Knowledge base does not exist"}), 404

        file, error = _get_uploaded_file()
        if error is not None:
            return error

        file_path, original_filename = _save_upload(file)
        task_id = _start_processing_task(
            index_id, _display_name(index_id), file_path, original_filename
        )
        return jsonify({
            "success": True,
            "message": "Started processing document",
            "task_id": task_id,
        }), 202
    except Exception as e:
        return _error_response(e)


@knowledge_bp.route('/<index_id>', methods=['DELETE'])
def delete_knowledge(index_id):
    """Delete a knowledge base.

    A falsy result from ``vector_store.delete_index`` is reported as
    ``success: False`` with HTTP 200.
    """
    try:
        if vector_store.delete_index(index_id):
            return jsonify({"success": True, "message": "Knowledge base deleted successfully"})
        return jsonify({"success": False, "message": "Failed to delete knowledge base"})
    except Exception as e:
        return _error_response(e)
@knowledge_bp.route('/<index_id>/documents/<file_name>', methods=['DELETE'])
def delete_document(index_id, file_name):
    """Delete a document from a knowledge base.

    Args:
        index_id: Knowledge-base index ID (taken from the URL).
        file_name: Name of the document to remove (taken from the URL).

    Returns:
        JSON ``{"success": True, ...}`` when ``vector_store.delete_document`` returns a
        truthy value. Otherwise ``success: False`` with HTTP 200, or HTTP 500 if the
        store raises.
    """
    try:
        result = vector_store.delete_document(index_id, file_name)
        if result:
            return jsonify({"success": True, "message": "Document deleted successfully"})
        # A falsy result is reported as a failure without changing the HTTP status.
        return jsonify({"success": False, "message": "Failed to delete document"})
    except Exception as e:
        import traceback
        traceback.print_exc()
        return jsonify({"success": False, "message": str(e)}), 500