# app.py
import os
import json
import hashlib
import logging
import threading
from pathlib import Path
from typing import List, Dict, Any, Tuple

import numpy as np
import faiss
import pickle
import ast as python_ast
import requests
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from sentence_transformers import SentenceTransformer
from huggingface_hub import hf_hub_download

from monitor import get_current_metrics, start_monitoring_thread
from memory import get_history, save_history

# =========================
# Logging setup
# =========================
logging.basicConfig(
    level=logging.INFO,
    format="🪵 [%(asctime)s] [%(levelname)s] %(message)s"
)
logger = logging.getLogger("app")

# =========================
# Constants and paths
# =========================
DATA_DIR = Path("data")
CACHE_DIR = DATA_DIR / "cache"
INDEX_DIR = DATA_DIR / "index"
FILES_DIR = DATA_DIR / "files"  # full text of every indexed file
REPORT_FILE = DATA_DIR / "analysis_report.md"
GRAPH_FILE = DATA_DIR / "code_graph.json"
EMB_FILE = INDEX_DIR / "embeddings.faiss"
META_FILE = INDEX_DIR / "chunks.pkl"
HASH_MAP_FILE = INDEX_DIR / "hash_map.json"
for p in [DATA_DIR, CACHE_DIR, INDEX_DIR, FILES_DIR]:
    p.mkdir(parents=True, exist_ok=True)

# Env
HF_TOKEN = os.getenv("HF_TOKEN", "")
MODEL_REPO = os.getenv("MODEL_REPO", "Qwen/Qwen3-8B-Instruct")
# Local GGUF model (if available)
LOCAL_GGUF_REPO = os.getenv("LOCAL_GGUF_REPO", "Triangle104/Qwen3-8B-Q4_K_M-GGUF")
LOCAL_GGUF_FILE = os.getenv("LOCAL_GGUF_FILE", "qwen3-8b-q4_k_m.gguf")
LOCAL_GGUF_PATH = CACHE_DIR / LOCAL_GGUF_FILE
# Embeddings
EMBED_MODEL_NAME = os.getenv("EMBED_MODEL", "sentence-transformers/all-MiniLM-L6-v2")
EMBED_DIM = int(os.getenv("EMBED_DIM", "384"))
# Code chunking
CHUNK_STEP = int(os.getenv("CHUNK_STEP", "40"))  # configurable via env
MAX_FILE_BYTES = int(os.getenv("MAX_FILE_BYTES", str(10 * 1024 * 1024)))  # 10 MB safety cap

# Plain prompt text only: the ChatML <|im_start|>/<|im_end|> tokens are added by
# the prompt builders below, so they must not also be embedded here (the
# original string double-wrapped the system turn).
SYSTEM_PROMPT = """You are a senior AI code analyst. Analyze projects with hybrid indexing (code graph + retrieval).
Return structured, accurate, concise answers. Use Arabic + English labels in the final report."""

# =========================
# Global state and lock
# =========================
embed_model: SentenceTransformer | None = None
faiss_index: faiss.Index | None = None
all_chunks: List[Tuple[str, str]] = []  # (file_name, chunk_text)
code_graph: Dict[str, Any] = {"files": {}}
hash_map: Dict[str, str] = {}
index_lock = threading.RLock()  # guards indexing/retrieval

# =========================
# LLM (local/remote)
# =========================
try:
    from llama_cpp import Llama
except Exception:
    Llama = None
llm = None  # local model instance, if available

def load_local_model_if_configured():
    """Load a local GGUF model if one is configured."""
    global llm
    if Llama is None:
        logger.info("ℹ️ llama_cpp is unavailable; falling back to HF Inference when needed.")
        return
    if not LOCAL_GGUF_PATH.exists():
        try:
            logger.info(f"⬇️ Downloading GGUF: {LOCAL_GGUF_REPO}/{LOCAL_GGUF_FILE}")
            hf_hub_download(
                repo_id=LOCAL_GGUF_REPO,
                filename=LOCAL_GGUF_FILE,
                local_dir=str(CACHE_DIR),
                token=HF_TOKEN or None
            )
        except Exception as e:
            logger.warning(f"⚠️ GGUF download failed: {e}. Skipping local model load.")
            return
    try:
        llm = Llama(
            model_path=str(LOCAL_GGUF_PATH),
            n_ctx=int(os.getenv("N_CTX", "32768")),
            # NOTE: this is a transformers-style dict; llama-cpp-python expects
            # rope parameters such as rope_scaling_type / rope_freq_scale instead,
            # and may silently ignore this keyword.
            rope_scaling={"type": "yarn", "factor": 4.0},
            n_threads=int(os.getenv("N_THREADS", "2")),
            n_gpu_layers=int(os.getenv("N_GPU_LAYERS", "0")),
            n_batch=int(os.getenv("N_BATCH", "64")),
            use_mlock=False,
            verbose=False
        )
        logger.info("✅ Local model (GGUF) loaded.")
    except Exception as e:
        llm = None
        logger.warning(f"⚠️ Failed to load local model: {e}")

def call_local_llm(prompt: str, max_tokens: int = 800) -> str:
    if llm is None or Llama is None:
        return ""
    try:
        res = llm(
            prompt,
            max_tokens=max_tokens,
            temperature=0.4,
            top_p=0.9,
            stop=["<|im_end|>", "<|im_start|>"],
            echo=False
        )
        return res["choices"][0]["text"].strip()
    except Exception as e:
        logger.warning(f"⚠️ local LLM call failed: {e}")
        return ""

def call_hf_inference(prompt: str, max_new_tokens: int = 900) -> str:
    if not HF_TOKEN:
        raise RuntimeError("HF_TOKEN is not set and local LLM unavailable")
    url = f"https://api-inference.huggingface.co/models/{MODEL_REPO}"
    headers = {"Authorization": f"Bearer {HF_TOKEN}"}
    payload = {
        "inputs": prompt,
        "parameters": {
            "max_new_tokens": max_new_tokens,
            "temperature": 0.4,
            "top_p": 0.9,
            "return_full_text": False
        }
    }
    r = requests.post(url, headers=headers, json=payload, timeout=120)
    r.raise_for_status()
    data = r.json()
    if isinstance(data, list) and data and "generated_text" in data[0]:
        return data[0]["generated_text"]
    if isinstance(data, dict) and "generated_text" in data:
        return data["generated_text"]
    if isinstance(data, dict) and "error" in data:
        raise RuntimeError(data["error"])
    return json.dumps(data)

def call_llm(prompt: str, max_tokens: int = 900) -> str:
    out = call_local_llm(prompt, max_tokens)
    if out:
        return out
    return call_hf_inference(prompt, max_tokens)
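
# Usage sketch (assumes either a loaded GGUF model or a valid HF_TOKEN):
#   text = call_llm(build_chat_prompt([], "Summarize this project"), max_tokens=300)
# The local model is tried first; because an empty string signals failure here,
# a local completion that is legitimately empty also falls through to HF Inference.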

# =========================
# Embedding and indexing utilities
# =========================
def sha256_text(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

def init_embed():
    """Initialize the embedding model and load persisted state from disk."""
    global embed_model, faiss_index, all_chunks, hash_map, code_graph
    embed_model = SentenceTransformer(EMBED_MODEL_NAME)
    if EMB_FILE.exists() and META_FILE.exists():
        try:
            faiss_index = faiss.read_index(str(EMB_FILE))
            with open(META_FILE, "rb") as f:
                all_chunks = pickle.load(f)
            logger.info(f"✅ Loaded index ({faiss_index.ntotal} vectors) from disk.")
        except Exception as e:
            logger.warning(f"⚠️ Failed to load index: {e}. Creating a fresh one.")
            faiss_index = faiss.IndexFlatL2(EMBED_DIM)
            all_chunks = []
    else:
        faiss_index = faiss.IndexFlatL2(EMBED_DIM)
        all_chunks = []
    if HASH_MAP_FILE.exists():
        try:
            hash_map = json.loads(HASH_MAP_FILE.read_text(encoding="utf-8"))
        except Exception:
            hash_map = {}
    else:
        hash_map = {}
    if GRAPH_FILE.exists():
        try:
            code_graph = json.loads(GRAPH_FILE.read_text(encoding="utf-8"))
        except Exception:
            code_graph = {"files": {}}
    else:
        code_graph = {"files": {}}
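
# EMBED_DIM must match the embedding model's output width; the default
# all-MiniLM-L6-v2 emits 384-dimensional vectors, so an inconsistent EMBED_DIM
# env override would make faiss_index.add() reject the very first batch.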

def chunk_code_structured(code: str) -> List[str]:
    """Split code into chunks along top-level AST nodes; fall back to fixed line windows."""
    try:
        tree = python_ast.parse(code)
        chunks: List[str] = []
        lines = code.splitlines()
        for node in tree.body:
            s = max(getattr(node, "lineno", 1) - 1, 0)
            e = getattr(node, "end_lineno", s + 1)
            e = max(e, s + 1)
            chunks.append("\n".join(lines[s:e]))
        if chunks:
            return chunks
    except Exception as e:
        # Log the exception type (and location, if present) before falling back.
        msg = f"{type(e).__name__}: {e}"
        logger.debug(f"AST parse failed; fallback to line-based. Reason: {msg}")
    step = CHUNK_STEP  # from env
    lines = code.splitlines()
    return ["\n".join(lines[i:i + step]) for i in range(0, len(lines), step)]
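
# Example: for a module whose body is [import block, class Foo, def bar], the
# AST path yields three chunks, one per top-level statement, each spanning
# lines[node.lineno - 1 : node.end_lineno]. Non-Python input (e.g. JSON or
# Markdown) raises SyntaxError and lands in the CHUNK_STEP-line fallback.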

def parse_code_meta(file_name: str, code: str) -> Dict[str, Any]:
    """Extract symbols, imports, and call names for a file."""
    meta = {"hash": sha256_text(code), "symbols": [], "calls": [], "imports": []}
    try:
        tree = python_ast.parse(code)
        for node in python_ast.walk(tree):
            if isinstance(node, (python_ast.FunctionDef, python_ast.AsyncFunctionDef)):
                meta["symbols"].append({"name": node.name, "kind": "function", "line": node.lineno})
            elif isinstance(node, python_ast.ClassDef):
                meta["symbols"].append({"name": node.name, "kind": "class", "line": node.lineno})
            elif isinstance(node, python_ast.Assign):
                for t in getattr(node, "targets", []):
                    if isinstance(t, python_ast.Name):
                        meta["symbols"].append({"name": t.id, "kind": "variable", "line": node.lineno})
            elif isinstance(node, python_ast.Import):
                for alias in node.names:
                    if alias.name:  # skip empties so retrieve()'s boost never matches ""
                        meta["imports"].append(alias.name)
            elif isinstance(node, python_ast.ImportFrom):
                if node.module:  # relative imports may have module=None
                    meta["imports"].append(node.module)
            elif isinstance(node, python_ast.Call):
                f = node.func
                if isinstance(f, python_ast.Name):
                    meta["calls"].append(f.id)
                elif isinstance(f, python_ast.Attribute):
                    meta["calls"].append(f.attr)
    except Exception as e:
        logger.debug(f"parse_code_meta failed for {file_name}: {type(e).__name__}: {e}")
    return meta
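
# Example: parsing 'import os\n\ndef ping():\n    print("pong")\n' yields
# symbols=[{"name": "ping", "kind": "function", "line": 3}], imports=["os"],
# calls=["print"], plus the sha256 of the source text.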

def reconstruct_all_vectors() -> np.ndarray:
    """Rebuild all stored vectors from the FAISS index."""
    if faiss_index is None or faiss_index.ntotal == 0:
        return np.array([], dtype=np.float32)
    xs = [faiss_index.reconstruct(i) for i in range(faiss_index.ntotal)]
    return np.array(xs, dtype=np.float32) if xs else np.array([], dtype=np.float32)

def persist_index():
    """Persist the index, chunk metadata, graph, and hash map to disk."""
    faiss.write_index(faiss_index, str(EMB_FILE))
    with open(META_FILE, "wb") as f:
        pickle.dump(all_chunks, f)
    GRAPH_FILE.write_text(json.dumps(code_graph, ensure_ascii=False, indent=2), encoding="utf-8")
    HASH_MAP_FILE.write_text(json.dumps(hash_map, ensure_ascii=False, indent=2), encoding="utf-8")
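
# Note: these four writes are not atomic as a group; a crash mid-persist can
# leave embeddings.faiss, chunks.pkl, and the JSON maps out of sync. A
# write-to-temp-then-os.replace pattern would harden this if it ever matters.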

def upsert_file_to_index(file_name: str, content: str):
    """Insert or update one file in the index and metadata, then persist to disk."""
    global faiss_index, all_chunks, code_graph, hash_map
    # Keep the latest copy on local disk (the Space's persistent storage)
    file_path = FILES_DIR / file_name
    file_path.parent.mkdir(parents=True, exist_ok=True)
    file_path.write_text(content, encoding="utf-8")
    content_hash = sha256_text(content)
    prev_hash = hash_map.get(file_name)
    if prev_hash == content_hash:
        return
    chunks = chunk_code_structured(content)
    embeds = embed_model.encode(chunks, normalize_embeddings=True)
    with index_lock:
        # Full re-merge (simple and safe): drop this file's stale vectors/chunks
        # first, then rebuild the flat index from the kept plus the new vectors.
        old_vecs = reconstruct_all_vectors()
        keep_ids = [i for i, (fn, _) in enumerate(all_chunks) if fn != file_name]
        kept_vecs = old_vecs[keep_ids] if old_vecs.size and keep_ids else np.empty((0, EMBED_DIM), dtype=np.float32)
        new_vecs = np.array(embeds, dtype=np.float32)
        merged = new_vecs if kept_vecs.size == 0 else np.vstack([kept_vecs, new_vecs])
        faiss_index = faiss.IndexFlatL2(EMBED_DIM)
        faiss_index.add(merged)
        all_chunks = [all_chunks[i] for i in keep_ids] + [(file_name, c) for c in chunks]
        code_graph["files"][file_name] = parse_code_meta(file_name, content)
        hash_map[file_name] = content_hash
        persist_index()
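
# Note: this upsert rebuilds the whole IndexFlatL2, which is O(total vectors)
# per call. One alternative (not used here) is faiss.IndexIDMap over per-chunk
# ids, so updates become remove_ids()/add_with_ids() without a full rebuild;
# the flat rebuild is kept because it is simple and exact.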

def rebuild_index_from_files():
    """Rebuild the entire index from the contents of data/files/."""
    global faiss_index, all_chunks, code_graph, hash_map
    with index_lock:
        faiss_index = faiss.IndexFlatL2(EMBED_DIM)
        all_chunks = []
        code_graph = {"files": {}}
        hash_map = {}
        for p in sorted(FILES_DIR.rglob("*")):
            if not p.is_file():
                continue
            try:
                content = p.read_text(encoding="utf-8")
            except Exception:
                # Skip binary/non-text files
                continue
            fname = str(p.relative_to(FILES_DIR)).replace("\\", "/")
            chunks = chunk_code_structured(content)
            if not chunks:
                continue
            embeds = embed_model.encode(chunks, normalize_embeddings=True)
            vecs = np.array(embeds, dtype=np.float32)
            if vecs.size:
                faiss_index.add(vecs)
                all_chunks.extend([(fname, c) for c in chunks])
                code_graph["files"][fname] = parse_code_meta(fname, content)
                hash_map[fname] = sha256_text(content)
        persist_index()
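
# rebuild_index_from_files() doubles as the recovery path: /diff-files calls it
# after every batch of changes, and because data/files/ keeps the full text of
# each indexed file, it can also repair a drifted embeddings.faiss/chunks.pkl
# pair from scratch.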

def retrieve(query: str, k: int = 8) -> List[Tuple[str, str, float]]:
    """Retrieve the top-k chunks for context."""
    if faiss_index is None or faiss_index.ntotal == 0 or embed_model is None:
        return []
    q = embed_model.encode([query], normalize_embeddings=True)
    D, I = faiss_index.search(np.array(q, dtype=np.float32), k)
    out: List[Tuple[str, str, float]] = []
    for score, idx in zip(D[0], I[0]):
        if idx < 0 or idx >= len(all_chunks):
            continue
        file_name, chunk = all_chunks[idx]
        # Small boost when the query mentions one of the file's imports/calls
        # (scores are L2 distances, so scaling down ranks the chunk higher).
        boost = 1.0
        meta = code_graph["files"].get(file_name, {})
        imports = set(meta.get("imports", []))
        calls = set(meta.get("calls", []))
        if any(tok and tok in query for tok in (list(imports) + list(calls))):
            boost = 0.9
        out.append((file_name, chunk, float(score) * boost))
    out.sort(key=lambda x: x[2])  # ascending: smaller distance = better match
    return out[:k]
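
# With normalize_embeddings=True, every vector has unit norm, so the squared L2
# distance FAISS returns relates to cosine similarity by ||a - b||^2 = 2 - 2*cos(a, b):
# 0 for identical vectors, 2 for orthogonal, 4 for opposite. Ranking by ascending
# L2 is therefore equivalent to ranking by descending cosine similarity.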

def render_graph_overview(limit: int = 100) -> str:
    lines = []
    files = list(code_graph.get("files", {}).items())[:limit]
    for fname, meta in files:
        syms = ", ".join([f"{s.get('kind')}:{s.get('name')}" for s in meta.get("symbols", [])][:8])
        imps = ", ".join(meta.get("imports", [])[:6])
        cls = ", ".join(meta.get("calls", [])[:8])
        lines.append(f"- File: {fname}\n  Symbols: {syms}\n  Imports: {imps}\n  Calls: {cls}")
    return "\n".join(lines)

def build_chat_prompt(history: List[List[str]], user_msg: str, extra: str = "") -> str:
    msgs = [("system", SYSTEM_PROMPT)]
    for u, a in history[-8:]:
        msgs.append(("user", u))
        msgs.append(("assistant", a))
    msgs.append(("user", (user_msg or "") + ("\n" + extra if extra else "")))
    out = []
    for role, content in msgs:
        if role == "system":
            out.append(f"<|im_start|>system\n{content}<|im_end|>")
        elif role == "user":
            out.append(f"<|im_start|>user\n{content}<|im_end|>")
        else:
            out.append(f"<|im_start|>assistant\n{content}<|im_end|>")
    out.append("<|im_start|>assistant\n")
    return "\n".join(out)
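
# Rendered shape (ChatML, as consumed by Qwen-family chat templates):
#   <|im_start|>system
#   {SYSTEM_PROMPT}<|im_end|>
#   <|im_start|>user
#   {message + optional context}<|im_end|>
#   <|im_start|>assistant
# The trailing open assistant turn is what call_local_llm's stop tokens
# ("<|im_end|>", "<|im_start|>") terminate.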

def build_analysis_prompt(query: str, retrieved_docs: List[Tuple[str, str, float]]) -> str:
    graph_overview = render_graph_overview(120)
    ctx = []
    for i, (fname, chunk, score) in enumerate(retrieved_docs, 1):
        ctx.append(f"### Source {i}\n[File] {fname}\n[Score] {score:.4f}\n```\n{chunk}\n```")
    context_block = "\n\n".join(ctx)
    # The Arabic instructions below ask for a full analysis report (logical
    # sequence, functional flow, dependencies, globals map, potential
    # weaknesses, fix recommendations) as Markdown with Arabic + English
    # headings; they are kept in Arabic because that is the product language.
    instructions = (
        "المطلوب: تحليل الملفات المسترجعة مع السياق التالي لإنتاج تقرير تحليلي شامل يشمل:\n"
        "1) التسلسل المنطقي (Logical Sequence) وخطوات التنفيذ\n"
        "2) التسلسل الوظيفي (Functional Flow) والمخطط التدفق النصي (Flow Outline)\n"
        "3) التبعيات بين الملفات (Dependencies) والاستدعاءات (Call Relations)\n"
        "4) العلاقات بين الملفات والمتغيرات العامة ومكان تعريفها (Global Vars Map)\n"
        "5) تحديد نقاط الضعف المحتملة (Logic/Security/Performance) إن وجدت\n"
        "6) توصيات اصلاح عملية\n"
        "صيغة المخرجات: Markdown منظم بعناوين عربية + English labels:\n"
        "## نظرة عامة / Overview\n"
        "## خريطة التبعيات / Dependency Map\n"
        "## المخطط التدفق / Flow Outline\n"
        "## تحليل منطقي ووظيفي / Logical & Functional Analysis\n"
        "## المتغيرات العامة / Global Variables\n"
        "## مشاكل محتملة / Potential Issues\n"
        "## توصيات / Recommendations"
    )
    user = f"سؤال التحليل: {query}\n\n[Graph Overview]\n{graph_overview}\n\n[Retrieved Context]\n{context_block}"
    prompt = (
        f"<|im_start|>system\n{SYSTEM_PROMPT}\n<|im_end|>\n"
        f"<|im_start|>user\n{instructions}\n\n{user}\n<|im_end|>\n"
        f"<|im_start|>assistant\n"
    )
    return prompt

# =========================
# Request/response models
# =========================
class ChatRequest(BaseModel):
    session_id: str
    message: str

class ChatResponse(BaseModel):
    response: str
    updated_history: list[list[str]]

class AnalyzeRequest(BaseModel):
    files: dict[str, str]  # name -> content

class AnalyzeAndReportRequest(BaseModel):
    session_id: str
    query: str
    top_k: int | None = 10

class DiffRequest(BaseModel):
    modified: dict[str, str] = {}  # filename -> full new content
    deleted: list[str] = []

# =========================
# (Optional) Drive/GitHub hooks
# =========================
def maybe_upload_to_drive(local_path: Path):
    """Optional hook: upload a copy to Google Drive once credentials are wired up."""
    # Intentionally a no-op until a Google Drive dependency is added.
    # A DRIVE_ENABLED=1 env flag plus OAuth secrets could gate it later.
    pass

def maybe_commit_to_github(local_path: Path, message: str = "auto: update file"):
    """Optional hook: auto-commit/push once a GitHub token/repo is configured."""
    # Intentionally a no-op until GitHub access (token/repo) is wired up.
    pass
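
# A minimal sketch of what maybe_commit_to_github could become, assuming a
# GITHUB_TOKEN env var, a GITHUB_REPO ("owner/name") env var, and the PyGithub
# package -- none of which this project depends on yet:
#   from github import Github
#   repo = Github(os.environ["GITHUB_TOKEN"]).get_repo(os.environ["GITHUB_REPO"])
#   existing = repo.get_contents(str(local_path))
#   repo.update_file(existing.path, message, local_path.read_text(), existing.sha)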

# =========================
# Analysis/report operations
# =========================
def analyze_and_report_internal(session_id: str, query: str, k: int = 10) -> str:
    retrieved_docs = retrieve(query, k=k)
    if not retrieved_docs:
        # Arabic detail: "No data indexed yet. Use /analyze-files first."
        raise HTTPException(status_code=400, detail="لا توجد بيانات مفهرسة بعد. استخدم /analyze-files أولًا.")
    prompt = build_analysis_prompt(query, retrieved_docs)
    report = call_llm(prompt, max_tokens=1400)
    REPORT_FILE.write_text(report, encoding="utf-8")
    history = get_history(session_id)
    updated = (history + [[f"[ANALYZE] {query}", report]])[-8:]
    save_history(session_id, updated)
    return report

# =========================
# FastAPI application
# =========================
app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"], allow_credentials=True,
    allow_methods=["*"], allow_headers=["*"]
)

@app.on_event("startup")
async def on_startup():
    load_local_model_if_configured()
    start_monitoring_thread()
    init_embed()
    logger.info("🚀 App is ready.")

# Paths on the read-only endpoints below are assumptions; only /analyze-files
# is referenced explicitly elsewhere in this module.
@app.get("/")
def root():
    return {"message": "الخادم يعمل", "status": "ok"}  # "The server is running"

@app.get("/model-status")  # assumed path
def model_status():
    status = "local_loaded" if llm else ("hf_ready" if HF_TOKEN else "no_model")
    return {"status": status, "repo": MODEL_REPO, "local": str(LOCAL_GGUF_PATH) if llm else None}

@app.get("/metrics")  # assumed path
def read_metrics():
    return get_current_metrics()

@app.get("/monitor-log")  # assumed path
def read_monitor_log():
    log_path = DATA_DIR / "monitor.log"
    if not log_path.exists():
        log_path.touch()
    return {"log": log_path.read_text(encoding="utf-8")}

@app.post("/analyze-files")
def analyze_files(req: AnalyzeRequest):
    # Save/replace the latest copies locally, then index them
    for fname, content in req.files.items():
        if MAX_FILE_BYTES and len(content.encode("utf-8", errors="ignore")) > MAX_FILE_BYTES:
            # Arabic detail: "File {fname} exceeds the allowed size."
            raise HTTPException(status_code=413, detail=f"الملف {fname} يتجاوز الحجم المسموح.")
    for fname, content in req.files.items():
        upsert_file_to_index(fname, content)
        # Optional: push a copy to cloud archival
        try:
            maybe_upload_to_drive(FILES_DIR / fname)
        except Exception as e:
            logger.warning(f"Drive upload skipped for {fname}: {e}")
        try:
            maybe_commit_to_github(FILES_DIR / fname, "auto: analyze & save")
        except Exception as e:
            logger.warning(f"GitHub commit skipped for {fname}: {e}")
    return {"status": "Files analyzed and cached", "files_indexed": list(req.files.keys())}

@app.post("/diff-files")  # path inferred from the commit message below
def diff_files(req: DiffRequest):
    """Apply Git-style changes (modified/deleted) with a clean index rebuild."""
    # 1) Delete the requested files (from disk and from the maps)
    for fname in req.deleted:
        try:
            fp = FILES_DIR / fname
            if fp.exists():
                fp.unlink()
        except Exception as e:
            logger.warning(f"⚠️ Failed to delete {fname} from disk: {e}")
        # Clean up the maps
        hash_map.pop(fname, None)
        code_graph.get("files", {}).pop(fname, None)
    # 2) Write/update the modified files on disk
    for fname, content in req.modified.items():
        if MAX_FILE_BYTES and len(content.encode("utf-8", errors="ignore")) > MAX_FILE_BYTES:
            # Arabic detail: "File {fname} exceeds the allowed size."
            raise HTTPException(status_code=413, detail=f"الملف {fname} يتجاوز الحجم المسموح.")
        fp = FILES_DIR / fname
        fp.parent.mkdir(parents=True, exist_ok=True)
        fp.write_text(content, encoding="utf-8")
    # 3) Rebuild the index entirely from the current files
    rebuild_index_from_files()
    # 4) (Optional) push the modified copies to archival
    for fname in req.modified.keys():
        try:
            maybe_upload_to_drive(FILES_DIR / fname)
        except Exception as e:
            logger.warning(f"Drive upload skipped for {fname}: {e}")
        try:
            maybe_commit_to_github(FILES_DIR / fname, "auto: diff-files update")
        except Exception as e:
            logger.warning(f"GitHub commit skipped for {fname}: {e}")
    return {
        "status": "ok",
        "deleted": req.deleted,
        "modified": list(req.modified.keys()),
        "total_index_vectors": int(faiss_index.ntotal) if faiss_index else 0
    }

@app.post("/analyze-and-report")  # assumed path
def analyze_and_report(req: AnalyzeAndReportRequest):
    report = analyze_and_report_internal(req.session_id, req.query, k=req.top_k or 10)
    return {"status": "ok", "report_path": str(REPORT_FILE), "preview": report[:1200]}

def classify_intent(history: List[List[str]], message: str) -> Dict[str, Any]:
    # The Arabic instruction asks the model to return JSON only, with keys
    # intent/confidence/action/targets/reason, followed by three few-shot
    # examples; it is kept in Arabic because that is the product language.
    inst = (
        "أعد JSON فقط دون أي نص آخر.\n"
        "المفاتيح: intent (string), confidence (0-1), action (RETRIEVE_ONLY|ANALYZE_AND_REPORT|TRACE_SUBSET|NONE), "
        "targets (list of strings), reason (string).\n"
        "أمثلة:\n"
        "س: ما عمل الملف X؟ → {\"intent\":\"ASK_FILE_ROLE\",\"confidence\":0.9,\"action\":\"RETRIEVE_ONLY\",\"targets\":[\"X\"],\"reason\":\"...\"}\n"
        "س: لماذا لا تعمل ميزة الدخول؟ → {\"intent\":\"WHY_FEATURE_NOT_WORKING\",\"confidence\":0.85,\"action\":\"ANALYZE_AND_REPORT\",\"targets\":[],\"reason\":\"...\"}\n"
        "س: اين يُعرّف المتغير TOKEN وكيف يتغير؟ → {\"intent\":\"CODE_FLOW_TRACE\",\"confidence\":0.8,\"action\":\"TRACE_SUBSET\",\"targets\":[\"TOKEN\"],\"reason\":\"...\"}\n"
    )
    p = (
        f"<|im_start|>system\n{SYSTEM_PROMPT}\n<|im_end|>\n"
        f"<|im_start|>user\n{inst}\nالسؤال: {message}\nأعد JSON فقط.\n<|im_end|>\n"
        f"<|im_start|>assistant\n"
    )
    txt = call_llm(p, max_tokens=200)
    try:
        start = txt.find("{")
        end = txt.rfind("}")
        obj = json.loads(txt[start:end + 1]) if start != -1 and end != -1 else {}
    except Exception:
        obj = {}
    if not isinstance(obj, dict):
        obj = {}
    obj.setdefault("intent", "UNKNOWN")
    obj.setdefault("confidence", 0.0)
    obj.setdefault("action", "NONE")
    obj.setdefault("targets", [])
    obj.setdefault("reason", "")
    return obj
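
# Example decision for "لماذا لا تعمل ميزة الدخول؟" ("why doesn't the login
# feature work?"), per the few-shot examples above:
#   {"intent": "WHY_FEATURE_NOT_WORKING", "confidence": 0.85,
#    "action": "ANALYZE_AND_REPORT", "targets": [], "reason": "..."}
# Anything unparseable degrades gracefully to action "NONE" (plain chat).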

@app.post("/chat")  # assumed path
def chat(req: ChatRequest):
    history = get_history(req.session_id)
    decision = classify_intent(history, req.message)
    action = decision.get("action", "NONE")
    response_text = ""
    if action == "ANALYZE_AND_REPORT":
        try:
            report = analyze_and_report_internal(req.session_id, req.message, k=10)
            response_text = "تم إنشاء تقرير تحليلي:\n\n" + report  # "An analysis report was generated:"
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"LLM error: {str(e)}")
    elif action == "RETRIEVE_ONLY":
        retrieved_docs = retrieve(req.message, k=6)
        ctx = []
        for fname, chunk, score in retrieved_docs:
            ctx.append(f"From {fname} (score={score:.4f}):\n{chunk}")
        extra = "\n\n[Context]\n" + "\n\n".join(ctx) + "\n\n" + render_graph_overview(60)
        prompt = build_chat_prompt(history, req.message, extra)
        try:
            response_text = call_llm(prompt, max_tokens=700)
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"LLM error: {str(e)}")
    elif action == "TRACE_SUBSET":
        targets = decision.get("targets", [])
        key = " ".join(targets) if targets else req.message
        retrieved_docs = retrieve(key, k=10)
        flow_query = req.message + "\nPlease trace variables/functions: " + ", ".join(targets)
        prompt = build_analysis_prompt(flow_query, retrieved_docs)
        try:
            trace_report = call_llm(prompt, max_tokens=1200)
            REPORT_FILE.write_text(trace_report, encoding="utf-8")
            response_text = "تقرير التتبع:\n\n" + trace_report  # "Trace report:"
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"LLM error: {str(e)}")
    else:
        prompt = build_chat_prompt(history, req.message, "")
        try:
            response_text = call_llm(prompt, max_tokens=600)
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"LLM error: {str(e)}")
    updated = (history + [[req.message, response_text]])[-8:]
    save_history(req.session_id, updated)
    return ChatResponse(response=response_text, updated_history=updated)