# --- HF model lists (single, light model for HF Spaces CPU) --- # We standardize on a small, fast model that runs reliably via HF Inference # and is suitable for free CPU Spaces constraints. THINKING_MODELS = ["Qwen/Qwen2.5-3B-Instruct"] INSTRUCT_MODELS = ["Qwen/Qwen2.5-3B-Instruct"] def _current_models(): return THINKING_MODELS if STATE.get("mode") == "thinking" else INSTRUCT_MODELS # app.py import os import json import hashlib import logging import threading from pathlib import Path from typing import List, Dict, Any, Tuple # --- Model mode state (thinking/instruct) with simple persistence --- from pathlib import Path APP_DIR = Path(__file__).parent DATA_DIR = APP_DIR / "data" DATA_DIR.mkdir(parents=True, exist_ok=True) STATE_PATH = DATA_DIR / "state.json" def _load_state(): if STATE_PATH.exists(): try: return json.loads(STATE_PATH.read_text(encoding="utf-8")) except Exception: pass return {"mode": "instruct"} def _save_state(s: dict): STATE_PATH.write_text(json.dumps(s, ensure_ascii=False, indent=2), encoding="utf-8") STATE = _load_state() from fastapi import FastAPI, HTTPException from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from huggingface_hub import HfApi, hf_hub_download from monitor import get_current_metrics, start_monitoring_thread from memory import get_history, save_history # ========================= # إعداد السجلّات # ========================= logging.basicConfig( level=logging.INFO, format="🪵 [%(asctime)s] [%(levelname)s] %(message)s" ) logger = logging.getLogger("app") # ========================= # ثوابت ومسارات # ========================= DATA_DIR = Path("data") CACHE_DIR = DATA_DIR / "cache" INDEX_DIR = DATA_DIR / "index" FILES_DIR = DATA_DIR / "files" # تخزين النص الكامل لكل ملف REPORT_FILE = DATA_DIR / "analysis_report.md" GRAPH_FILE = DATA_DIR / "code_graph.json" EMB_FILE = INDEX_DIR / "embeddings.faiss" META_FILE = INDEX_DIR / "chunks.pkl" HASH_MAP_FILE = INDEX_DIR / "hash_map.json" for p in [DATA_DIR, CACHE_DIR, INDEX_DIR, FILES_DIR]: p.mkdir(parents=True, exist_ok=True) # Env HF_TOKEN = os.getenv("HF_TOKEN", "") # Use a single, smaller model by default for faster responses and fewer 5xx MODEL_REPO = os.getenv("MODEL_REPO", "Qwen/Qwen2.5-3B-Instruct") # No fallbacks by default (can be provided via env if desired) FALLBACK_MODELS = [ m.strip() for m in os.getenv("FALLBACK_MODELS", "").split(",") if m.strip() ] # GGUF المحلي (إن توفر) LOCAL_GGUF_REPO = os.getenv("LOCAL_GGUF_REPO", "Qwen/Qwen2.5-3B-Instruct-GGUF") LOCAL_GGUF_FILE = os.getenv("LOCAL_GGUF_FILE", "qwen2.5-3b-instruct-q4_k_m.gguf") LOCAL_GGUF_PATH = CACHE_DIR / LOCAL_GGUF_FILE # تقسيم الشيفرة (قيمة قصوى للقراءة المؤقتة) MAX_FILE_BYTES = int(os.getenv("MAX_FILE_BYTES", str(10 * 1024 * 1024))) # 10MB احتياطيًا SYSTEM_PROMPT = """<|im_start|>system You are a senior AI code analyst. Analyze projects with hybrid indexing (code graph + retrieval). Return structured, accurate, concise answers. Use Arabic + English labels in the final report. <|im_end|>""" # ========================= # الحالة العالمية والقفل # ========================= all_chunks: List[Tuple[str, str]] = [] code_graph: Dict[str, Any] = {"files": {}} hash_map: Dict[str, str] = {} index_lock = threading.RLock() # ✅ لتأمين الفهرسة/الاسترجاع # ========================= # LLM (محلي عبر GGUF) # ========================= try: from llama_cpp import Llama except Exception: Llama = None llm = None logger.info(f"HF_TOKEN length: {len(HF_TOKEN)}") # تحقق من طول الtoken def load_local_model_if_configured(): """تحميل نموذج GGUF من HuggingFace Hub مباشرة.""" global llm if Llama is None: logger.warning("⚠️ llama_cpp غير متاح. لن يعمل النموذج المحلي.") return try: logger.info(f"⬇️ تحميل نموذج GGUF: {LOCAL_GGUF_REPO}/{LOCAL_GGUF_FILE}") llm = Llama.from_pretrained( repo_id=LOCAL_GGUF_REPO, filename=LOCAL_GGUF_FILE, # Llama params n_ctx=int(os.getenv("N_CTX", "32768")), n_threads=int(os.getenv("N_THREADS", "2")), n_batch=int(os.getenv("N_BATCH", "64")), n_gpu_layers=int(os.getenv("N_GPU_LAYERS", "0")), use_mlock=False, verbose=False, ) logger.info("✅ تم تحميل نموذج GGUF المحلي بنجاح.") except Exception as e: llm = None logger.error(f"❌ فشل تحميل/تشغيل GGUF: {e}") def call_local_llm(prompt: str, max_tokens: int = 800) -> str: if llm is None: raise RuntimeError("النموذج المحلي غير محمل") try: res = llm( prompt, max_tokens=max_tokens, temperature=0.4, top_p=0.9, stop=["<|im_end|>", "<|im_start|>"], echo=False ) return (res.get("choices", [{}])[0].get("text") or "").strip() except Exception as e: logger.error(f"❌ خطأ في استدعاء النموذج المحلي: {e}") raise RuntimeError(f"فشل استدعاء النموذج المحلي: {e}") def _call_hf_single_model(model_repo: str, prompt: str, max_new_tokens: int = 900) -> str: import requests if not HF_TOKEN: logger.error("❌ HF_TOKEN غير معرف.") raise RuntimeError("التوكن HF_TOKEN غير مضبوط ولا يوجد نموذج محلي.") url = f"https://api-inference.huggingface.co/models/{model_repo}" headers = {"Authorization": f"Bearer {HF_TOKEN}"} payload = { "inputs": prompt, "parameters": { "max_new_tokens": max_new_tokens, "temperature": 0.4, "top_p": 0.9, "return_full_text": False } } r = requests.post(url, headers=headers, json=payload, timeout=120) if r.status_code == 503: data = {} try: data = r.json() except Exception: pass eta = data.get("estimated_time") raise RuntimeError("النموذج قيد التحميل من HF (503)." + (f" متوقع {eta:.0f}ث" if isinstance(eta, (int, float)) else "")) try: r.raise_for_status() except requests.exceptions.HTTPError as e: status = e.response.status_code if status == 401: raise RuntimeError("التوكن مفقود أو غير صالح (401). تأكد من HF_TOKEN.") if status == 403: msg = "" try: msg = (e.response.json().get("error") or "").lower() except Exception: pass if "gated" in msg or "accept" in msg: raise RuntimeError("النموذج مسيَّج (403). يجب دخول صفحة النموذج والضغط على Accept.") raise RuntimeError("صلاحية الوصول مرفوضة (403).") if status == 404: raise RuntimeError("النموذج غير موجود أو غير متاح عبر السيرفرلس (404).") if status == 429: raise RuntimeError("تم تجاوز الحد المسموح للطلبات (429). جرّب لاحقًا.") try: err = e.response.json() except Exception: err = {"error": e.response.text} raise RuntimeError(f"خطأ HF ({status}): {err.get('error') or err}") data = r.json() if isinstance(data, list) and data and "generated_text" in data[0]: return data[0]["generated_text"] if isinstance(data, dict) and "generated_text" in data: return data["generated_text"] if isinstance(data, dict) and "error" in data: raise RuntimeError(f"HF error: {data['error']}") return json.dumps(data) def call_hf_inference(prompt: str, max_new_tokens: int = 900) -> str: raise RuntimeError("تم تعطيل HF Inference. النموذج المحلي مستخدم فقط.") def call_llm(prompt: str, max_tokens: int = 900) -> str: return call_local_llm(prompt, max_tokens) # ========================= # بناء الـ Prompt للدردشة (نسخة مبسطة) # ========================= def build_chat_prompt(history: List[List[str]], message: str, extra: str = "") -> str: prompt = f"<|im_start|>system\n{SYSTEM_PROMPT}\n<|im_end|>\n" for user_msg, ai_msg in history: prompt += f"<|im_start|>user\n{user_msg}\n<|im_end|>\n" prompt += f"<|im_start|>assistant\n{ai_msg}\n<|im_end|>\n" prompt += f"<|im_start|>user\n{message}\n{extra}\n<|im_end|>\n" prompt += f"<|im_start|>assistant\n" return prompt # ========================= # FastAPI # ========================= # NOTE: Warm-up moved to startup_event after helper functions are defined app = FastAPI(title="AI Code Analyst") # --- Root endpoint for Hugging Face health checks and simple UI --- from fastapi.responses import PlainTextResponse, HTMLResponse, JSONResponse @app.get("/", response_class=HTMLResponse) def root(logs: str | None = None): """ Minimal root endpoint so HF / healthcheck returns 200 OK. Use `/?logs=container` to tail last lines from data/app.log. """ if logs == "container": log_file = Path(DATA_DIR) / "app.log" if log_file.exists(): tail = "".join(log_file.read_text(encoding="utf-8", errors="ignore").splitlines(True)[-200:]) return PlainTextResponse(tail) return PlainTextResponse("No logs yet.", status_code=200) # Minimal HTML with quick chat form html = """ AI Code Analyst

✅ AI Code Analyst is running

Try /docs, /hf-check, or /metrics. | Logs: tail

Quick Chat (server-side)


        
""" return HTMLResponse(html) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Endpoint جديد لفحص التوكن وصلاحية الوصول @app.get("/hf-check") def hf_check(): api = HfApi() out = { "token_set": bool(HF_TOKEN), "token_valid": False, "model_repo": MODEL_REPO, "model_access": False, "model_private": None, "gated_hint": False, "message": "" } if not HF_TOKEN: out["message"] = "HF_TOKEN غير مضبوط." return out try: me = api.whoami(token=HF_TOKEN) out["token_valid"] = True out["message"] = f"Token OK for user: {me.get('name')}" except Exception as e: out["message"] = f"Token check failed: {type(e).__name__}: {e}" return out try: info = api.model_info(MODEL_REPO, token=HF_TOKEN) out["model_access"] = True out["model_private"] = getattr(info, "private", None) out["message"] += f" | Model reachable: {info.modelId}" except Exception as e: msg = str(e).lower() out["message"] += f" | Model access failed: {type(e).__name__}: {e}" out["gated_hint"] = ("gated" in msg or "accept" in msg) return out class UploadFilesRequest(BaseModel): files: Dict[str, str] # fname: content class DiffFilesRequest(BaseModel): deleted: List[str] modified: Dict[str, str] # fname: new_content class AnalyzeAndReportRequest(BaseModel): session_id: str query: str top_k: int | None = None class ChatRequest(BaseModel): session_id: str message: str class ChatResponse(BaseModel): response: str updated_history: List[List[str]] @app.on_event("startup") def startup_event(): # تحميل النموذج المحلي + مراقبة الموارد load_local_model_if_configured() start_monitoring_thread() # Warm-up محلي try: _ = call_local_llm("ping", max_tokens=1) logging.info("[LLM] Local warm-up OK") except Exception as e: logging.warning("[LLM] Local warm-up failed: %s", e) def rebuild_index_from_files(): # نسخة Lite: لا فهرسة return None @app.get("/metrics") def metrics(): return get_current_metrics() @app.post("/upload-files") def upload_files(req: UploadFilesRequest): """تحميل الملفات، تحليل سريع بالنموذج، ثم حذف الملفات دون حفظ دائم.""" FILES_DIR.mkdir(parents=True, exist_ok=True) saved_paths = [] for fname, content in req.files.items(): p = FILES_DIR / fname p.parent.mkdir(parents=True, exist_ok=True) p.write_text(content, encoding="utf-8") saved_paths.append(p) try: # بناء Prompt مختصر لتحليل الملفات parts = [] for p in saved_paths: try: txt = p.read_text(encoding="utf-8", errors="ignore") except Exception: txt = "" if len(txt) > 4000: txt = txt[:4000] + "\n... [truncated]" parts.append(f"[File] {p.name}\n{txt}") prompt = ( f"<|im_start|>system\n{SYSTEM_PROMPT}\n<|im_end|>\n" f"<|im_start|>user\nAnalyze these files and summarize key issues and functions.\n\n" + "\n\n".join(parts) + "\n<|im_end|>\n<|im_start|>assistant\n" ) summary = call_llm(prompt, max_tokens=900) return {"status": "ok", "summary": summary, "files": [p.name for p in saved_paths]} finally: # حذف الملفات فورًا for p in saved_paths: try: p.unlink(missing_ok=True) except Exception: pass # تم إلغاء مسار diff-files ضمن نسخة Lite (لا فهرسة أو مقارنة) # تم إلغاء analyze-and-report ضمن نسخة Lite def classify_intent(history: List[List[str]], message: str) -> Dict[str, Any]: """نسخة مبسطة: لا تصنيف، مجرد دردشة مباشرة.""" return {"intent": "CHAT", "confidence": 1.0, "action": "NONE", "targets": [], "reason": ""} @app.post("/chat", response_model=ChatResponse) def chat(req: ChatRequest): history = get_history(req.session_id) prompt = build_chat_prompt(history, req.message, "") try: response_text = call_llm(prompt, max_tokens=700) except Exception as e: raise HTTPException(status_code=500, detail=f"LLM error: {str(e)}") updated = (history + [[req.message, response_text]])[-8:] save_history(req.session_id, updated) return ChatResponse(response=response_text, updated_history=updated) # --- Robust HF Inference with retries/fallback/warmup --- import requests, time HF_API_URL = "https://api-inference.huggingface.co/models/{model}" HF_TOKEN = os.getenv("HF_TOKEN","") def _hf_headers(): hdr = {"Accept":"application/json"} if HF_TOKEN: hdr["Authorization"] = f"Bearer {HF_TOKEN}" return hdr def _hf_call_single(model: str, prompt: str, max_new_tokens: int = 256, timeout_s: int = 60) -> str: url = HF_API_URL.format(model=model) payload = { "inputs": prompt, "parameters": {"max_new_tokens": max_new_tokens, "temperature": 0.3}, "options": {"wait_for_model": True, "use_cache": True} } tries, backoff = 0, 2 while True: tries += 1 try: r = requests.post(url, headers=_hf_headers(), json=payload, timeout=timeout_s) if r.status_code == 503: try: eta = r.json().get("estimated_time", 8) except Exception: eta = 8 time.sleep(min(30, max(2, int(eta)))) continue r.raise_for_status() data = r.json() if isinstance(data, list): if data and isinstance(data[0], dict) and "generated_text" in data[0]: return data[0]["generated_text"] if data and isinstance(data[0], dict) and "content" in data[0]: return data[0]["content"] if isinstance(data, dict) and "generated_text" in data: return data["generated_text"] return json.dumps(data, ensure_ascii=False) except requests.HTTPError as e: status = getattr(e.response, "status_code", None) if status in (502, 503, 504, 429) and tries < 3: time.sleep(backoff); backoff *= 2; continue try: text = e.response.text except Exception: text = "" raise RuntimeError(f"HF error {status} on {model}: {text}") from e except requests.RequestException as e: if tries < 3: time.sleep(backoff); backoff *= 2; continue raise RuntimeError(f"HF request failed on {model}: {e}") from e def call_hf_inference_robust(prompt: str, max_new_tokens: int = 256) -> str: last_err = None for m in _current_models(): try: return _hf_call_single(m, prompt, max_new_tokens) except Exception as e: logging.warning(f"[HF] model {m} failed: {e}") last_err = e continue raise RuntimeError(f"All HF models failed. Last: {last_err}") from fastapi import Body from pydantic import BaseModel class _SetModelIn(BaseModel): mode: str # 'thinking' or 'instruct' @app.post("/set-model") def set_model_endpoint(body: _SetModelIn): mode = (body.mode or "").lower().strip() if mode not in ("thinking","instruct"): raise HTTPException(400, "mode must be 'thinking' or 'instruct'") STATE["mode"] = mode _save_state(STATE) # Try warm-up immediately to inform user about readiness try: _ = _hf_call_single(_current_models()[0], "ping", 1, timeout_s=30) warmed = True except Exception: warmed = False return {"ok": True, "mode": mode, "models": _current_models(), "warmed": warmed}