from fastapi import FastAPI, HTTPException from pydantic import BaseModel from llama_cpp import Llama import logging import os import threading from fastapi.middleware.cors import CORSMiddleware from monitor import get_current_metrics, start_monitoring_thread from huggingface_hub import hf_hub_download from dotenv import load_dotenv # تحميل متغيرات البيئة load_dotenv() # إعداد السجل logging.basicConfig( level=logging.DEBUG, format="🪵 [%(asctime)s] [%(levelname)s] %(message)s", ) logger = logging.getLogger(__name__) MODEL_REPO = "QuantFactory/Qwen2.5-7B-Instruct-GGUF" MODEL_FILE = "Qwen2.5-7B-Instruct.Q4_K_M.gguf" MODEL_PATH = f"/home/user/app/data/cache/{MODEL_FILE}" HF_TOKEN = os.getenv("HF_TOKEN") # تحميل النموذج إذا لم يكن موجودًا if not os.path.exists(MODEL_PATH): os.makedirs("/home/user/app/data/cache", exist_ok=True) logger.info("📦 تحميل النموذج من Hugging Face Hub...") try: hf_hub_download( repo_id=MODEL_REPO, filename=MODEL_FILE, local_dir="/home/user/app/data/cache", token=HF_TOKEN, ) except Exception as e: logger.error(f"❌ فشل تحميل النموذج: {str(e)}") raise RuntimeError("فشل تحميل النموذج") from e # تأكيد وجود النموذج if os.path.exists(MODEL_PATH): logger.info(f"✅ النموذج موجود: {MODEL_PATH}") else: logger.error(f"❌ النموذج غير موجود: {MODEL_PATH}") raise RuntimeError("النموذج غير موجود بعد التحميل") # تحميل النموذج llm = Llama( model_path=MODEL_PATH, n_ctx=2048, n_threads=4, n_gpu_layers=0, n_batch=512, use_mlock=True, verbose=False ) # اختبار النموذج مباشرة try: logger.info("🔍 اختبار النموذج...") test_output = llm("مرحبا", max_tokens=10) logger.info(f"✅ اختبار النموذج ناجح: {test_output}") except Exception as e: logger.error(f"❌ فشل اختبار النموذج: {str(e)}") raise RuntimeError("فشل اختبار النموذج") from e SYSTEM_PROMPT = """<|im_start|>system You are Qwen, created by Alibaba Cloud. You are an AI development assistant. Follow these rules: 1. If request is simple (single file, <50 lines), handle it directly 2. For complex requests (multiple files, >50 lines), just respond with "CODER" 3. Always check code for errors before sending 4. Never execute unsafe code<|im_end|>""" # بدء مراقبة الموارد start_monitoring_thread() app = FastAPI() app.add_middleware( CORSMiddleware, allow_origins=["*"], # يمكن تخصيصه لاحقًا allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) @app.on_event("startup") async def startup_event(): logger.info("🚀 الخادم بدأ بنجاح – /chat جاهز") class ChatRequest(BaseModel): message: str history: list[list[str]] = [] # يجب أن تكون قائمة من القوائم لتمثيل JSON class ChatResponse(BaseModel): response: str updated_history: list[list[str]] def format_prompt(messages): formatted = [] for role, content in messages: if role == "system": formatted.append(f"<|im_start|>system\n{content}<|im_end|>") elif role == "user": formatted.append(f"<|im_start|>user\n{content}<|im_end|>") else: formatted.append(f"<|im_start|>assistant\n{content}<|im_end|>") formatted.append("<|im_start|>assistant\n") return "\n".join(formatted) @app.get("/metrics") def read_metrics(): logger.debug("📊 تم طلب حالة الموارد") return get_current_metrics() @app.post("/chat", response_model=ChatResponse) def chat(req: ChatRequest): logger.info(f"📩 طلب جديد: {req.message}") try: messages = [("system", SYSTEM_PROMPT)] for user_msg, bot_msg in req.history: messages.append(("user", user_msg)) messages.append(("assistant", bot_msg)) messages.append(("user", req.message)) prompt = format_prompt(messages) logger.debug(f"📝 prompt المُرسل:\n{prompt[:300]}...") import gc gc.collect() output = llm( prompt, max_tokens=1024, temperature=0.7, top_p=0.95, stop=["<|im_end|>", "<|im_start|>"], echo=False ) reply = output["choices"][0]["text"].strip() logger.info(f"🤖 رد النموذج: {reply}") updated_history = req.history + [[req.message, reply]] return ChatResponse(response=reply, updated_history=updated_history) except Exception as e: logger.error(f"❌ خطأ أثناء المعالجة: {str(e)}") raise HTTPException(status_code=500, detail="حدث خطأ أثناء توليد الرد") @app.get("/") def root(): return {"message": "الخادم يعمل", "status": "ok"}